Update Redis connector with HINCRBY (#81)

Update Redis connector with HINCRBY and repair 
Redis Cluster example code

Co-authored-by: zhanglei <zhanglei@evergrande.com>
diff --git a/flink-connector-redis/README.md b/flink-connector-redis/README.md
index 6c8fada..3fad3c2 100644
--- a/flink-connector-redis/README.md
+++ b/flink-connector-redis/README.md
@@ -78,7 +78,7 @@
 
 **Java:**
 
-    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
+    FlinkJedisPoolConfig conf = new FlinkJedisClusterConfig.Builder()
         .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
 
     DataStream<String> stream = ...;
@@ -87,7 +87,7 @@
 **Scala:**
 
 
-    val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
+    val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
     stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
 
 
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 4dfa61b..4b6898d 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -174,6 +174,9 @@
                 this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value,
                         optAdditionalTTL.orElse(this.additionalTTL));
                 break;
+            case HINCRBY:
+                this.redisCommandsContainer.hincrBy(optAdditionalKey.orElse(this.additionalKey), key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
+                break;
             case INCRBY:
                 this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
                 break;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
index 4947fe0..48556b4 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.connectors.redis;
 
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-import java.util.Map;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,6 +34,10 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
 /**
  * @author Ameng .
  * redis table sink to use redis in sql env.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
index b081d3c..1031aec 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -17,29 +17,21 @@
 
 package org.apache.flink.streaming.connectors.redis;
 
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MASTER_NAME;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_SENTINEL;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.types.Row;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.types.Row;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.Schema.*;
 
 /**
  * @author Ameng .
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index de0dfaa..018b8ce 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -73,6 +73,22 @@
     }
 
     @Override
+    public void hincrBy(final String key, final String hashField, final Long value, final Integer ttl) {
+        try {
+            jedisCluster.hincrBy(key, hashField, value);
+            if (ttl != null) {
+                jedisCluster.expire(key, ttl);
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command HINCRBY to hash {} of key {} error message {}",
+                        hashField, key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
     public void rpush(final String listName, final String value) {
         try {
             jedisCluster.rpush(listName, value);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 8adbd8d..6f3e6c1 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -44,6 +44,8 @@
      */
     void hset(String key, String hashField, String value, Integer ttl);
 
+    void hincrBy(String key, String hashField, Long value, Integer ttl);
+
     /**
      * Insert the specified value at the tail of the list stored at key.
      * If key does not exist, it is created as empty list before performing the push operation.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index bde54b5..d573bde 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -106,6 +106,26 @@
     }
 
     @Override
+    public void hincrBy(final String key, final String hashField, final Long value, final Integer ttl) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.hincrBy(key, hashField, value);
+            if (ttl != null) {
+                jedis.expire(key, ttl);
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command HINCRBY to key {} and hashField {} error message {}",
+                        key, hashField, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
     public void rpush(final String listName, final String value) {
         Jedis jedis = null;
         try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
index 6e19295..2cb3c77 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.redis.common.hanlder;
 
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
+
 /**
  * @author Ameng .
  * handler for create redis mapper.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 282d15a..858720b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -77,6 +77,8 @@
      */
     HSET(RedisDataType.HASH),
 
+    HINCRBY(RedisDataType.HINCRBY),
+
     /**
      * Delta plus for specified key.
      */
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
index 989221c..4c88cad 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -33,6 +33,8 @@
      */
     HASH,
 
+    HINCRBY,
+
     /**
      * Redis Lists are simply lists of strings, sorted by insertion order.
      */
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HIncrByMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HIncrByMapper.java
new file mode 100644
index 0000000..509f8ea
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HIncrByMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * HSET operation redis mapper.
+ */
+public class HIncrByMapper extends RowRedisMapper {
+
+    public HIncrByMapper() {
+        super(RedisCommand.HINCRBY );
+    }
+
+}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
index 6e9bc90..a1f1388 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.connectors.redis.common.mapper.row;
 
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
@@ -31,6 +26,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
+
 /**
  * @author Ameng .
  * base row redis mapper implement.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
index 086114b..b9b2e1f 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
@@ -17,19 +17,13 @@
 
 package org.apache.flink.streaming.connectors.redis.descriptor;
 
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MASTER_NAME;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_SENTINEL;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
 
 /**
  * @author Ameng .