[issues-44] support access control (#47)
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index 816449f..f61cbda 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -92,10 +92,10 @@
properties,
schema,
topic,
- null,
- null,
producerGroup,
nameServerAddress,
+ null,
+ null,
tag,
dynamicColumn,
fieldDelimiter,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index 4117cc1..e8693f7 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -122,6 +122,8 @@
long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
String startDateTime = configuration.getString(OPTIONAL_START_TIME);
String timeZone = configuration.getString(OPTIONAL_TIME_ZONE);
+ String accessKey = configuration.getString(OPTIONAL_ACCESS_KEY);
+ String secretKey = configuration.getString(OPTIONAL_SECRET_KEY);
long startTime = startTimeMs;
if (startTime == -1) {
if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) {
@@ -167,6 +169,8 @@
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
sql,
stopInMs,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index e75bfaa..7a96d08 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -149,6 +149,8 @@
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
sql,
stopInMs,
@@ -238,6 +240,8 @@
consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, tag);
consumerProps.setProperty(RocketMQConfig.CONSUMER_SQL, sql);
+ consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+ consumerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
return consumerProps;
}