[BAHIR-155] TTL to HSET and SETEX command (#66)
- Adds Possibility to include TTL to a HASH in HSET operation.
- Adds SETEX command.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index e468772..e6fb355 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -92,6 +92,13 @@
* {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET}
*/
private String additionalKey;
+
+ /**
+ * This additional time to live is optional for {@link RedisDataType#HASH} and required for {@link RedisCommand#SETEX}.
+ * It sets the TTL for a specific key.
+ */
+ private Integer additionalTTL;
+
private RedisMapper<IN> redisSinkMapper;
private RedisCommand redisCommand;
@@ -113,7 +120,9 @@
this.redisSinkMapper = redisSinkMapper;
RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
+
this.redisCommand = redisCommandDescription.getCommand();
+ this.additionalTTL = redisCommandDescription.getAdditionalTTL();
this.additionalKey = redisCommandDescription.getAdditionalKey();
}
@@ -121,7 +130,7 @@
* Called when new data arrives to the sink, and forwards it to Redis channel.
* Depending on the specified Redis data type (see {@link RedisDataType}),
* a different Redis command will be applied.
- * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.
+ * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, SETEX, PFADD, HSET, ZADD.
*
* @param input The incoming data
*/
@@ -129,7 +138,9 @@
public void invoke(IN input, Context context) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
+
Optional<String> optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
+ Optional<Integer> optAdditionalTTL = redisSinkMapper.getAdditionalTTL(input);
switch (redisCommand) {
case RPUSH:
@@ -144,6 +155,9 @@
case SET:
this.redisCommandsContainer.set(key, value);
break;
+ case SETEX:
+ this.redisCommandsContainer.setex(key, value, optAdditionalTTL.orElse(this.additionalTTL));
+ break;
case PFADD:
this.redisCommandsContainer.pfadd(key, value);
break;
@@ -157,7 +171,8 @@
this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), key);
break;
case HSET:
- this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value);
+ this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value,
+ optAdditionalTTL.orElse(this.additionalTTL));
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 8b18578..886b94f 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -57,9 +57,12 @@
}
@Override
- public void hset(final String key, final String hashField, final String value) {
+ public void hset(final String key, final String hashField, final String value, final Integer ttl) {
try {
jedisCluster.hset(key, hashField, value);
+ if (ttl != null) {
+ jedisCluster.expire(key, ttl);
+ }
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command HSET to hash {} of key {} error message {}",
@@ -135,6 +138,19 @@
}
@Override
+ public void setex(final String key, final String value, final Integer ttl) {
+ try {
+ jedisCluster.setex(key, ttl, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
public void pfadd(final String key, final String element) {
try {
jedisCluster.pfadd(key, element);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 5d7993c..486784b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -32,15 +32,17 @@
void open() throws Exception;
/**
- * Sets field in the hash stored at key to value.
+ * Sets field in the hash stored at key to value, with TTL, if needed.
+ * Setting expire time to key is optional.
* If key does not exist, a new key holding a hash is created.
* If field already exists in the hash, it is overwritten.
*
* @param key Hash name
* @param hashField Hash field
* @param value Hash value
+ * @param ttl Hash expire time
*/
- void hset(String key, String hashField, String value);
+ void hset(String key, String hashField, String value, Integer ttl);
/**
* Insert the specified value at the tail of the list stored at key.
@@ -89,6 +91,17 @@
void set(String key, String value);
/**
+ * Set key to hold the string value, with a time to live (TTL). If key already holds a value,
+ * it is overwritten, regardless of its type. Any previous time to live associated with the key is
+ * reset on successful SETEX operation.
+ *
+ * @param key the key name in which value to be set
+ * @param value the value
+ * @param ttl time to live (TTL)
+ */
+ void setex(String key, String value, Integer ttl);
+
+ /**
* Adds all the element arguments to the HyperLogLog data structure
* stored at the variable name specified as first argument.
*
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index b862ea4..4af84d1 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -86,11 +86,14 @@
}
@Override
- public void hset(final String key, final String hashField, final String value) {
+ public void hset(final String key, final String hashField, final String value, final Integer ttl) {
Jedis jedis = null;
try {
jedis = getInstance();
jedis.hset(key, hashField, value);
+ if (ttl != null) {
+ jedis.expire(key, ttl);
+ }
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
@@ -188,6 +191,23 @@
}
@Override
+ public void setex(final String key, final String value, final Integer ttl) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.setex(key, ttl, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
public void pfadd(final String key, final String element) {
Jedis jedis = null;
try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 019ad46..d465e83 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -46,6 +46,12 @@
SET(RedisDataType.STRING),
/**
+ * Set key to hold the string value, with a time to live (TTL). If key already holds a value,
+ * it is overwritten, regardless of its type.
+ */
+ SETEX(RedisDataType.STRING),
+
+ /**
* Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
*/
PFADD(RedisDataType.HYPER_LOG_LOG),
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
index 6ab329f..3284361 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -25,7 +25,9 @@
* you need to use first constructor {@link #RedisCommandDescription(RedisCommand, String)}.
* If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}
*
- * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
+ * If command is {@link RedisCommand#SETEX}, its required to use TTL. The proper constructor is {@link #RedisCommandDescription(RedisCommand, Integer)}.
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}, also not {@link RedisCommand#SETEX},
* you can use second constructor {@link #RedisCommandDescription(RedisCommand)}
*/
public class RedisCommandDescription implements Serializable {
@@ -38,7 +40,7 @@
* This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
* But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
- * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. Its possible to use TTL.
* {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH}
* <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
* {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET}
@@ -46,15 +48,28 @@
private String additionalKey;
/**
- * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
- * If different data type is specified, {@code additionalKey} is ignored.
- * @param redisCommand the redis command type {@link RedisCommand}
- * @param additionalKey additional key for Hash and Sorted set data type
+ * This additional key is optional for the group {@link RedisDataType#HASH}, required for {@link RedisCommand#SETEX}.
+ * For the other types and commands, its not used.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. Its possible to use TTL.
+ * {@link #getAdditionalTTL()} used as time to live (TTL) for {@link RedisDataType#HASH}
+ * <p>For {@link RedisCommand#SETEX}, we need key, value and time to live (TTL).
*/
- public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+ private Integer additionalTTL;
+
+ /**
+ * Default constructor for {@link RedisCommandDescription}.
+ * For {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} data types, {@code additionalKey} is required.
+ * For {@link RedisCommand#SETEX} command, {@code additionalTTL} is required.
+ * In both cases, if the respective variables are not provided, it throws an {@link IllegalArgumentException}
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalKey additional key for Hash data type
+ * @param additionalTTL additional TTL optional for Hash data type
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, String additionalKey, Integer additionalTTL) {
Objects.requireNonNull(redisCommand, "Redis command type can not be null");
this.redisCommand = redisCommand;
this.additionalKey = additionalKey;
+ this.additionalTTL = additionalTTL;
if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) {
@@ -62,6 +77,32 @@
throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
}
}
+
+ if (redisCommand.equals(RedisCommand.SETEX)) {
+ if (additionalTTL == null) {
+ throw new IllegalArgumentException("SETEX command should have time to live (TTL)");
+ }
+ }
+ }
+
+ /**
+ * Use this constructor when data type is {@link RedisDataType#HASH} (without TTL) or {@link RedisDataType#SORTED_SET}.
+ * If different data type is specified, {@code additionalKey} is ignored.
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalKey additional key for Hash and Sorted set data type
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+ this(redisCommand, additionalKey, null);
+ }
+
+ /**
+ * Use this constructor when using SETEX command {@link RedisDataType#STRING}.
+ * This command requires a TTL. Throws {@link IllegalArgumentException} if it is null.
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalTTL additional TTL required for SETEX command
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, Integer additionalTTL) {
+ this(redisCommand, null, additionalTTL);
}
/**
@@ -70,7 +111,7 @@
* @param redisCommand the redis data type {@link RedisCommand}
*/
public RedisCommandDescription(RedisCommand redisCommand) {
- this(redisCommand, null);
+ this(redisCommand, null, null);
}
/**
@@ -87,7 +128,12 @@
*
* @return the additional key
*/
- public String getAdditionalKey() {
- return additionalKey;
- }
+ public String getAdditionalKey() { return additionalKey; }
+
+ /**
+ * Returns the additional time to live (TTL) if data type is {@link RedisDataType#HASH}.
+ *
+ * @return the additional TTL
+ */
+ public Integer getAdditionalTTL() { return additionalTTL; }
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
index 96df75e..1481d7d 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -75,4 +75,15 @@
default Optional<String> getAdditionalKey(T data) {
return Optional.empty();
}
+
+ /**
+ * Extracts the additional time to live (TTL) for data as an {@link Optional<Integer>}.
+ * The default implementation returns an empty Optional.
+ *
+ * @param data
+ * @return Optional
+ */
+ default Optional<Integer> getAdditionalTTL(T data) {
+ return Optional.empty();
+ }
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 47544f7..ee1cc7f 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -29,6 +29,8 @@
import org.junit.Test;
import redis.clients.jedis.Jedis;
+import java.util.Optional;
+
import static org.junit.Assert.assertEquals;
public class RedisSinkITCase extends RedisITCaseBase {
@@ -36,8 +38,13 @@
private FlinkJedisPoolConfig jedisPoolConfig;
private static final Long NUM_ELEMENTS = 20L;
private static final Long ZERO = 0L;
+ private static final Long REDIS_NOT_ASSOCIATED_EXPIRE_FLAG = -1L;
+ private static final Long REDIS_KEY_NOT_EXISTS_FLAG = -2L;
+ private static final Long REDIS_TTL_IN_SECS = 1L;
private static final String REDIS_KEY = "TEST_KEY";
private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+ private static final String TEST_MESSAGE = "TEST_MESSAGE";
+ private static final Long TEST_MESSAGE_LENGTH = (long) TEST_MESSAGE.length();
StreamExecutionEnvironment env;
@@ -82,6 +89,21 @@
}
@Test
+ public void testRedisStringDataTypeWithTTL() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionString());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisCommandMapperWithTTL(RedisCommand.SETEX));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Set Data Type With TTL");
+
+ assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY));
+ assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY));
+
+ jedis.del(REDIS_KEY);
+ }
+
+ @Test
public void testRedisHyperLogLogDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
@@ -127,6 +149,37 @@
env.execute("Test Redis Hash Data Type");
assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG, jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+ jedis.del(REDIS_ADDITIONAL_KEY);
+ }
+
+ @Test
+ public void testRedisHashDataTypeWithTTL() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalTTLMapper(RedisCommand.HSET));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Hash Data Type");
+
+ assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+ jedis.del(REDIS_ADDITIONAL_KEY);
+ }
+
+ @Test
+ public void testRedisHashDataTypeWithTTLFromOpt() throws Exception {
+ DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+ RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalTTLMapperFromOpt(RedisCommand.HSET));
+
+ source.addSink(redisSink);
+ env.execute("Test Redis Hash Data Type 2");
+
+ assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
jedis.del(REDIS_ADDITIONAL_KEY);
}
@@ -156,6 +209,24 @@
}
}
+ private static class TestSourceFunctionString implements SourceFunction<Tuple2<String, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+ if (running) {
+ ctx.collect(new Tuple2<>(REDIS_KEY, TEST_MESSAGE));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
private static final long serialVersionUID = 1L;
@@ -216,6 +287,30 @@
}
}
+ public static class RedisCommandMapperWithTTL implements RedisMapper<Tuple2<String, String>> {
+
+ private RedisCommand redisCommand;
+
+ RedisCommandMapperWithTTL(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand, REDIS_TTL_IN_SECS.intValue());
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+ }
+
public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>> {
private RedisCommand redisCommand;
@@ -239,4 +334,57 @@
return data.f1;
}
}
+
+ public static class RedisAdditionalTTLMapper implements RedisMapper<Tuple2<String, String>> {
+
+ private RedisCommand redisCommand;
+
+ RedisAdditionalTTLMapper(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY, REDIS_TTL_IN_SECS.intValue());
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+ }
+
+ public static class RedisAdditionalTTLMapperFromOpt implements RedisMapper<Tuple2<String, String>> {
+
+ private RedisCommand redisCommand;
+
+ RedisAdditionalTTLMapperFromOpt(RedisCommand redisCommand){
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY, null);
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<String, String> data) {
+ return data.f0;
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<String, String> data) {
+ return data.f1;
+ }
+
+ @Override
+ public Optional<Integer> getAdditionalTTL(Tuple2<String, String> data) {
+ return Optional.of(REDIS_TTL_IN_SECS.intValue());
+ }
+ }
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
index 4af0c14..d268c95 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -31,6 +31,12 @@
redisCommandMapper.getCommandDescription();
}
+ @Test(expected=IllegalArgumentException.class)
+ public void shouldThrowExceptionIfAdditionalTTLIsNotGivenForStringDataTypeWithTTL(){
+ RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.SETEX);
+ redisCommandMapper.getCommandDescription();
+ }
+
@Test
public void shouldReturnNullForAdditionalDataType(){
RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);