[issues-44] support access control (#47)

diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index 816449f..f61cbda 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -92,10 +92,10 @@
                 properties,
                 schema,
                 topic,
-                null,
-                null,
                 producerGroup,
                 nameServerAddress,
+                null,
+                null,
                 tag,
                 dynamicColumn,
                 fieldDelimiter,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index 4117cc1..e8693f7 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -122,6 +122,8 @@
         long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
         String startDateTime = configuration.getString(OPTIONAL_START_TIME);
         String timeZone = configuration.getString(OPTIONAL_TIME_ZONE);
+        String accessKey = configuration.getString(OPTIONAL_ACCESS_KEY);
+        String secretKey = configuration.getString(OPTIONAL_SECRET_KEY);
         long startTime = startTimeMs;
         if (startTime == -1) {
             if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) {
@@ -167,6 +169,8 @@
                 topic,
                 consumerGroup,
                 nameServerAddress,
+                accessKey,
+                secretKey,
                 tag,
                 sql,
                 stopInMs,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index e75bfaa..7a96d08 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -149,6 +149,8 @@
                             topic,
                             consumerGroup,
                             nameServerAddress,
+                            accessKey,
+                            secretKey,
                             tag,
                             sql,
                             stopInMs,
@@ -238,6 +240,8 @@
         consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
         consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, tag);
         consumerProps.setProperty(RocketMQConfig.CONSUMER_SQL, sql);
+        consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+        consumerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
         return consumerProps;
     }