[BAHIR-247] Provide connection validation/idle testing for Flink-Redis Connector (#121)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index c56ac14..0840deb 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -48,12 +48,15 @@
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the pool
      * @param password the password of redis cluster
+     * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+     * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+     * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle,
-                                    String password) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+                                    int maxTotal, int maxIdle, int minIdle, String password,
+                                    boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(nodes, "Node information should be presented");
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
@@ -96,6 +99,9 @@
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+        private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+        private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
         private String password;
 
         /**
@@ -180,24 +186,67 @@
         }
 
         /**
+         * Sets value for the {@code testOnBorrow} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+         * @return Builder itself
+         */
+        public Builder setTestOnBorrow(boolean testOnBorrow) {
+            this.testOnBorrow = testOnBorrow;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testOnReturn} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+         * @return Builder itself
+         */
+        public Builder setTestOnReturn(boolean testOnReturn) {
+            this.testOnReturn = testOnReturn;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testWhileIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * Setting this to true will also set default idle-testing parameters provided in Jedis
+         * @see redis.clients.jedis.JedisPoolConfig
+         *
+         * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+         * @return Builder itself
+         */
+        public Builder setTestWhileIdle(boolean testWhileIdle) {
+            this.testWhileIdle = testWhileIdle;
+            return this;
+        }
+
+        /**
          * Builds JedisClusterConfig.
          *
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password);
+            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
     @Override
     public String toString() {
         return "FlinkJedisClusterConfig{" +
-            "nodes=" + nodes +
-            ", timeout=" + connectionTimeout +
-            ", maxRedirections=" + maxRedirections +
-            ", maxTotal=" + maxTotal +
-            ", maxIdle=" + maxIdle +
-            ", minIdle=" + minIdle +
-            '}';
+          "nodes=" + nodes +
+          ", maxRedirections=" + maxRedirections +
+          ", maxTotal=" + maxTotal +
+          ", maxIdle=" + maxIdle +
+          ", minIdle=" + minIdle +
+          ", connectionTimeout=" + connectionTimeout +
+          ", password=" + password +
+          ", testOnBorrow=" + testOnBorrow +
+          ", testOnReturn=" + testOnReturn +
+          ", testWhileIdle=" + testWhileIdle +
+          '}';
     }
 }
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 84b1bf2..a41b0e0 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -33,7 +33,12 @@
     protected final int connectionTimeout;
     protected final String password;
 
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password) {
+    protected final boolean testOnBorrow;
+    protected final boolean testOnReturn;
+    protected final boolean testWhileIdle;
+
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+
         Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
         Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
         Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -43,6 +48,9 @@
         this.maxTotal = maxTotal;
         this.maxIdle = maxIdle;
         this.minIdle = minIdle;
+        this.testOnBorrow = testOnBorrow;
+        this.testOnReturn = testOnReturn;
+        this.testWhileIdle = testWhileIdle;
         this.password = password;
     }
 
@@ -99,4 +107,40 @@
     public String getPassword() {
         return password;
     }
+
+    /**
+     * Get the value for the {@code testOnBorrow} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code testOnBorrow} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getTestOnBorrow()
+     */
+    public boolean getTestOnBorrow() {
+        return testOnBorrow;
+    }
+
+    /**
+     * Get the value for the {@code testOnReturn} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code testOnReturn} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getTestOnReturn()
+     */
+    public boolean getTestOnReturn() {
+        return testOnReturn;
+    }
+
+    /**
+     * Get the value for the {@code testWhileIdle} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code testWhileIdle} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getTestWhileIdle()
+     */
+    public boolean getTestWhileIdle() {
+        return testWhileIdle;
+    }
 }
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 3f8fc2f..5012da1 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -45,11 +45,16 @@
      * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
      * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
      * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+     * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+     * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+     * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code host} is {@code null}
      */
     private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
-                                int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+                                 int maxTotal, int maxIdle, int minIdle,
+                                 boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+
         Objects.requireNonNull(host, "Host information should be presented");
         this.host = host;
         this.port = port;
@@ -96,6 +101,9 @@
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+        private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+        private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
 
         /**
          * Sets value for the {@code maxTotal} configuration attribute
@@ -188,6 +196,44 @@
             return this;
         }
 
+        /**
+         * Sets value for the {@code testOnBorrow} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+         * @return Builder itself
+         */
+        public Builder setTestOnBorrow(boolean testOnBorrow) {
+            this.testOnBorrow = testOnBorrow;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testOnReturn} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+         * @return Builder itself
+         */
+        public Builder setTestOnReturn(boolean testOnReturn) {
+            this.testOnReturn = testOnReturn;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testWhileIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * Setting this to true will also set default idle-testing parameters provided in Jedis
+         * @see redis.clients.jedis.JedisPoolConfig
+         *
+         * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+         * @return Builder itself
+         */
+        public Builder setTestWhileIdle(boolean testWhileIdle) {
+            this.testWhileIdle = testWhileIdle;
+            return this;
+        }
 
         /**
          * Builds JedisPoolConfig.
@@ -195,20 +241,24 @@
          * @return JedisPoolConfig
          */
         public FlinkJedisPoolConfig build() {
-            return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle);
+            return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
     @Override
     public String toString() {
         return "FlinkJedisPoolConfig{" +
-            "host='" + host + '\'' +
-            ", port=" + port +
-            ", timeout=" + connectionTimeout +
-            ", database=" + database +
-            ", maxTotal=" + maxTotal +
-            ", maxIdle=" + maxIdle +
-            ", minIdle=" + minIdle +
-            '}';
+          "host=" + host +
+          ", port=" + port +
+          ", database=" + database +
+          ", maxTotal=" + maxTotal +
+          ", maxIdle=" + maxIdle +
+          ", minIdle=" + minIdle +
+          ", connectionTimeout=" +
+          ", password=" + password +
+          ", testOnBorrow=" + testOnBorrow +
+          ", testOnReturn=" + testOnReturn +
+          ", testWhileIdle=" + testWhileIdle +
+          '}';
     }
 }
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 928f5e8..340eb4e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -52,15 +52,19 @@
      * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the pool
-     *
+     * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+     * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+     * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
      * @throws IllegalArgumentException if {@code sentinels} are empty
      */
     private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
-                                    int connectionTimeout, int soTimeout,
-                                    String password, int database,
-                                    int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+                                     int connectionTimeout, int soTimeout,
+                                     String password, int database,
+                                     int maxTotal, int maxIdle, int minIdle,
+                                     boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+
         Objects.requireNonNull(masterName, "Master name should be presented");
         Objects.requireNonNull(sentinels, "Sentinels information should be presented");
         Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
@@ -120,6 +124,9 @@
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+        private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+        private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
 
         /**
          * Sets master name of the replica set.
@@ -224,26 +231,70 @@
         }
 
         /**
+         * Sets value for the {@code testOnBorrow} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+         * @return Builder itself
+         */
+        public Builder setTestOnBorrow(boolean testOnBorrow) {
+            this.testOnBorrow = testOnBorrow;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testOnReturn} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+         * @return Builder itself
+         */
+        public Builder setTestOnReturn(boolean testOnReturn) {
+            this.testOnReturn = testOnReturn;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testWhileIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * Setting this to true will also set default idle-testing parameters provided in Jedis
+         * @see redis.clients.jedis.JedisPoolConfig
+         *
+         * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+         * @return Builder itself
+         */
+        public Builder setTestWhileIdle(boolean testWhileIdle) {
+            this.testWhileIdle = testWhileIdle;
+            return this;
+        }
+
+        /**
          * Builds JedisSentinelConfig.
          *
          * @return JedisSentinelConfig
          */
         public FlinkJedisSentinelConfig build(){
             return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
-                password, database, maxTotal, maxIdle, minIdle);
+                password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
     @Override
     public String toString() {
         return "FlinkJedisSentinelConfig{" +
-            "masterName='" + masterName + '\'' +
-            ", connectionTimeout=" + connectionTimeout +
-            ", soTimeout=" + soTimeout +
-            ", database=" + database +
-            ", maxTotal=" + maxTotal +
-            ", maxIdle=" + maxIdle +
-            ", minIdle=" + minIdle +
-            '}';
+          "masterName=" + masterName +
+          ", sentinels=" + sentinels +
+          ", soTimeout=" + soTimeout +
+          ", database=" + database +
+          ", maxTotal=" + maxTotal +
+          ", maxIdle=" + maxIdle +
+          ", minIdle=" + minIdle +
+          ", connectionTimeout=" + connectionTimeout +
+          ", password=" + password +
+          ", testOnBorrow=" + testOnBorrow +
+          ", testOnReturn=" + testOnReturn +
+          ", testWhileIdle=" + testWhileIdle +
+          '}';
     }
 }
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index bdb9fed..b06a6e9 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -23,6 +23,7 @@
 import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
 import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
 import redis.clients.jedis.JedisSentinelPool;
 
 import java.util.Objects;
@@ -65,8 +66,8 @@
         GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
 
         JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
-            jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
-            jedisPoolConfig.getDatabase());
+          jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+          jedisPoolConfig.getDatabase());
         return new RedisContainer(jedisPool);
     }
 
@@ -83,11 +84,11 @@
         GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
 
         JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
-                jedisClusterConfig.getConnectionTimeout(),
-                jedisClusterConfig.getConnectionTimeout(),
-                jedisClusterConfig.getMaxRedirections(),
-                jedisClusterConfig.getPassword(),
-                genericObjectPoolConfig);
+          jedisClusterConfig.getConnectionTimeout(),
+          jedisClusterConfig.getConnectionTimeout(),
+          jedisClusterConfig.getMaxRedirections(),
+          jedisClusterConfig.getPassword(),
+          genericObjectPoolConfig);
         return new RedisClusterContainer(jedisCluster);
     }
 
@@ -104,17 +105,20 @@
         GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
 
         JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
-            jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
-            jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
-            jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
+          jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+          jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
+          jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
         return new RedisContainer(jedisSentinelPool);
     }
 
-    private static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
-        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+    public static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+        GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new GenericObjectPoolConfig();
         genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
         genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
         genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
+        genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
+        genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
+
         return genericObjectPoolConfig;
     }
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 6f519ed..80189df 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -23,28 +23,28 @@
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
-        new TestConfig(-1, 0, 0, 0);
+        new TestConfig(-1, 0, 0, 0, false, false, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
-        new TestConfig(1, -1, 0, 0);
+        new TestConfig(1, -1, 0, 0, false, false, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
-        new TestConfig(0, 0, -1, 0);
+        new TestConfig(0, 0, -1, 0, false, false, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
-        new TestConfig(0, 0, 0, -1);
+        new TestConfig(0, 0, 0, -1, false, false, false);
     }
 
     private class TestConfig extends FlinkJedisConfigBase {
-
-        protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
-            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
+        protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle,
+                             boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
new file mode 100644
index 0000000..eac5ca0
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.Test;
+import redis.clients.jedis.JedisPoolConfig;
+
+public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
+
+    @Test
+    public void testNotTestWhileIdle() {
+        FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build();
+        GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        assertFalse(genericObjectPoolConfig.getTestWhileIdle());
+        assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+    }
+
+    @Test
+    public void testTestWhileIdle() {
+        FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build();
+        GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        assertTrue(genericObjectPoolConfig.getTestWhileIdle());
+        assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+
+        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+        assertEquals(genericObjectPoolConfig.getMinEvictableIdleTimeMillis(), jedisPoolConfig.getMinEvictableIdleTimeMillis());
+        assertEquals(genericObjectPoolConfig.getTimeBetweenEvictionRunsMillis(), jedisPoolConfig.getTimeBetweenEvictionRunsMillis());
+        assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), jedisPoolConfig.getNumTestsPerEvictionRun());
+    }
+
+    private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig genericObjectPoolConfig) {
+        assertEquals(genericObjectPoolConfig.getMaxIdle(), flinkJedisPoolConfig.getMaxIdle());
+        assertEquals(genericObjectPoolConfig.getMinIdle(), flinkJedisPoolConfig.getMinIdle());
+        assertEquals(genericObjectPoolConfig.getMaxTotal(), flinkJedisPoolConfig.getMaxTotal());
+        assertEquals(genericObjectPoolConfig.getTestWhileIdle(), flinkJedisPoolConfig.getTestWhileIdle());
+        assertEquals(genericObjectPoolConfig.getTestOnBorrow(), flinkJedisPoolConfig.getTestOnBorrow());
+        assertEquals(genericObjectPoolConfig.getTestOnReturn(), flinkJedisPoolConfig.getTestOnReturn());
+    }
+}