[BAHIR-205] Support password configuration for redis cluster
Upgrade jedis to 2.9.0 and add password for redis cluster sink.
Closes #57
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 3ed9fe8..c9b7a63 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -34,7 +34,7 @@
<packaging>jar</packaging>
<properties>
- <jedis.version>2.8.0</jedis.version>
+ <jedis.version>2.9.0</jedis.version>
</properties>
<dependencies>
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index 119ade3..f05dfd8 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -47,11 +47,13 @@
* @param maxTotal the maximum number of objects that can be allocated by the pool
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the pool
+ * @param password the password of redis cluster
* @throws NullPointerException if parameter {@code nodes} is {@code null}
*/
private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
- int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ int maxTotal, int maxIdle, int minIdle,
+ String password) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
Objects.requireNonNull(nodes, "Node information should be presented");
Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
@@ -94,6 +96,7 @@
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private String password;
/**
* Sets list of node.
@@ -165,12 +168,24 @@
}
/**
+ * Sets value for the {@code password} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param password the password for accessing redis cluster
+ * @return Builder itself
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
* Builds JedisClusterConfig.
*
* @return JedisClusterConfig
*/
public FlinkJedisClusterConfig build() {
- return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle);
+ return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password);
}
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 0d821ed..84b1bf2 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -31,8 +31,9 @@
protected final int maxIdle;
protected final int minIdle;
protected final int connectionTimeout;
+ protected final String password;
- protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
+ protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password) {
Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -42,6 +43,7 @@
this.maxTotal = maxTotal;
this.maxIdle = maxIdle;
this.minIdle = minIdle;
+ this.password = password;
}
/**
@@ -88,4 +90,13 @@
public int getMinIdle() {
return minIdle;
}
+
+ /**
+ * Returns password.
+ *
+ * @return password
+ */
+ public String getPassword() {
+ return password;
+ }
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index d4c30ff..7c37ecb 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -31,7 +31,6 @@
private final String host;
private final int port;
private final int database;
- private final String password;
/**
@@ -50,12 +49,11 @@
*/
private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
Objects.requireNonNull(host, "Host information should be presented");
this.host = host;
this.port = port;
this.database = database;
- this.password = password;
}
/**
@@ -87,15 +85,6 @@
}
/**
- * Returns password.
- *
- * @return password
- */
- public String getPassword() {
- return password;
- }
-
- /**
* Builder for initializing {@link FlinkJedisPoolConfig}.
*/
public static class Builder {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 6058a53..2fb87b9 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -37,7 +37,6 @@
private final String masterName;
private final Set<String> sentinels;
private final int soTimeout;
- private final String password;
private final int database;
/**
@@ -61,7 +60,7 @@
int connectionTimeout, int soTimeout,
String password, int database,
int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
Objects.requireNonNull(masterName, "Master name should be presented");
Objects.requireNonNull(sentinels, "Sentinels information should be presented");
Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
@@ -69,7 +68,6 @@
this.masterName = masterName;
this.sentinels = new HashSet<>(sentinels);
this.soTimeout = soTimeout;
- this.password = password;
this.database = database;
}
@@ -101,15 +99,6 @@
}
/**
- * Returns password.
- *
- * @return password
- */
- public String getPassword() {
- return password;
- }
-
- /**
* Returns database index.
*
* @return database index
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 0db5b05..3e80d57 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -88,8 +88,12 @@
genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
- JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
- jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
+ JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ jedisClusterConfig.getPassword(),
+ genericObjectPoolConfig);
return new RedisClusterContainer(jedisCluster);
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 2601e40..6f519ed 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@
private class TestConfig extends FlinkJedisConfigBase {
protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
}
}
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 6d0e787..addb469 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -23,6 +23,9 @@
import java.util.HashSet;
import java.util.Set;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
public class JedisClusterConfigTest extends TestLogger {
@Test(expected = NullPointerException.class)
@@ -46,4 +49,35 @@
.setNodes(set)
.build();
}
+
+ @Test
+ public void shouldSetPasswordSuccessfully() {
+ Set<InetSocketAddress> set = new HashSet<>();
+ InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+ set.add(address);
+ FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+ FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+ .setMaxIdle(0)
+ .setMaxTotal(0)
+ .setTimeout(0)
+ .setNodes(set)
+ .setPassword("test-pwd")
+ .build();
+ assertEquals("test-pwd", clusterConfig.getPassword());
+ }
+
+ @Test
+ public void shouldPasswordNotBeenSet() {
+ Set<InetSocketAddress> set = new HashSet<>();
+ InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+ set.add(address);
+ FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+ FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+ .setMaxIdle(0)
+ .setMaxTotal(0)
+ .setTimeout(0)
+ .setNodes(set)
+ .build();
+ assertNull(clusterConfig.getPassword());
+ }
}