package org.apache.flink.streaming.connectors.redis;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import static org.junit.Assert.assertEquals;
public class RedisSinkITCase extends RedisITCaseBase {
private FlinkJedisPoolConfig jedisPoolConfig;
private static final Long NUM_ELEMENTS = 20L;
private static final Long ZERO = 0L;
private static final String REDIS_KEY = "TEST_KEY";
private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
StreamExecutionEnvironment env;
private Jedis jedis;
public void setUp(){
jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
jedis = new Jedis(REDIS_HOST, REDIS_PORT);
env = StreamExecutionEnvironment.getExecutionEnvironment();
public void testRedisListDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
new RedisCommandMapper(RedisCommand.LPUSH));
env.execute("Test Redis List Data Type");
assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
public void testRedisSetDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
new RedisCommandMapper(RedisCommand.SADD));
env.execute("Test Redis Set Data Type");
assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
public void testRedisHyperLogLogDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
new RedisCommandMapper(RedisCommand.PFADD));
env.execute("Test Redis Hyper Log Log Data Type");
assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
public void testRedisSortedSetDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
RedisSink<Tuple2<String, String>> redisZaddSink = new RedisSink<>(jedisPoolConfig,
new RedisAdditionalDataMapper(RedisCommand.ZADD));
env.execute("Test ZADD");
assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig,
new RedisAdditionalDataMapper(RedisCommand.ZREM));
env.execute("Test ZREM");
assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY));
public void testRedisHashDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
new RedisAdditionalDataMapper(RedisCommand.HSET));
env.execute("Test Redis Hash Data Type");
assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
public void tearDown(){
if(jedis != null){
private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
private static final long serialVersionUID = 1L;
private volatile boolean running = true;
public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
for (int i = 0; i < NUM_ELEMENTS && running; i++) {
ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i));
public void cancel() {
running = false;
private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
private static final long serialVersionUID = 1L;
private volatile boolean running = true;
public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
for (int i = 0; i < NUM_ELEMENTS && running; i++) {
ctx.collect(new Tuple2<>("" + i, "message #" + i));
public void cancel() {
running = false;
private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String, String>> {
private static final long serialVersionUID = 1L;
private volatile boolean running = true;
public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
for (int i = 0; i < NUM_ELEMENTS && running; i++) {
ctx.collect(new Tuple2<>( "message #" + i, "" + i));
public void cancel() {
running = false;
public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>> {
private RedisCommand redisCommand;
public RedisCommandMapper(RedisCommand redisCommand){
this.redisCommand = redisCommand;
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(redisCommand);
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>> {
private RedisCommand redisCommand;
RedisAdditionalDataMapper(RedisCommand redisCommand){
this.redisCommand = redisCommand;
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;