[BAHIR-155] TTL to HSET and SETEX command (#66)

- Adds Possibility to include TTL to a HASH in HSET operation.
- Adds SETEX command.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index e468772..e6fb355 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -92,6 +92,13 @@
      * {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET}
      */
     private String additionalKey;
+
+    /**
+     * This additional time to live is optional for {@link RedisDataType#HASH} and required for {@link RedisCommand#SETEX}.
+     * It sets the TTL for a specific key.
+     */
+    private Integer additionalTTL;
+
     private RedisMapper<IN> redisSinkMapper;
     private RedisCommand redisCommand;
 
@@ -113,7 +120,9 @@
 
         this.redisSinkMapper = redisSinkMapper;
         RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
+
         this.redisCommand = redisCommandDescription.getCommand();
+        this.additionalTTL = redisCommandDescription.getAdditionalTTL();
         this.additionalKey = redisCommandDescription.getAdditionalKey();
     }
 
@@ -121,7 +130,7 @@
      * Called when new data arrives to the sink, and forwards it to Redis channel.
      * Depending on the specified Redis data type (see {@link RedisDataType}),
      * a different Redis command will be applied.
-     * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.
+     * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, SETEX, PFADD, HSET, ZADD.
      *
      * @param input The incoming data
      */
@@ -129,7 +138,9 @@
     public void invoke(IN input, Context context) throws Exception {
         String key = redisSinkMapper.getKeyFromData(input);
         String value = redisSinkMapper.getValueFromData(input);
+
         Optional<String> optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
+        Optional<Integer> optAdditionalTTL = redisSinkMapper.getAdditionalTTL(input);
 
         switch (redisCommand) {
             case RPUSH:
@@ -144,6 +155,9 @@
             case SET:
                 this.redisCommandsContainer.set(key, value);
                 break;
+            case SETEX:
+                this.redisCommandsContainer.setex(key, value, optAdditionalTTL.orElse(this.additionalTTL));
+                break;
             case PFADD:
                 this.redisCommandsContainer.pfadd(key, value);
                 break;
@@ -157,7 +171,8 @@
                 this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), key);
                 break;
             case HSET:
-                this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value);
+                this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value,
+                        optAdditionalTTL.orElse(this.additionalTTL));
                 break;
             default:
                 throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 8b18578..886b94f 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -57,9 +57,12 @@
     }
 
     @Override
-    public void hset(final String key, final String hashField, final String value) {
+    public void hset(final String key, final String hashField, final String value, final Integer ttl) {
         try {
             jedisCluster.hset(key, hashField, value);
+            if (ttl != null) {
+                jedisCluster.expire(key, ttl);
+            }
         } catch (Exception e) {
             if (LOG.isErrorEnabled()) {
                 LOG.error("Cannot send Redis message with command HSET to hash {} of key {} error message {}",
@@ -135,6 +138,19 @@
     }
 
     @Override
+    public void setex(final String key, final String value, final Integer ttl) {
+        try {
+            jedisCluster.setex(key, ttl, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
     public void pfadd(final String key, final String element) {
         try {
             jedisCluster.pfadd(key, element);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 5d7993c..486784b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -32,15 +32,17 @@
     void open() throws Exception;
 
     /**
-     * Sets field in the hash stored at key to value.
+     * Sets field in the hash stored at key to value, with TTL, if needed.
+     * Setting expire time to key is optional.
      * If key does not exist, a new key holding a hash is created.
      * If field already exists in the hash, it is overwritten.
      *
      * @param key Hash name
      * @param hashField Hash field
      * @param value Hash value
+     * @param ttl Hash expire time
      */
-    void hset(String key, String hashField, String value);
+    void hset(String key, String hashField, String value, Integer ttl);
 
     /**
      * Insert the specified value at the tail of the list stored at key.
@@ -89,6 +91,17 @@
     void set(String key, String value);
 
     /**
+     * Set key to hold the string value, with a time to live (TTL). If key already holds a value,
+     * it is overwritten, regardless of its type. Any previous time to live associated with the key is
+     * reset on successful SETEX operation.
+     *
+     * @param key the key name in which value to be set
+     * @param value the value
+     * @param ttl time to live (TTL)
+     */
+    void setex(String key, String value, Integer ttl);
+
+    /**
      * Adds all the element arguments to the HyperLogLog data structure
      * stored at the variable name specified as first argument.
      *
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index b862ea4..4af84d1 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -86,11 +86,14 @@
     }
 
     @Override
-    public void hset(final String key, final String hashField, final String value) {
+    public void hset(final String key, final String hashField, final String value, final Integer ttl) {
         Jedis jedis = null;
         try {
             jedis = getInstance();
             jedis.hset(key, hashField, value);
+            if (ttl != null) {
+                jedis.expire(key, ttl);
+            }
         } catch (Exception e) {
             if (LOG.isErrorEnabled()) {
                 LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
@@ -188,6 +191,23 @@
     }
 
     @Override
+    public void setex(final String key, final String value, final Integer ttl) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.setex(key, ttl, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
     public void pfadd(final String key, final String element) {
         Jedis jedis = null;
         try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 019ad46..d465e83 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -46,6 +46,12 @@
     SET(RedisDataType.STRING),
 
     /**
+     * Set key to hold the string value, with a time to live (TTL). If key already holds a value,
+     * it is overwritten, regardless of its type.
+     */
+    SETEX(RedisDataType.STRING),
+
+    /**
      * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
      */
     PFADD(RedisDataType.HYPER_LOG_LOG),
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
index 6ab329f..3284361 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -25,7 +25,9 @@
  * you need to use first constructor {@link #RedisCommandDescription(RedisCommand, String)}.
  * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}
  *
- * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
+ * If command is {@link RedisCommand#SETEX}, its required to use TTL. The proper constructor is {@link #RedisCommandDescription(RedisCommand, Integer)}.
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}, also not {@link RedisCommand#SETEX},
  * you can use second constructor {@link #RedisCommandDescription(RedisCommand)}
  */
 public class RedisCommandDescription implements Serializable {
@@ -38,7 +40,7 @@
      * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
      * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
      * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
-     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. Its possible to use TTL.
      * {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH}
      * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
      * {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET}
@@ -46,15 +48,28 @@
     private String additionalKey;
 
     /**
-     * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
-     * If different data type is specified, {@code additionalKey} is ignored.
-     * @param redisCommand the redis command type {@link RedisCommand}
-     * @param additionalKey additional key for Hash and Sorted set data type
+     * This additional key is optional for the group {@link RedisDataType#HASH}, required for {@link RedisCommand#SETEX}.
+     * For the other types and commands, its not used.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. Its possible to use TTL.
+     * {@link #getAdditionalTTL()} used as time to live (TTL) for {@link RedisDataType#HASH}
+     * <p>For {@link RedisCommand#SETEX}, we need key, value and time to live (TTL).
      */
-    public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+    private Integer additionalTTL;
+
+    /**
+     * Default constructor for {@link RedisCommandDescription}.
+     * For {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} data types, {@code additionalKey} is required.
+     * For {@link RedisCommand#SETEX} command, {@code additionalTTL} is required.
+     * In both cases, if the respective variables are not provided, it throws an {@link IllegalArgumentException}
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalKey additional key for Hash data type
+     * @param additionalTTL additional TTL optional for Hash data type
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, String additionalKey, Integer additionalTTL) {
         Objects.requireNonNull(redisCommand, "Redis command type can not be null");
         this.redisCommand = redisCommand;
         this.additionalKey = additionalKey;
+        this.additionalTTL = additionalTTL;
 
         if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
             redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) {
@@ -62,6 +77,32 @@
                 throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
             }
         }
+
+        if (redisCommand.equals(RedisCommand.SETEX)) {
+            if (additionalTTL == null) {
+                throw new IllegalArgumentException("SETEX command should have time to live (TTL)");
+            }
+        }
+    }
+
+    /**
+     * Use this constructor when data type is {@link RedisDataType#HASH} (without TTL) or {@link RedisDataType#SORTED_SET}.
+     * If different data type is specified, {@code additionalKey} is ignored.
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalKey additional key for Hash and Sorted set data type
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+        this(redisCommand, additionalKey, null);
+    }
+
+    /**
+     * Use this constructor when using SETEX command {@link RedisDataType#STRING}.
+     * This command requires a TTL. Throws {@link IllegalArgumentException} if it is null.
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalTTL additional TTL required for SETEX command
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, Integer additionalTTL) {
+        this(redisCommand, null, additionalTTL);
     }
 
     /**
@@ -70,7 +111,7 @@
      * @param redisCommand the redis data type {@link RedisCommand}
      */
     public RedisCommandDescription(RedisCommand redisCommand) {
-        this(redisCommand, null);
+        this(redisCommand, null, null);
     }
 
     /**
@@ -87,7 +128,12 @@
      *
      * @return the additional key
      */
-    public String getAdditionalKey() {
-        return additionalKey;
-    }
+    public String getAdditionalKey() { return additionalKey; }
+
+    /**
+     * Returns the additional time to live (TTL) if data type is {@link RedisDataType#HASH}.
+     *
+     * @return the additional TTL
+     */
+    public Integer getAdditionalTTL() { return additionalTTL; }
 }
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
index 96df75e..1481d7d 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -75,4 +75,15 @@
     default Optional<String> getAdditionalKey(T data) {
         return Optional.empty();
     }
+
+    /**
+     * Extracts the additional time to live (TTL) for data as an {@link Optional<Integer>}.
+     * The default implementation returns an empty Optional.
+     *
+     * @param data
+     * @return Optional
+     */
+    default Optional<Integer> getAdditionalTTL(T data) {
+        return Optional.empty();
+    }
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 47544f7..ee1cc7f 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -29,6 +29,8 @@
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
+import java.util.Optional;
+
 import static org.junit.Assert.assertEquals;
 
 public class RedisSinkITCase extends RedisITCaseBase {
@@ -36,8 +38,13 @@
     private FlinkJedisPoolConfig jedisPoolConfig;
     private static final Long NUM_ELEMENTS = 20L;
     private static final Long ZERO = 0L;
+    private static final Long REDIS_NOT_ASSOCIATED_EXPIRE_FLAG = -1L;
+    private static final Long REDIS_KEY_NOT_EXISTS_FLAG = -2L;
+    private static final Long REDIS_TTL_IN_SECS = 1L;
     private static final String REDIS_KEY = "TEST_KEY";
     private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+    private static final String TEST_MESSAGE = "TEST_MESSAGE";
+    private static final Long TEST_MESSAGE_LENGTH = (long) TEST_MESSAGE.length();
 
     StreamExecutionEnvironment env;
 
@@ -82,6 +89,21 @@
     }
 
     @Test
+    public void testRedisStringDataTypeWithTTL() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionString());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+                new RedisCommandMapperWithTTL(RedisCommand.SETEX));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Set Data Type With TTL");
+
+        assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY));
+        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY));
+
+        jedis.del(REDIS_KEY);
+    }
+
+    @Test
     public void testRedisHyperLogLogDataType() throws Exception {
         DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
         RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
@@ -127,6 +149,37 @@
         env.execute("Test Redis Hash Data Type");
 
         assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG, jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+        jedis.del(REDIS_ADDITIONAL_KEY);
+    }
+
+    @Test
+    public void testRedisHashDataTypeWithTTL() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+                new RedisAdditionalTTLMapper(RedisCommand.HSET));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Hash Data Type");
+
+        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+
+        jedis.del(REDIS_ADDITIONAL_KEY);
+    }
+
+    @Test
+    public void testRedisHashDataTypeWithTTLFromOpt() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+                new RedisAdditionalTTLMapperFromOpt(RedisCommand.HSET));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Hash Data Type 2");
+
+        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -156,6 +209,24 @@
         }
     }
 
+    private static class TestSourceFunctionString implements SourceFunction<Tuple2<String, String>> {
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+            if (running) {
+                ctx.collect(new Tuple2<>(REDIS_KEY, TEST_MESSAGE));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
     private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
         private static final long serialVersionUID = 1L;
 
@@ -216,6 +287,30 @@
         }
     }
 
+    public static class RedisCommandMapperWithTTL implements RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        RedisCommandMapperWithTTL(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand, REDIS_TTL_IN_SECS.intValue());
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+
     public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>> {
 
         private RedisCommand redisCommand;
@@ -239,4 +334,57 @@
             return data.f1;
         }
     }
+
+    public static class RedisAdditionalTTLMapper implements RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        RedisAdditionalTTLMapper(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY, REDIS_TTL_IN_SECS.intValue());
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+
+    public static class RedisAdditionalTTLMapperFromOpt implements RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        RedisAdditionalTTLMapperFromOpt(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY, null);
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+
+        @Override
+        public Optional<Integer> getAdditionalTTL(Tuple2<String, String> data) {
+            return Optional.of(REDIS_TTL_IN_SECS.intValue());
+        }
+    }
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
index 4af0c14..d268c95 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -31,6 +31,12 @@
         redisCommandMapper.getCommandDescription();
     }
 
+    @Test(expected=IllegalArgumentException.class)
+    public void shouldThrowExceptionIfAdditionalTTLIsNotGivenForStringDataTypeWithTTL(){
+        RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.SETEX);
+        redisCommandMapper.getCommandDescription();
+    }
+
     @Test
     public void shouldReturnNullForAdditionalDataType(){
         RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);