[ISSUE #12] Support scan startup mode (#54)
diff --git a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index dfdef29..22903c5 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -108,4 +108,10 @@
public static final ConfigOption<String> OPTIONAL_SECRET_KEY =
ConfigOptions.key("secretKey").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> OPTIONAL_SCAN_STARTUP_MODE =
+ ConfigOptions.key("scanStartupMode").stringType().defaultValue("latest");
+
+ public static final ConfigOption<Long> OPTIONAL_OFFSET_FROM_TIMESTAMP =
+ ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue();
}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index 8c804cf..27c69f1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -58,6 +58,8 @@
ResultTypeQueryable<OUT> {
private static final long serialVersionUID = -1L;
+ private final String consumerOffsetMode;
+ private final long consumerOffsetTimestamp;
private final String topic;
private final String consumerGroup;
private final String nameServerAddress;
@@ -89,7 +91,9 @@
long startOffset,
long partitionDiscoveryIntervalMs,
Boundedness boundedness,
- RocketMQDeserializationSchema<OUT> deserializationSchema) {
+ RocketMQDeserializationSchema<OUT> deserializationSchema,
+ String cosumerOffsetMode,
+ long consumerOffsetTimestamp) {
Validate.isTrue(
!(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
"Consumer tag and sql can not set value at the same time");
@@ -102,10 +106,12 @@
this.sql = sql;
this.stopInMs = stopInMs;
this.startTime = startTime;
- this.startOffset = startOffset;
+ this.startOffset = startOffset > 0 ? startOffset : startTime;
this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
this.boundedness = boundedness;
this.deserializationSchema = deserializationSchema;
+ this.consumerOffsetMode = cosumerOffsetMode;
+ this.consumerOffsetTimestamp = consumerOffsetTimestamp;
}
@Override
@@ -169,7 +175,9 @@
startOffset,
partitionDiscoveryIntervalMs,
boundedness,
- enumContext);
+ enumContext,
+ consumerOffsetMode,
+ consumerOffsetTimestamp);
}
@Override
@@ -188,7 +196,9 @@
partitionDiscoveryIntervalMs,
boundedness,
enumContext,
- checkpoint.getCurrentAssignment());
+ checkpoint.getCurrentAssignment(),
+ consumerOffsetMode,
+ consumerOffsetTimestamp);
}
@Override
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index db593ec..38aa132 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -49,7 +49,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
+
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
/** The enumerator class for RocketMQ source. */
@Internal
@@ -57,7 +60,9 @@
implements SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> {
private static final Logger LOG = LoggerFactory.getLogger(RocketMQSourceEnumerator.class);
-
+ private final Map<MessageQueue, Long> offsetTable = new HashMap<>();
+ private final String consumerOffsetMode;
+ private final long consumerOffsetTimestamp;
/** The topic used for this RocketMQSource. */
private final String topic;
/** The consumer group used for this RocketMQSource. */
@@ -108,7 +113,9 @@
long startOffset,
long partitionDiscoveryIntervalMs,
Boundedness boundedness,
- SplitEnumeratorContext<RocketMQPartitionSplit> context) {
+ SplitEnumeratorContext<RocketMQPartitionSplit> context,
+ String consumerOffsetMode,
+ long consumerOffsetTimestamp) {
this(
topic,
consumerGroup,
@@ -120,7 +127,9 @@
partitionDiscoveryIntervalMs,
boundedness,
context,
- new HashMap<>());
+ new HashMap<>(),
+ consumerOffsetMode,
+ consumerOffsetTimestamp);
}
public RocketMQSourceEnumerator(
@@ -134,7 +143,9 @@
long partitionDiscoveryIntervalMs,
Boundedness boundedness,
SplitEnumeratorContext<RocketMQPartitionSplit> context,
- Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments) {
+ Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments,
+ String consumerOffsetMode,
+ long consumerOffsetTimestamp) {
this.topic = topic;
this.consumerGroup = consumerGroup;
this.nameServerAddress = nameServerAddress;
@@ -158,6 +169,8 @@
s.getBroker(),
s.getPartition()))));
this.pendingPartitionSplitAssignment = new HashMap<>();
+ this.consumerOffsetMode = consumerOffsetMode;
+ this.consumerOffsetTimestamp = consumerOffsetTimestamp;
}
@Override
@@ -221,6 +234,7 @@
Set<Tuple3<String, String, Integer>> removedPartitions =
new HashSet<>(Collections.unmodifiableSet(discoveredPartitions));
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
+ Set<RocketMQPartitionSplit> result = new HashSet<>();
for (MessageQueue messageQueue : messageQueues) {
Tuple3<String, String, Integer> topicPartition =
new Tuple3<>(
@@ -229,19 +243,17 @@
messageQueue.getQueueId());
if (!removedPartitions.remove(topicPartition)) {
newPartitions.add(topicPartition);
+ result.add(
+ new RocketMQPartitionSplit(
+ topicPartition.f0,
+ topicPartition.f1,
+ topicPartition.f2,
+ getOffsetByMessageQueue(messageQueue),
+ stopInMs));
}
}
discoveredPartitions.addAll(Collections.unmodifiableSet(newPartitions));
- return newPartitions.stream()
- .map(
- messageQueue ->
- new RocketMQPartitionSplit(
- messageQueue.f0,
- messageQueue.f1,
- messageQueue.f2,
- startOffset,
- stopInMs))
- .collect(Collectors.toSet());
+ return result;
}
// This method should only be invoked in the coordinator executor thread.
@@ -317,6 +329,35 @@
});
}
+    /**
+     * Returns the start offset for {@code mq}, resolving it once and caching it in {@code
+     * offsetTable}: a positive {@code startOffset} wins, then {@code consumerOffsetMode};
+     * an unrecognized or null mode falls back to {@code consumer.fetchConsumeOffset}.
+     */
+    private long getOffsetByMessageQueue(MessageQueue mq) throws MQClientException {
+        Long offset = offsetTable.get(mq);
+        if (offset != null) {
+            return offset;
+        }
+        if (startOffset > 0) {
+            offset = startOffset;
+        } else if (CONSUMER_OFFSET_EARLIEST.equals(consumerOffsetMode)) {
+            offset = consumer.minOffset(mq);
+        } else if (CONSUMER_OFFSET_LATEST.equals(consumerOffsetMode)) {
+            offset = consumer.maxOffset(mq);
+        } else if (CONSUMER_OFFSET_TIMESTAMP.equals(consumerOffsetMode)) {
+            offset = consumer.searchOffset(mq, consumerOffsetTimestamp);
+        } else { // equals() instead of switch: a null mode lands here rather than throwing NPE
+            offset = consumer.fetchConsumeOffset(mq, false);
+            if (offset < 0) {
+                throw new IllegalArgumentException(
+                        "Unknown value for CONSUMER_OFFSET_RESET_TO: " + consumerOffsetMode);
+            }
+        }
+        offsetTable.put(mq, offset);
+        return offset;
+    }
+
private void initialRocketMQConsumer() {
try {
if (!StringUtils.isNullOrWhitespaceOnly(accessKey)
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index ad25f0e..ca9c3f1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -266,7 +266,7 @@
"The SplitChange type of %s is not supported.",
splitsChange.getClass()));
}
- // Setup the stopping timestamps.
+ // Set up the stopping timestamps.
splitsChange
.splits()
.forEach(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index 1c66dab..6db5075 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.flink.source.table;
+import org.apache.rocketmq.flink.common.RocketMQOptions;
+
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
@@ -48,6 +50,7 @@
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SQL;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
@@ -57,6 +60,7 @@
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_USE_NEW_API;
import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
/**
* Defines the {@link DynamicTableSourceFactory} implementation to create {@link
@@ -99,6 +103,7 @@
optionalOptions.add(OPTIONAL_LENGTH_CHECK);
optionalOptions.add(OPTIONAL_ACCESS_KEY);
optionalOptions.add(OPTIONAL_SECRET_KEY);
+ optionalOptions.add(OPTIONAL_SCAN_STARTUP_MODE);
return optionalOptions;
}
@@ -113,6 +118,18 @@
String nameServerAddress = configuration.getString(NAME_SERVER_ADDRESS);
String tag = configuration.getString(OPTIONAL_TAG);
String sql = configuration.getString(OPTIONAL_SQL);
+ if (configuration.contains(OPTIONAL_SCAN_STARTUP_MODE)
+ && (configuration.contains(OPTIONAL_START_MESSAGE_OFFSET)
+ || configuration.contains(OPTIONAL_START_TIME_MILLS)
+ || configuration.contains(OPTIONAL_START_TIME))) {
+ throw new IllegalArgumentException(
+ String.format(
+ "cannot support these configs when %s has been set: [%s, %s, %s] !",
+ OPTIONAL_SCAN_STARTUP_MODE.key(),
+ OPTIONAL_START_MESSAGE_OFFSET.key(),
+ OPTIONAL_START_TIME.key(),
+ OPTIONAL_START_TIME_MILLS.key()));
+ }
long startMessageOffset = configuration.getLong(OPTIONAL_START_MESSAGE_OFFSET);
long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
String startDateTime = configuration.getString(OPTIONAL_START_TIME);
@@ -158,6 +175,12 @@
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
descriptorProperties.putTableSchema("schema", physicalSchema);
+ String consumerOffsetMode =
+ configuration.getString(
+ RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE, CONSUMER_OFFSET_LATEST);
+ long consumerOffsetTimestamp =
+ configuration.getLong(
+ RocketMQOptions.OPTIONAL_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis());
return new RocketMQScanTableSource(
descriptorProperties,
physicalSchema,
@@ -172,6 +195,8 @@
startMessageOffset,
startMessageOffset < 0 ? startTime : -1L,
partitionDiscoveryIntervalMs,
+ consumerOffsetMode,
+ consumerOffsetTimestamp,
useNewApi);
}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index 77420e5..dc92a47 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -56,6 +56,9 @@
private final DescriptorProperties properties;
private final TableSchema schema;
+ private final String consumerOffsetMode;
+ private final long consumerOffsetTimestamp;
+
private final String topic;
private final String consumerGroup;
private final String nameServerAddress;
@@ -87,6 +90,8 @@
long startMessageOffset,
long startTime,
long partitionDiscoveryIntervalMs,
+ String consumerOffsetMode,
+ long consumerOffsetTimestamp,
boolean useNewApi) {
this.properties = properties;
this.schema = schema;
@@ -103,6 +108,8 @@
this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
this.useNewApi = useNewApi;
this.metadataKeys = Collections.emptyList();
+ this.consumerOffsetMode = consumerOffsetMode;
+ this.consumerOffsetTimestamp = consumerOffsetTimestamp;
}
@Override
@@ -127,7 +134,9 @@
startMessageOffset < 0 ? 0 : startMessageOffset,
partitionDiscoveryIntervalMs,
isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
- createRocketMQDeserializationSchema()));
+ createRocketMQDeserializationSchema(),
+ consumerOffsetMode,
+ consumerOffsetTimestamp));
} else {
return SourceFunctionProvider.of(
new RocketMQSourceFunction<>(
@@ -166,6 +175,8 @@
startMessageOffset,
startTime,
partitionDiscoveryIntervalMs,
+ consumerOffsetMode,
+ consumerOffsetTimestamp,
useNewApi);
tableSource.metadataKeys = metadataKeys;
return tableSource;