blob: e6fb35509970d021eca4d293d0669200e65fd475 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.redis;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
/**
* A sink that delivers data to a Redis channel using the Jedis client.
* <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
* <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
* the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
* you want to connect to a single Redis server.
* <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
* using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
* <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
* a Redis Cluster.
*
* <p>Example:
*
* <pre>
*{@code
*public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
*
* private RedisCommand redisCommand;
*
* public RedisExampleMapper(RedisCommand redisCommand){
* this.redisCommand = redisCommand;
* }
* public RedisCommandDescription getCommandDescription() {
* return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
* }
* public String getKeyFromData(Tuple2<String, String> data) {
* return data.f0;
* }
* public String getValueFromData(Tuple2<String, String> data) {
* return data.f1;
* }
*}
*JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
* .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
*new RedisSink<String>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH));
*}</pre>
*
* @param <IN> Type of the elements emitted by this sink
*/
public class RedisSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
/**
* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
* But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
* <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
* {@code additionalKey} used as hash name for {@link RedisDataType#HASH}
* <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
* {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET}
*/
private String additionalKey;
/**
* This additional time to live is optional for {@link RedisDataType#HASH} and required for {@link RedisCommand#SETEX}.
* It sets the TTL for a specific key.
*/
private Integer additionalTTL;
private RedisMapper<IN> redisSinkMapper;
private RedisCommand redisCommand;
private FlinkJedisConfigBase flinkJedisConfigBase;
private RedisCommandsContainer redisCommandsContainer;
/**
* Creates a new {@link RedisSink} that connects to the Redis server.
*
* @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase}
* @param redisSinkMapper This is used to generate Redis command and key value from incoming elements.
*/
public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
this.flinkJedisConfigBase = flinkJedisConfigBase;
this.redisSinkMapper = redisSinkMapper;
RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
this.redisCommand = redisCommandDescription.getCommand();
this.additionalTTL = redisCommandDescription.getAdditionalTTL();
this.additionalKey = redisCommandDescription.getAdditionalKey();
}
/**
* Called when new data arrives to the sink, and forwards it to Redis channel.
* Depending on the specified Redis data type (see {@link RedisDataType}),
* a different Redis command will be applied.
* Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, SETEX, PFADD, HSET, ZADD.
*
* @param input The incoming data
*/
@Override
public void invoke(IN input, Context context) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
Optional<String> optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
Optional<Integer> optAdditionalTTL = redisSinkMapper.getAdditionalTTL(input);
switch (redisCommand) {
case RPUSH:
this.redisCommandsContainer.rpush(key, value);
break;
case LPUSH:
this.redisCommandsContainer.lpush(key, value);
break;
case SADD:
this.redisCommandsContainer.sadd(key, value);
break;
case SET:
this.redisCommandsContainer.set(key, value);
break;
case SETEX:
this.redisCommandsContainer.setex(key, value, optAdditionalTTL.orElse(this.additionalTTL));
break;
case PFADD:
this.redisCommandsContainer.pfadd(key, value);
break;
case PUBLISH:
this.redisCommandsContainer.publish(key, value);
break;
case ZADD:
this.redisCommandsContainer.zadd(optAdditionalKey.orElse(this.additionalKey), value, key);
break;
case ZREM:
this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), key);
break;
case HSET:
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value,
optAdditionalTTL.orElse(this.additionalTTL));
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
}
/**
* Initializes the connection to Redis by either cluster or sentinels or single server.
*
* @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
*/
@Override
public void open(Configuration parameters) throws Exception {
try {
this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
this.redisCommandsContainer.open();
} catch (Exception e) {
LOG.error("Redis has not been properly initialized: ", e);
throw e;
}
}
/**
* Closes commands container.
* @throws IOException if command container is unable to close.
*/
@Override
public void close() throws IOException {
if (redisCommandsContainer != null) {
redisCommandsContainer.close();
}
}
}