[ISSUE #12] Support scan startup mode (#54)

diff --git a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index dfdef29..22903c5 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -108,4 +108,10 @@
 
     public static final ConfigOption<String> OPTIONAL_SECRET_KEY =
             ConfigOptions.key("secretKey").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> OPTIONAL_SCAN_STARTUP_MODE =
+            ConfigOptions.key("scanStartupMode").stringType().defaultValue("latest");
+
+    public static final ConfigOption<Long> OPTIONAL_OFFSET_FROM_TIMESTAMP =
+            ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue();
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index 8c804cf..27c69f1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -58,6 +58,8 @@
                 ResultTypeQueryable<OUT> {
     private static final long serialVersionUID = -1L;
 
+    private final String consumerOffsetMode;
+    private final long consumerOffsetTimestamp;
     private final String topic;
     private final String consumerGroup;
     private final String nameServerAddress;
@@ -89,7 +91,9 @@
             long startOffset,
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
-            RocketMQDeserializationSchema<OUT> deserializationSchema) {
+            RocketMQDeserializationSchema<OUT> deserializationSchema,
+            String cosumerOffsetMode,
+            long consumerOffsetTimestamp) {
         Validate.isTrue(
                 !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                 "Consumer tag and sql can not set value at the same time");
@@ -102,10 +106,12 @@
         this.sql = sql;
         this.stopInMs = stopInMs;
         this.startTime = startTime;
-        this.startOffset = startOffset;
+        this.startOffset = startOffset > 0 ? startOffset : startTime;
         this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
         this.boundedness = boundedness;
         this.deserializationSchema = deserializationSchema;
+        this.consumerOffsetMode = cosumerOffsetMode;
+        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
     }
 
     @Override
@@ -169,7 +175,9 @@
                 startOffset,
                 partitionDiscoveryIntervalMs,
                 boundedness,
-                enumContext);
+                enumContext,
+                consumerOffsetMode,
+                consumerOffsetTimestamp);
     }
 
     @Override
@@ -188,7 +196,9 @@
                 partitionDiscoveryIntervalMs,
                 boundedness,
                 enumContext,
-                checkpoint.getCurrentAssignment());
+                checkpoint.getCurrentAssignment(),
+                consumerOffsetMode,
+                consumerOffsetTimestamp);
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index db593ec..38aa132 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -49,7 +49,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
 
 /** The enumerator class for RocketMQ source. */
 @Internal
@@ -57,7 +60,9 @@
         implements SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> {
 
     private static final Logger LOG = LoggerFactory.getLogger(RocketMQSourceEnumerator.class);
-
+    private final Map<MessageQueue, Long> offsetTable = new HashMap<>();
+    private final String consumerOffsetMode;
+    private final long consumerOffsetTimestamp;
     /** The topic used for this RocketMQSource. */
     private final String topic;
     /** The consumer group used for this RocketMQSource. */
@@ -108,7 +113,9 @@
             long startOffset,
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
-            SplitEnumeratorContext<RocketMQPartitionSplit> context) {
+            SplitEnumeratorContext<RocketMQPartitionSplit> context,
+            String consumerOffsetMode,
+            long consumerOffsetTimestamp) {
         this(
                 topic,
                 consumerGroup,
@@ -120,7 +127,9 @@
                 partitionDiscoveryIntervalMs,
                 boundedness,
                 context,
-                new HashMap<>());
+                new HashMap<>(),
+                consumerOffsetMode,
+                consumerOffsetTimestamp);
     }
 
     public RocketMQSourceEnumerator(
@@ -134,7 +143,9 @@
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
             SplitEnumeratorContext<RocketMQPartitionSplit> context,
-            Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments) {
+            Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments,
+            String consumerOffsetMode,
+            long consumerOffsetTimestamp) {
         this.topic = topic;
         this.consumerGroup = consumerGroup;
         this.nameServerAddress = nameServerAddress;
@@ -158,6 +169,8 @@
                                                         s.getBroker(),
                                                         s.getPartition()))));
         this.pendingPartitionSplitAssignment = new HashMap<>();
+        this.consumerOffsetMode = consumerOffsetMode;
+        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
     }
 
     @Override
@@ -221,6 +234,7 @@
         Set<Tuple3<String, String, Integer>> removedPartitions =
                 new HashSet<>(Collections.unmodifiableSet(discoveredPartitions));
         Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
+        Set<RocketMQPartitionSplit> result = new HashSet<>();
         for (MessageQueue messageQueue : messageQueues) {
             Tuple3<String, String, Integer> topicPartition =
                     new Tuple3<>(
@@ -229,19 +243,17 @@
                             messageQueue.getQueueId());
             if (!removedPartitions.remove(topicPartition)) {
                 newPartitions.add(topicPartition);
+                result.add(
+                        new RocketMQPartitionSplit(
+                                topicPartition.f0,
+                                topicPartition.f1,
+                                topicPartition.f2,
+                                getOffsetByMessageQueue(messageQueue),
+                                stopInMs));
             }
         }
         discoveredPartitions.addAll(Collections.unmodifiableSet(newPartitions));
-        return newPartitions.stream()
-                .map(
-                        messageQueue ->
-                                new RocketMQPartitionSplit(
-                                        messageQueue.f0,
-                                        messageQueue.f1,
-                                        messageQueue.f2,
-                                        startOffset,
-                                        stopInMs))
-                .collect(Collectors.toSet());
+        return result;
     }
 
     // This method should only be invoked in the coordinator executor thread.
@@ -317,6 +329,35 @@
                 });
     }
 
+    private long getOffsetByMessageQueue(MessageQueue mq) throws MQClientException {
+        Long offset = offsetTable.get(mq);
+        if (offset == null) {
+            if (startOffset > 0) {
+                offset = startOffset;
+            } else {
+                switch (consumerOffsetMode) {
+                    case CONSUMER_OFFSET_EARLIEST:
+                        offset = consumer.minOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_LATEST:
+                        offset = consumer.maxOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_TIMESTAMP:
+                        offset = consumer.searchOffset(mq, consumerOffsetTimestamp);
+                        break;
+                    default:
+                        offset = consumer.fetchConsumeOffset(mq, false);
+                        if (offset < 0) {
+                            throw new IllegalArgumentException(
+                                    "Unknown value for CONSUMER_OFFSET_RESET_TO.");
+                        }
+                }
+            }
+        }
+        offsetTable.put(mq, offset);
+        return offsetTable.get(mq);
+    }
+
     private void initialRocketMQConsumer() {
         try {
             if (!StringUtils.isNullOrWhitespaceOnly(accessKey)
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index ad25f0e..ca9c3f1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -266,7 +266,7 @@
                             "The SplitChange type of %s is not supported.",
                             splitsChange.getClass()));
         }
-        // Setup the stopping timestamps.
+        // Set up the stopping timestamps.
         splitsChange
                 .splits()
                 .forEach(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index 1c66dab..6db5075 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.rocketmq.flink.source.table;
 
+import org.apache.rocketmq.flink.common.RocketMQOptions;
+
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
@@ -48,6 +50,7 @@
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SQL;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
@@ -57,6 +60,7 @@
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_USE_NEW_API;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
 
 /**
  * Defines the {@link DynamicTableSourceFactory} implementation to create {@link
@@ -99,6 +103,7 @@
         optionalOptions.add(OPTIONAL_LENGTH_CHECK);
         optionalOptions.add(OPTIONAL_ACCESS_KEY);
         optionalOptions.add(OPTIONAL_SECRET_KEY);
+        optionalOptions.add(OPTIONAL_SCAN_STARTUP_MODE);
         return optionalOptions;
     }
 
@@ -113,6 +118,18 @@
         String nameServerAddress = configuration.getString(NAME_SERVER_ADDRESS);
         String tag = configuration.getString(OPTIONAL_TAG);
         String sql = configuration.getString(OPTIONAL_SQL);
+        if (configuration.contains(OPTIONAL_SCAN_STARTUP_MODE)
+                && (configuration.contains(OPTIONAL_START_MESSAGE_OFFSET)
+                        || configuration.contains(OPTIONAL_START_TIME_MILLS)
+                        || configuration.contains(OPTIONAL_START_TIME))) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "cannot support these configs when %s has been set: [%s, %s, %s] !",
+                            OPTIONAL_SCAN_STARTUP_MODE.key(),
+                            OPTIONAL_START_MESSAGE_OFFSET.key(),
+                            OPTIONAL_START_TIME.key(),
+                            OPTIONAL_START_TIME_MILLS.key()));
+        }
         long startMessageOffset = configuration.getLong(OPTIONAL_START_MESSAGE_OFFSET);
         long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
         String startDateTime = configuration.getString(OPTIONAL_START_TIME);
@@ -158,6 +175,12 @@
         TableSchema physicalSchema =
                 TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         descriptorProperties.putTableSchema("schema", physicalSchema);
+        String consumerOffsetMode =
+                configuration.getString(
+                        RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE, CONSUMER_OFFSET_LATEST);
+        long consumerOffsetTimestamp =
+                configuration.getLong(
+                        RocketMQOptions.OPTIONAL_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis());
         return new RocketMQScanTableSource(
                 descriptorProperties,
                 physicalSchema,
@@ -172,6 +195,8 @@
                 startMessageOffset,
                 startMessageOffset < 0 ? startTime : -1L,
                 partitionDiscoveryIntervalMs,
+                consumerOffsetMode,
+                consumerOffsetTimestamp,
                 useNewApi);
     }
 
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index 77420e5..dc92a47 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -56,6 +56,9 @@
     private final DescriptorProperties properties;
     private final TableSchema schema;
 
+    private final String consumerOffsetMode;
+    private final long consumerOffsetTimestamp;
+
     private final String topic;
     private final String consumerGroup;
     private final String nameServerAddress;
@@ -87,6 +90,8 @@
             long startMessageOffset,
             long startTime,
             long partitionDiscoveryIntervalMs,
+            String consumerOffsetMode,
+            long consumerOffsetTimestamp,
             boolean useNewApi) {
         this.properties = properties;
         this.schema = schema;
@@ -103,6 +108,8 @@
         this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
         this.useNewApi = useNewApi;
         this.metadataKeys = Collections.emptyList();
+        this.consumerOffsetMode = consumerOffsetMode;
+        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
     }
 
     @Override
@@ -127,7 +134,9 @@
                             startMessageOffset < 0 ? 0 : startMessageOffset,
                             partitionDiscoveryIntervalMs,
                             isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
-                            createRocketMQDeserializationSchema()));
+                            createRocketMQDeserializationSchema(),
+                            consumerOffsetMode,
+                            consumerOffsetTimestamp));
         } else {
             return SourceFunctionProvider.of(
                     new RocketMQSourceFunction<>(
@@ -166,6 +175,8 @@
                         startMessageOffset,
                         startTime,
                         partitionDiscoveryIntervalMs,
+                        consumerOffsetMode,
+                        consumerOffsetTimestamp,
                         useNewApi);
         tableSource.metadataKeys = metadataKeys;
         return tableSource;