Rocketmq redis replicator implement Redis Replication protocol written in java. It can parse, filter, broadcast the RDB and AOF events in a real time manner and downstream these event to RocketMQ.
+-------+ PSNC +--------------+ | |<--------------| | event +--------------+ | Redis | | |------------->| | | |-------------->|Rocketmq-redis| event | | +-------+ data | (parse data) |------------->| Rocketmq | | | event | | | |------------->| | +--------------+ +--------------+
jdk 1.8+
maven-3.3.1+
redis 2.6 - 5.0.x
rocketmq 4.2.0 or higher
$mvn clean install package -Dmaven.test.skip=true
Configure configure = new Configure(); Replicator replicator = new RocketMQRedisReplicator(configure); final RocketMQRedisProducer producer = new RocketMQRedisProducer(configure); producer.open(); replicator.addEventListener(new EventListener() { @Override public void onEvent(Replicator replicator, Event event) { try { if (!producer.send(event)) { LOGGER.error("Failed to send Event"); } } catch (Exception e) { LOGGER.error("Failed to send Event", e); } } }); replicator.addCloseListener(new CloseListener() { @Override public void handle(Replicator replicator) { producer.close(); } }); replicator.open();
mvn clean package -Dmaven.test.skipsh target/rocketmq-redis-pack/bin/start.sh
Configure configure = new Configure(); RocketMQRedisConsumer consumer = new RocketMQRedisConsumer(configure); consumer.addEventListener(new EventListener() { @Override public void onEvent(Event event) { if (event instanceof PreRdbSyncEvent) { // pre rdb sync // your code goes here } else if (event instanceof AuxField) { // rdb aux field event // your code goes here } else if (event instanceof KeyValuePair) { // rdb event // your code goes here } else if (event instanceof PostRdbSyncEvent) { // post full sync // your code goes here } else if (event instanceof Command) { // aof command event // your code goes here } else if (event instanceof PreCommandSyncEvent) { // pre command sync // your code goes here } else if (event instanceof PostCommandSyncEvent) { // post command sync // your code goes here } } }); consumer.open();
The config file located at target/rocketmq-redis-pack/conf/replicator.conf
| parameter | default value | detail |
|---|---|---|
| rocketmq.nameserver.address | 127.0.0.1:9876 | rocketmq server address |
| rocketmq.producer.groupname | REDIS_REPLICATOR_PRODUCER_GROUP | rocketmq producer group name |
| rocketmq.consumer.groupname | REDIS_REPLICATOR_CONSUMER_GROUP | rocketmq consumer group name |
| rocketmq.data.topic | redisdata | rocketmq topic name |
| deploy.model | single | single or cluster |
| zookeeper.address | 127.0.0.1:2181 | run on cluster model |
| redis.uri | redis://127.0.0.1:6379 | the uri of redis master which replicate from |
By default the configuration file replicator.conf loaded from your classpath.
But you can specify your own configuration using Configure like following:
Properties properties = new Properties() properties.setProperty("zookeeper.address", "127.0.0.1:2181"); properties.setProperty("redis.uri", "redis://127.0.0.1:6379"); properties.setProperty("rocketmq.nameserver.address", "localhost:9876"); properties.setProperty("rocketmq.producer.groupname", "REDIS_REPLICATOR_PRODUCER_GROUP"); properties.setProperty("rocketmq.consumer.groupname", "REDIS_REPLICATOR_CONSUMER_GROUP"); properties.setProperty("rocketmq.data.topic", "redisdata"); properties.setProperty("deploy.model", "single"); Configure configure = new Configure(properties);
| commands | commands | commands | commands | commands | commands |
|---|---|---|---|---|---|
| PING | APPEND | SET | SETEX | MSET | DEL |
| SADD | HMSET | HSET | LSET | EXPIRE | EXPIREAT |
| GETSET | HSETNX | MSETNX | PSETEX | SETNX | SETRANGE |
| HDEL | UNLINK | SREM | LPOP | LPUSH | LPUSHX |
| LRem | RPOP | RPUSH | RPUSHX | ZREM | ZINTERSTORE |
| INCR | DECR | INCRBY | PERSIST | SELECT | FLUSHALL |
| FLUSHDB | HINCRBY | ZINCRBY | MOVE | SMOVE | BRPOPLPUSH |
| PFCOUNT | PFMERGE | SDIFFSTORE | RENAMENX | PEXPIREAT | SINTERSTORE |
| ZADD | BITFIELD | SUNIONSTORE | RESTORE | LINSERT | ZREMRANGEBYLEX |
| GEOADD | PEXPIRE | ZUNIONSTORE | EVAL | SCRIPT | ZREMRANGEBYRANK |
| PUBLISH | BITOP | SETBIT | SWAPDB | PFADD | ZREMRANGEBYSCORE |
| RENAME | MULTI | EXEC | LTRIM | RPOPLPUSH | SORT |
| EVALSHA | ZPOPMAX | ZPOPMIN | XACK | XADD | XCLAIM |
| XDEL | XGROUP | XTRIM | XSETID |
client-output-buffer-limit slave 0 0 0
WARNNING: this setting may run out of memory of redis server in some cases.
<Logger name="com.moilioncircle" level="info"> <AppenderRef ref="YourAppender"/> </Logger>
// redis uri "redis://127.0.0.1:6379?verbose=yes"
// redis uri "redis://127.0.0.1:6379?authPassword=foobared"
repl-backlog-size repl-backlog-ttl repl-ping-slave-periods
repl-ping-slave-period MUST less than readTimeout, default readTimeout is 30 seconds