[FLINK-28303] Allow LATEST_OFFSET marker when restoring from old checkpoints
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index ef1b8b8..7c04600 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -44,9 +44,9 @@
// Valid special starting offsets
public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
- new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
+ new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET));
public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
- new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+ new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
private final TopicPartition tp;
private final long startingOffset;
@@ -133,8 +133,8 @@
String.format(
"Invalid starting offset %d is specified for partition %s. "
+ "It should either be non-negative or be one of the "
- + "[%d(earliest), %d(committed)].",
- startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET));
+ + "[%d(earliest), %d(latest), %d(committed)].",
+ startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
}
if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
@@ -142,8 +142,12 @@
String.format(
"Illegal stopping offset %d is specified for partition %s. "
+ "It should either be non-negative or be one of the "
- + "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
- stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+ + "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
+ stoppingOffset,
+ tp,
+ LATEST_OFFSET,
+ COMMITTED_OFFSET,
+ NO_STOPPING_OFFSET));
}
}
}