[FLINK-28303] Allow LATEST_OFFSET marker when restoring from old checkpoints
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index ef1b8b8..7c04600 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -44,9 +44,9 @@
 
     // Valid special starting offsets
     public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
+            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET));
     public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+            new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
 
     private final TopicPartition tp;
     private final long startingOffset;
@@ -133,8 +133,8 @@
                     String.format(
                             "Invalid starting offset %d is specified for partition %s. "
                                     + "It should either be non-negative or be one of the "
-                                    + "[%d(earliest), %d(committed)].",
-                            startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET));
+                                    + "[%d(earliest), %d(latest), %d(committed)].",
+                            startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
         }
 
         if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
@@ -142,8 +142,12 @@
                     String.format(
                             "Illegal stopping offset %d is specified for partition %s. "
                                     + "It should either be non-negative or be one of the "
-                                    + "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
-                            stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+                                    + "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
+                            stoppingOffset,
+                            tp,
+                            LATEST_OFFSET,
+                            COMMITTED_OFFSET,
+                            NO_STOPPING_OFFSET));
         }
     }
 }