Update Redis connector with HINCRBY (#81)
Update Redis connector with HINCRBY and repair
Redis Cluster example code
Co-authored-by: zhanglei <zhanglei@evergrande.com>
diff --git a/flink-connector-redis/README.md b/flink-connector-redis/README.md
index 6c8fada..3fad3c2 100644
--- a/flink-connector-redis/README.md
+++ b/flink-connector-redis/README.md
@@ -78,7 +78,7 @@
**Java:**
- FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
+ FlinkJedisPoolConfig conf = new FlinkJedisClusterConfig.Builder()
.setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
DataStream<String> stream = ...;
@@ -87,7 +87,7 @@
**Scala:**
- val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
+ val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 4dfa61b..4b6898d 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -174,6 +174,9 @@
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value,
optAdditionalTTL.orElse(this.additionalTTL));
break;
+ case HINCRBY:
+ this.redisCommandsContainer.hincrBy(optAdditionalKey.orElse(this.additionalKey), key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
+ break;
case INCRBY:
this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
break;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
index 4947fe0..48556b4 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.connectors.redis;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,6 +34,10 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
/**
* @author Ameng .
* redis table sink to use redis in sql env.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
index b081d3c..1031aec 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -17,29 +17,21 @@
package org.apache.flink.streaming.connectors.redis;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MASTER_NAME;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_SENTINEL;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.types.Row;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.Schema.*;
/**
* @author Ameng .
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index de0dfaa..018b8ce 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -73,6 +73,22 @@
}
@Override
+ public void hincrBy(final String key, final String hashField, final Long value, final Integer ttl) {
+ try {
+ jedisCluster.hincrBy(key, hashField, value);
+ if (ttl != null) {
+ jedisCluster.expire(key, ttl);
+ }
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command HINCRBY to hash {} of key {} error message {}",
+ hashField, key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
public void rpush(final String listName, final String value) {
try {
jedisCluster.rpush(listName, value);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 8adbd8d..6f3e6c1 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -44,6 +44,8 @@
*/
void hset(String key, String hashField, String value, Integer ttl);
+ void hincrBy(String key, String hashField, Long value, Integer ttl);
+
/**
* Insert the specified value at the tail of the list stored at key.
* If key does not exist, it is created as empty list before performing the push operation.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index bde54b5..d573bde 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -106,6 +106,26 @@
}
@Override
+ public void hincrBy(final String key, final String hashField, final Long value, final Integer ttl) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.hincrBy(key, hashField, value);
+ if (ttl != null) {
+ jedis.expire(key, ttl);
+ }
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command HINCRBY to key {} and hashField {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
public void rpush(final String listName, final String value) {
Jedis jedis = null;
try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
index 6e19295..2cb3c77 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
@@ -17,14 +17,15 @@
package org.apache.flink.streaming.connectors.redis.common.hanlder;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
+
/**
* @author Ameng .
* handler for create redis mapper.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 282d15a..858720b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -77,6 +77,8 @@
*/
HSET(RedisDataType.HASH),
+ HINCRBY(RedisDataType.HINCRBY),
+
/**
* Delta plus for specified key.
*/
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
index 989221c..4c88cad 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -33,6 +33,8 @@
*/
HASH,
+ HINCRBY,
+
/**
* Redis Lists are simply lists of strings, sorted by insertion order.
*/
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HIncrByMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HIncrByMapper.java
new file mode 100644
index 0000000..509f8ea
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HIncrByMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * HSET operation redis mapper.
+ */
+public class HIncrByMapper extends RowRedisMapper {
+
+ public HIncrByMapper() {
+ super(RedisCommand.HINCRBY );
+ }
+
+}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
index 6e9bc90..a1f1388 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
@@ -17,11 +17,6 @@
package org.apache.flink.streaming.connectors.redis.common.mapper.row;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
@@ -31,6 +26,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
+
/**
* @author Ameng .
* base row redis mapper implement.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
index 086114b..b9b2e1f 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
@@ -17,19 +17,13 @@
package org.apache.flink.streaming.connectors.redis.descriptor;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MASTER_NAME;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
-import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_SENTINEL;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.util.Preconditions;
import java.util.HashMap;
import java.util.Map;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.*;
/**
* @author Ameng .