[BAHIR-247] Provide connection validation/idle testing for Flink-Redis Connector (#121)
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index c56ac14..0840deb 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -48,12 +48,15 @@
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the pool
* @param password the password of redis cluster
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if parameter {@code nodes} is {@code null}
*/
private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
- int maxTotal, int maxIdle, int minIdle,
- String password) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+ int maxTotal, int maxIdle, int minIdle, String password,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
Objects.requireNonNull(nodes, "Node information should be presented");
Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
@@ -96,6 +99,9 @@
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
private String password;
/**
@@ -180,24 +186,67 @@
}
/**
+ * Sets value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * Setting this to true will also set default idle-testing parameters provided in Jedis
+ * @see redis.clients.jedis.JedisPoolConfig
+ *
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
+
+ /**
* Builds JedisClusterConfig.
*
* @return JedisClusterConfig
*/
public FlinkJedisClusterConfig build() {
- return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password);
+ return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@Override
public String toString() {
return "FlinkJedisClusterConfig{" +
- "nodes=" + nodes +
- ", timeout=" + connectionTimeout +
- ", maxRedirections=" + maxRedirections +
- ", maxTotal=" + maxTotal +
- ", maxIdle=" + maxIdle +
- ", minIdle=" + minIdle +
- '}';
+ "nodes=" + nodes +
+ ", maxRedirections=" + maxRedirections +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ ", connectionTimeout=" + connectionTimeout +
+ ", password=" + password +
+ ", testOnBorrow=" + testOnBorrow +
+ ", testOnReturn=" + testOnReturn +
+ ", testWhileIdle=" + testWhileIdle +
+ '}';
}
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 84b1bf2..a41b0e0 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -33,7 +33,12 @@
protected final int connectionTimeout;
protected final String password;
- protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password) {
+ protected final boolean testOnBorrow;
+ protected final boolean testOnReturn;
+ protected final boolean testWhileIdle;
+
+ protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+
Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -43,6 +48,9 @@
this.maxTotal = maxTotal;
this.maxIdle = maxIdle;
this.minIdle = minIdle;
+ this.testOnBorrow = testOnBorrow;
+ this.testOnReturn = testOnReturn;
+ this.testWhileIdle = testWhileIdle;
this.password = password;
}
@@ -99,4 +107,40 @@
public String getPassword() {
return password;
}
+
+ /**
+ * Get the value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code testOnBorrow} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getTestOnBorrow()
+ */
+ public boolean getTestOnBorrow() {
+ return testOnBorrow;
+ }
+
+ /**
+ * Get the value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code testOnReturn} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getTestOnReturn()
+ */
+ public boolean getTestOnReturn() {
+ return testOnReturn;
+ }
+
+ /**
+ * Get the value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code testWhileIdle} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getTestWhileIdle()
+ */
+ public boolean getTestWhileIdle() {
+ return testWhileIdle;
+ }
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 3f8fc2f..5012da1 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -45,11 +45,16 @@
* @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
* @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
* @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if parameter {@code host} is {@code null}
*/
private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
- int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+ int maxTotal, int maxIdle, int minIdle,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+
Objects.requireNonNull(host, "Host information should be presented");
this.host = host;
this.port = port;
@@ -96,6 +101,9 @@
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
/**
* Sets value for the {@code maxTotal} configuration attribute
@@ -188,6 +196,44 @@
return this;
}
+ /**
+ * Sets value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * Setting this to true will also set default idle-testing parameters provided in Jedis
+ * @see redis.clients.jedis.JedisPoolConfig
+ *
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
/**
* Builds JedisPoolConfig.
@@ -195,20 +241,24 @@
* @return JedisPoolConfig
*/
public FlinkJedisPoolConfig build() {
- return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle);
+ return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@Override
public String toString() {
return "FlinkJedisPoolConfig{" +
- "host='" + host + '\'' +
- ", port=" + port +
- ", timeout=" + connectionTimeout +
- ", database=" + database +
- ", maxTotal=" + maxTotal +
- ", maxIdle=" + maxIdle +
- ", minIdle=" + minIdle +
- '}';
+ "host=" + host +
+ ", port=" + port +
+ ", database=" + database +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ ", connectionTimeout=" +
+ ", password=" + password +
+ ", testOnBorrow=" + testOnBorrow +
+ ", testOnReturn=" + testOnReturn +
+ ", testWhileIdle=" + testWhileIdle +
+ '}';
}
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 928f5e8..340eb4e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -52,15 +52,19 @@
* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the pool
- *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
* @throws IllegalArgumentException if {@code sentinels} are empty
*/
private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
- int connectionTimeout, int soTimeout,
- String password, int database,
- int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+ int connectionTimeout, int soTimeout,
+ String password, int database,
+ int maxTotal, int maxIdle, int minIdle,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+
Objects.requireNonNull(masterName, "Master name should be presented");
Objects.requireNonNull(sentinels, "Sentinels information should be presented");
Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
@@ -120,6 +124,9 @@
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
/**
* Sets master name of the replica set.
@@ -224,26 +231,70 @@
}
/**
+ * Sets value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * Setting this to true will also set default idle-testing parameters provided in Jedis
+ * @see redis.clients.jedis.JedisPoolConfig
+ *
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
+
+ /**
* Builds JedisSentinelConfig.
*
* @return JedisSentinelConfig
*/
public FlinkJedisSentinelConfig build(){
return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
- password, database, maxTotal, maxIdle, minIdle);
+ password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@Override
public String toString() {
return "FlinkJedisSentinelConfig{" +
- "masterName='" + masterName + '\'' +
- ", connectionTimeout=" + connectionTimeout +
- ", soTimeout=" + soTimeout +
- ", database=" + database +
- ", maxTotal=" + maxTotal +
- ", maxIdle=" + maxIdle +
- ", minIdle=" + minIdle +
- '}';
+ "masterName=" + masterName +
+ ", sentinels=" + sentinels +
+ ", soTimeout=" + soTimeout +
+ ", database=" + database +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ ", connectionTimeout=" + connectionTimeout +
+ ", password=" + password +
+ ", testOnBorrow=" + testOnBorrow +
+ ", testOnReturn=" + testOnReturn +
+ ", testWhileIdle=" + testWhileIdle +
+ '}';
}
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index bdb9fed..b06a6e9 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -23,6 +23,7 @@
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisSentinelPool;
import java.util.Objects;
@@ -65,8 +66,8 @@
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
- jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
- jedisPoolConfig.getDatabase());
+ jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase());
return new RedisContainer(jedisPool);
}
@@ -83,11 +84,11 @@
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
- jedisClusterConfig.getConnectionTimeout(),
- jedisClusterConfig.getConnectionTimeout(),
- jedisClusterConfig.getMaxRedirections(),
- jedisClusterConfig.getPassword(),
- genericObjectPoolConfig);
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ jedisClusterConfig.getPassword(),
+ genericObjectPoolConfig);
return new RedisClusterContainer(jedisCluster);
}
@@ -104,17 +105,20 @@
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
- jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
- jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
- jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
+ jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+ jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
+ jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
return new RedisContainer(jedisSentinelPool);
}
- private static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
- GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+ public static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+ GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
+ genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
+ genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
+
return genericObjectPoolConfig;
}
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 6f519ed..80189df 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -23,28 +23,28 @@
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
- new TestConfig(-1, 0, 0, 0);
+ new TestConfig(-1, 0, 0, 0, false, false, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
- new TestConfig(1, -1, 0, 0);
+ new TestConfig(1, -1, 0, 0, false, false, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
- new TestConfig(0, 0, -1, 0);
+ new TestConfig(0, 0, -1, 0, false, false, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
- new TestConfig(0, 0, 0, -1);
+ new TestConfig(0, 0, 0, -1, false, false, false);
}
private class TestConfig extends FlinkJedisConfigBase {
-
- protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
+ protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", testOnBorrow, testOnReturn, testWhileIdle);
}
}
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
new file mode 100644
index 0000000..eac5ca0
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.Test;
+import redis.clients.jedis.JedisPoolConfig;
+
+public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
+
+ @Test
+ public void testNotTestWhileIdle() {
+ FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build();
+ GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+ assertFalse(genericObjectPoolConfig.getTestWhileIdle());
+ assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+ }
+
+ @Test
+ public void testTestWhileIdle() {
+ FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build();
+ GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+ assertTrue(genericObjectPoolConfig.getTestWhileIdle());
+ assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ assertEquals(genericObjectPoolConfig.getMinEvictableIdleTimeMillis(), jedisPoolConfig.getMinEvictableIdleTimeMillis());
+ assertEquals(genericObjectPoolConfig.getTimeBetweenEvictionRunsMillis(), jedisPoolConfig.getTimeBetweenEvictionRunsMillis());
+ assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), jedisPoolConfig.getNumTestsPerEvictionRun());
+ }
+
+ private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig genericObjectPoolConfig) {
+ assertEquals(genericObjectPoolConfig.getMaxIdle(), flinkJedisPoolConfig.getMaxIdle());
+ assertEquals(genericObjectPoolConfig.getMinIdle(), flinkJedisPoolConfig.getMinIdle());
+ assertEquals(genericObjectPoolConfig.getMaxTotal(), flinkJedisPoolConfig.getMaxTotal());
+ assertEquals(genericObjectPoolConfig.getTestWhileIdle(), flinkJedisPoolConfig.getTestWhileIdle());
+ assertEquals(genericObjectPoolConfig.getTestOnBorrow(), flinkJedisPoolConfig.getTestOnBorrow());
+ assertEquals(genericObjectPoolConfig.getTestOnReturn(), flinkJedisPoolConfig.getTestOnReturn());
+ }
+}