Rocketmq redis replicator implement Redis Replication protocol written in java. It can parse, filter, broadcast the RDB and AOF events in a real time manner and downstream these event to RocketMQ.
+-------+ PSNC +--------------+ | |<--------------| | event +--------------+ | Redis | | |------------->| | | |-------------->|Rocketmq-redis| event | | +-------+ data | (parse data) |------------->| Rocketmq | | | event | | | |------------->| | +--------------+ +--------------+
jdk 1.8+
maven-3.3.1+
redis 2.6 - 5.0.x
rocketmq 4.2.0 or higher
$mvn clean install package -Dmaven.test.skip=true
Configure configure = new Configure(); Replicator replicator = new RocketMQRedisReplicator(configure); final RocketMQRedisProducer producer = new RocketMQRedisProducer(configure); producer.open(); replicator.addEventListener(new EventListener() { @Override public void onEvent(Replicator replicator, Event event) { try { if (!producer.send(event)) { LOGGER.error("Failed to send Event"); } } catch (Exception e) { LOGGER.error("Failed to send Event", e); } } }); replicator.addCloseListener(new CloseListener() { @Override public void handle(Replicator replicator) { producer.close(); } }); replicator.open();
mvn clean package -Dmaven.test.skip
sh target/rocketmq-redis-pack/bin/start.sh
Configure configure = new Configure(); RocketMQRedisConsumer consumer = new RocketMQRedisConsumer(configure); consumer.addEventListener(new EventListener() { @Override public void onEvent(Event event) { if (event instanceof PreRdbSyncEvent) { // pre rdb sync // your code goes here } else if (event instanceof AuxField) { // rdb aux field event // your code goes here } else if (event instanceof KeyValuePair) { // rdb event // your code goes here } else if (event instanceof PostRdbSyncEvent) { // post full sync // your code goes here } else if (event instanceof Command) { // aof command event // your code goes here } else if (event instanceof PreCommandSyncEvent) { // pre command sync // your code goes here } else if (event instanceof PostCommandSyncEvent) { // post command sync // your code goes here } } }); consumer.open();
The config file located at target/rocketmq-redis-pack/conf/replicator.conf
parameter | default value | detail |
---|---|---|
rocketmq.nameserver.address | 127.0.0.1:9876 | rocketmq server address |
rocketmq.producer.groupname | REDIS_REPLICATOR_PRODUCER_GROUP | rocketmq producer group name |
rocketmq.consumer.groupname | REDIS_REPLICATOR_CONSUMER_GROUP | rocketmq consumer group name |
rocketmq.data.topic | redisdata | rocketmq topic name |
deploy.model | single | single or cluster |
zookeeper.address | 127.0.0.1:2181 | run on cluster model |
redis.uri | redis://127.0.0.1:6379 | the uri of redis master which replicate from |
By default the configuration file replicator.conf
loaded from your classpath.
But you can specify your own configuration using Configure
like following:
Properties properties = new Properties() properties.setProperty("zookeeper.address", "127.0.0.1:2181"); properties.setProperty("redis.uri", "redis://127.0.0.1:6379"); properties.setProperty("rocketmq.nameserver.address", "localhost:9876"); properties.setProperty("rocketmq.producer.groupname", "REDIS_REPLICATOR_PRODUCER_GROUP"); properties.setProperty("rocketmq.consumer.groupname", "REDIS_REPLICATOR_CONSUMER_GROUP"); properties.setProperty("rocketmq.data.topic", "redisdata"); properties.setProperty("deploy.model", "single"); Configure configure = new Configure(properties);
commands | commands | commands | commands | commands | commands |
---|---|---|---|---|---|
PING | APPEND | SET | SETEX | MSET | DEL |
SADD | HMSET | HSET | LSET | EXPIRE | EXPIREAT |
GETSET | HSETNX | MSETNX | PSETEX | SETNX | SETRANGE |
HDEL | UNLINK | SREM | LPOP | LPUSH | LPUSHX |
LRem | RPOP | RPUSH | RPUSHX | ZREM | ZINTERSTORE |
INCR | DECR | INCRBY | PERSIST | SELECT | FLUSHALL |
FLUSHDB | HINCRBY | ZINCRBY | MOVE | SMOVE | BRPOPLPUSH |
PFCOUNT | PFMERGE | SDIFFSTORE | RENAMENX | PEXPIREAT | SINTERSTORE |
ZADD | BITFIELD | SUNIONSTORE | RESTORE | LINSERT | ZREMRANGEBYLEX |
GEOADD | PEXPIRE | ZUNIONSTORE | EVAL | SCRIPT | ZREMRANGEBYRANK |
PUBLISH | BITOP | SETBIT | SWAPDB | PFADD | ZREMRANGEBYSCORE |
RENAME | MULTI | EXEC | LTRIM | RPOPLPUSH | SORT |
EVALSHA | ZPOPMAX | ZPOPMIN | XACK | XADD | XCLAIM |
XDEL | XGROUP | XTRIM | XSETID |
client-output-buffer-limit slave 0 0 0
WARNNING: this setting may run out of memory of redis server in some cases.
<Logger name="com.moilioncircle" level="info"> <AppenderRef ref="YourAppender"/> </Logger>
// redis uri "redis://127.0.0.1:6379?verbose=yes"
// redis uri "redis://127.0.0.1:6379?authPassword=foobared"
repl-backlog-size repl-backlog-ttl repl-ping-slave-periods
repl-ping-slave-period
MUST less than readTimeout
, default readTimeout
is 30 seconds