[BAHIR-205] Support password configuration for redis cluster

Upgrade jedis to 2.9.0 and add password for redis cluster sink.

Closes #57
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 3ed9fe8..c9b7a63 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -34,7 +34,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <jedis.version>2.8.0</jedis.version>
+        <jedis.version>2.9.0</jedis.version>
     </properties>
 
     <dependencies>
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index 119ade3..f05dfd8 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -47,11 +47,13 @@
      * @param maxTotal the maximum number of objects that can be allocated by the pool
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the pool
+     * @param password the password of redis cluster
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+                                    int maxTotal, int maxIdle, int minIdle,
+                                    String password) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
 
         Objects.requireNonNull(nodes, "Node information should be presented");
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
@@ -94,6 +96,7 @@
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private String password;
 
         /**
          * Sets list of node.
@@ -165,12 +168,24 @@
         }
 
         /**
+         * Sets value for the {@code password} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param password the password for accessing redis cluster
+         * @return Builder itself
+         */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
          * Builds JedisClusterConfig.
          *
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle);
+            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password);
         }
     }
 
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 0d821ed..84b1bf2 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -31,8 +31,9 @@
     protected final int maxIdle;
     protected final int minIdle;
     protected final int connectionTimeout;
+    protected final String password;
 
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password) {
         Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
         Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
         Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -42,6 +43,7 @@
         this.maxTotal = maxTotal;
         this.maxIdle = maxIdle;
         this.minIdle = minIdle;
+        this.password = password;
     }
 
     /**
@@ -88,4 +90,13 @@
     public int getMinIdle() {
         return minIdle;
     }
+
+    /**
+     * Returns password.
+     *
+     * @return password
+     */
+    public String getPassword() {
+        return password;
+    }
 }
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index d4c30ff..7c37ecb 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -31,7 +31,6 @@
     private final String host;
     private final int port;
     private final int database;
-    private final String password;
 
 
     /**
@@ -50,12 +49,11 @@
      */
     private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
                                 int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
         Objects.requireNonNull(host, "Host information should be presented");
         this.host = host;
         this.port = port;
         this.database = database;
-        this.password = password;
     }
 
     /**
@@ -87,15 +85,6 @@
     }
 
     /**
-     * Returns password.
-     *
-     * @return password
-     */
-    public String getPassword() {
-        return password;
-    }
-
-    /**
      * Builder for initializing  {@link FlinkJedisPoolConfig}.
      */
     public static class Builder {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 6058a53..2fb87b9 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -37,7 +37,6 @@
     private final String masterName;
     private final Set<String> sentinels;
     private final int soTimeout;
-    private final String password;
     private final int database;
 
     /**
@@ -61,7 +60,7 @@
                                     int connectionTimeout, int soTimeout,
                                     String password, int database,
                                     int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
         Objects.requireNonNull(masterName, "Master name should be presented");
         Objects.requireNonNull(sentinels, "Sentinels information should be presented");
         Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
@@ -69,7 +68,6 @@
         this.masterName = masterName;
         this.sentinels = new HashSet<>(sentinels);
         this.soTimeout = soTimeout;
-        this.password = password;
         this.database = database;
     }
 
@@ -101,15 +99,6 @@
     }
 
     /**
-     * Returns password.
-     *
-     * @return password
-     */
-    public String getPassword() {
-        return password;
-    }
-
-    /**
      * Returns database index.
      *
      * @return database index
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 0db5b05..3e80d57 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -88,8 +88,12 @@
         genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
         genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
 
-        JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
-            jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
+        JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
+                jedisClusterConfig.getConnectionTimeout(),
+                jedisClusterConfig.getConnectionTimeout(),
+                jedisClusterConfig.getMaxRedirections(),
+                jedisClusterConfig.getPassword(),
+                genericObjectPoolConfig);
         return new RedisClusterContainer(jedisCluster);
     }
 
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 2601e40..6f519ed 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@
     private class TestConfig extends FlinkJedisConfigBase {
 
         protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
-            super(connectionTimeout, maxTotal, maxIdle, minIdle);
+            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
         }
     }
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 6d0e787..addb469 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -23,6 +23,9 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 public class JedisClusterConfigTest extends TestLogger {
 
     @Test(expected = NullPointerException.class)
@@ -46,4 +49,35 @@
             .setNodes(set)
             .build();
     }
+
+    @Test
+    public void shouldSetPasswordSuccessfully() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .setPassword("test-pwd")
+                .build();
+        assertEquals("test-pwd", clusterConfig.getPassword());
+    }
+
+    @Test
+    public void shouldPasswordNotBeenSet() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .build();
+        assertNull(clusterConfig.getPassword());
+    }
 }