IM系统redis迁移到codis实践

公司决定使用自己搭建的codis集群代替阿里云的redis,二者性能差别不大,但是自运维的codis成本更低,同时也借这次迁移来规范key的命名,统一命名成”系统(必须):子系统(必须):存储类型(可选,cache|storage):数据(必须):版本(可选):其它(可选)”的形式,方便维护。我负责IM即时通信系统的redis迁移。IM使用redis的地方很多,并且数据量很大,例如消息拉链、联系人、最近一条消息、未读消息数、黑名单、频率及敏感词限制、Token等,而消息拉链、最近一条消息和未读消息数是进行持久存储的,因此这些数据需要进行迁移。

一、方案

为了不影响用户的使用,代码上线定在凌晨一点。数据迁移到codis的同时还要修改key,因此不能直接使用redis-port等工具将旧的redis里面的数据同步到新的codis集群,最终决定写脚本进行数据迁移。先使用keys命令模糊匹配出旧redis里的所有key,取出数据然后设置新的key批量保存到codis集群中。消息拉链的数据量很大,可以先进行一次全量的数据迁移,代码上线之前再进行一次增量数据迁移。最近一条消息和未读消息数数据时效性比较大,也在代码上线前进行同步。具体步骤如下:

  • 将代码中记录redis key的常量文件备份一份,将原来的key修改成规范形式,并添加codis的配置文件及连接方法,写脚本通过备份的redis key文件从旧数据库读数据,然后批量写入codis集群;
  • 在beta机器上跑脚本全量迁移消息拉链数据;
  • 上线前IM系统停止服务,避免在上线过程中有数据写入;
  • 从MySQL数据库取出全量迁移消息拉链之后新增的数据,添加到codis,并迁移最近一条消息和未读消息数的数据;
  • 通过查看日志、自测等方法验证数据迁移是否成功;
  • 代码上线;
  • QA进行回归测试

二、遇到的问题及解决方法

  1. 数据迁移脚本运行时间很长(全量迁移消息拉链的脚本运行了5个小时左右),如果直接运行脚本,断网或者终端与服务器连接断开会导致脚本停止运行
    解决方法:使用nohup在后台运行脚本,如下所示,并且在退出服务器时要通过exit命令退出,不能直接关掉,否则进程也会中止。

    1
    nohup php cli.php redis_migrate migrateTotal &>> /apps2/tmp/im/redis_migrate_migrateTotal.log 2>&1 &
  2. 日志文件太大(最大的一个有1.8G),beta机器的一个磁盘打满了,导致进程结束
    解决方法:通过df命令查看磁盘空间使用情况,选择剩余空间比较大的磁盘打日志,这里选择/dev/xvdc1,即/apps2文件夹下面。
    df结果

  3. 一次mset太多键值对数据,报Segmentation fault,如图:
    Segmentation fault
    解决方法:先对数据进行分组,如5000个键值对为一组,然后循环进行mset,代码见第三部分。

  4. 从MySQL数据库查消息拉链的增量数据,报timeout exception,如图:
    slow query
    根据日志可以看出这是一个慢查询,sql语句如下,查看表结构发现update_time字段没有建索引,而且数据量很大(查了下一对一聊天记录表有36133336条数据),所以查询慢,而另外一个字段create_time建了联合索引,根据最左前缀法则,使用create_time代替update_time将用到索引,问题得到解决。

    1
    2
    3
    4
    5
    6
    7
    8
    -- 慢查询sql
    SELECT * FROM im_message_info_single WHERE update_time > "2017-04-25 23:07:51";

    -- create_time和sender_r字段建立的联合索引
    KEY `idx_createtime_send_r` (`create_time`,`sender_r`)

    -- 优化之后的sql
    SELECT * FROM im_message_info_single WHERE create_time > "2017-04-25 23:07:51";
  5. 使用keys命令获取所有用户一对一聊天未读消息数的key时,redis报”read error on connection”错误,如图:
    read error on connection
    google了一下发现是数据量太大,读取超时的原因,通过setOption命令设置不超时解决,参见:https://nil.sh/RedisException-with-message-read-error-on-connection.html

  6. 迁完之后发现codis里找不到数据,可能是db不对,默认是db 0,通过select命令选择数据所在db之后再查看。

三、相关代码

所有业务的消息拉链都是采用sorted set结构,所有业务的最后一条消息都是采用string结构,社区、考研、金囿、群组这几个业务的未读消息数采用hash结构,系统未读消息采用sorted set结构,一对一聊天的未读消息数采用string结构,sorted set、string和hash这三种结构的数据分别先通过keys命令获取保存所有key的数组,然后调用以下函数(略去Action函数)将数据迁移到codis。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class RedisMigrateTask extends \Phalcon\Cli\Task {

const CHUNK_SIZE = 5000; // 数据分组大小

public $redisInstance = null; // redis实例
public $codisInstance = null; // codis实例

/**
* 批量迁移sorted set数据
* @param $oldKeys array 存储旧key的数组
* @param $oldPrefix string 旧key前缀
* @param $newPrefix string 新key前缀
*/
public function migrateSortedSetData($oldKeys, $oldPrefix, $newPrefix) {
if ($oldKeys === false) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] get keys [{$oldPrefix}*] failed!" . PHP_EOL;
} elseif (empty($oldKeys)) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] keys [{$oldPrefix}*] are empty!" . PHP_EOL;
} else {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] get keys [{$oldPrefix}*] successfully!" . PHP_EOL;
foreach ($oldKeys as $oldKey) {
$newKey = $this->getNewKey($oldKey, $oldPrefix, $newPrefix);
$sortedSetData = $this->redisInstance->zRange($oldKey, 0, -1, true);
$items = [];
$logArr = [];
foreach ($sortedSetData as $val => $score) {
$items[] = $score;
$items[] = $val;
$logArr[] = $score . ':' . $val;
}
array_unshift($items, $newKey);
$num = call_user_func_array([$this->codisInstance, 'zAdd'], $items); // 添加成功个数
if ($num === false) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] old key [{$oldKey}] failed to migrate to codis! old score=>value: " . join(',', $logArr) . PHP_EOL;
} elseif ($num == 0) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] old key [{$oldKey}] has existed and is the same! old score=>value: " . join(',', $logArr) . PHP_EOL;
} else {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] old key [{$oldKey}] with {$num} values was migrated to codis successfully! old score=>value: " . join(',', $logArr) . PHP_EOL;
}
}
}
}

/**
* 分组批量迁移string数据。数据太多全量用mset会报Segmentation fault, 分组mget, mset
* @param $oldKeys
* @param $oldPrefix
* @param $newPrefix
*/
public function migrateKvDataChunk($oldKeys, $oldPrefix, $newPrefix) {
if ($oldKeys === false) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] get keys [{$oldPrefix}*] failed!" . PHP_EOL;
} elseif (empty($oldKeys)) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] keys [{$oldPrefix}*] are empty!" . PHP_EOL;
} else {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] get keys [{$oldPrefix}*] successfully!" . PHP_EOL;
$oldKeysChunk = array_chunk($oldKeys, self::CHUNK_SIZE);
foreach ($oldKeysChunk as $oldkeysGroup) {
$valuesGroup = $this->redisInstance->mget($oldkeysGroup);
$valuesGroupAssoc = [];
foreach ($oldkeysGroup as $idx => $oldKey) {
$newKey = $this->getNewKey($oldKey, $oldPrefix, $newPrefix);
$valuesGroupAssoc[$newKey] = $valuesGroup[$idx];
}
$ret = $this->codisInstance->mset($valuesGroupAssoc); // bool值
if ($ret) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] old kvs was migrated to codis successfully! old keys: " . join(',', $oldkeysGroup) . PHP_EOL;
} else {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] old kvs failed to migrate to codis! old keys: " . join(',', $oldkeysGroup) . PHP_EOL;
}
}
}
}

/**
* 批量迁移hash数据
* @param $oldKeys
* @param $oldPrefix
* @param $newPrefix
*/
public function migrateHashData($oldKeys, $oldPrefix, $newPrefix) {
if ($oldKeys === false) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] get keys [{$oldPrefix}*] failed!" . PHP_EOL;
} elseif (empty($oldKeys)) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] keys [{$oldPrefix}*] are empty!" . PHP_EOL;
} else {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] get keys [{$oldPrefix}*] successfully!" . PHP_EOL;
foreach ($oldKeys as $oldKey) {
$hashData = $this->redisInstance->hGetAll($oldKey);
$newKey = $this->getNewKey($oldKey, $oldPrefix, $newPrefix);
$ret = $this->codisInstance->hMset($newKey, $hashData); // bool值
$logArr = [];
foreach ($hashData as $hashKey => $hashVal) {
$logArr[] = $hashKey . ':' . $hashVal;
}
if ($ret) {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] old hash key [{$oldKey}] was migrated to codis successfully! old hash data: " . join(',', $logArr) . PHP_EOL;
} else {
echo __METHOD__ . ' [' . date('Y-m-d H:i:s') . "] old hash key [{$oldKey}] failed to migrate to codis! old hash data: " . join(',', $logArr) . PHP_EOL;
}
}
}
}

/**
* 生成新的key
* @param $oldKey string 旧redis的key
* @param $oldPrefix string 旧key前缀
* @param $newPrefix string 新key前缀
* @return string
*/
public function getNewKey($oldKey, $oldPrefix, $newPrefix) {
$offset = strlen($oldPrefix);
$suffix = substr($oldKey, $offset);
$newKey = $newPrefix . $suffix;
return $newKey;
}
}