[FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data
This closes #52.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java
new file mode 100644
index 0000000..b6c95a6
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator.initializer;
+
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * An implementation of {@link OffsetsInitializer} to initialize the offsets based on a
+ * latest-offset.
+ *
+ * <p>Package private and should be instantiated via {@link OffsetsInitializer}.
+ */
+class LatestOffsetsInitializer implements OffsetsInitializer {
+ private static final long serialVersionUID = 3014700244733286989L;
+
+ @Override
+ public Map<TopicPartition, Long> getPartitionOffsets(
+ Collection<TopicPartition> partitions,
+ PartitionOffsetsRetriever partitionOffsetsRetriever) {
+ return partitionOffsetsRetriever.endOffsets(partitions);
+ }
+
+ @Override
+ public OffsetResetStrategy getAutoOffsetResetStrategy() {
+ return OffsetResetStrategy.LATEST;
+ }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index db682c6..0f0c5d2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -155,8 +155,7 @@
* @return an {@link OffsetsInitializer} which initializes the offsets to the latest offsets.
*/
static OffsetsInitializer latest() {
- return new ReaderHandledOffsetsInitializer(
- KafkaPartitionSplit.LATEST_OFFSET, OffsetResetStrategy.LATEST);
+ return new LatestOffsetsInitializer();
}
/**
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
index 026320d..42abd57 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
@@ -32,9 +32,9 @@
import static org.apache.flink.util.Preconditions.checkState;
/**
- * A initializer that initialize the partitions to the earliest / latest / last-committed offsets.
- * The offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of
- * by the {@code KafkaSourceEnumerator}.
+ * A initializer that initialize the partitions to the earliest / last-committed offsets. The
+ * offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of by
+ * the {@code KafkaSourceEnumerator}.
*
* <p>Package private and should be instantiated via {@link OffsetsInitializer}.
*/
@@ -46,8 +46,7 @@
/**
* The only valid value for startingOffset is following. {@link
* KafkaPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link
- * KafkaPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link KafkaPartitionSplit#COMMITTED_OFFSET
- * COMMITTED_OFFSET}
+ * KafkaPartitionSplit#COMMITTED_OFFSET COMMITTED_OFFSET}
*/
ReaderHandledOffsetsInitializer(long startingOffset, OffsetResetStrategy offsetResetStrategy) {
this.startingOffset = startingOffset;
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index 8c2a1fd..ef1b8b8 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -35,7 +35,8 @@
public class KafkaPartitionSplit implements SourceSplit {
public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
// Indicating the split should consume from the latest.
- public static final long LATEST_OFFSET = -1;
+ // @deprecated Only be used for compatibility with the history state, see FLINK-28303
+ @Deprecated public static final long LATEST_OFFSET = -1;
// Indicating the split should consume from the earliest.
public static final long EARLIEST_OFFSET = -2;
// Indicating the split should consume from the last committed offset.
@@ -43,9 +44,9 @@
// Valid special starting offsets
public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
- new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET));
+ new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
- new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+ new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
private final TopicPartition tp;
private final long startingOffset;
@@ -132,8 +133,8 @@
String.format(
"Invalid starting offset %d is specified for partition %s. "
+ "It should either be non-negative or be one of the "
- + "[%d(earliest), %d(latest), %d(committed)].",
- startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
+ + "[%d(earliest), %d(committed)].",
+ startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET));
}
if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
@@ -141,12 +142,8 @@
String.format(
"Illegal stopping offset %d is specified for partition %s. "
+ "It should either be non-negative or be one of the "
- + "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
- stoppingOffset,
- tp,
- LATEST_OFFSET,
- COMMITTED_OFFSET,
- NO_STOPPING_OFFSET));
+ + "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
+ stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
}
}
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index f30c660..8b308af 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -300,7 +300,7 @@
getAllAssignSplits(context, PRE_EXISTING_TOPICS);
assertThat(initialPartitionAssign)
.extracting(KafkaPartitionSplit::getStartingOffset)
- .containsOnly(KafkaPartitionSplit.LATEST_OFFSET);
+ .containsOnly((long) KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
List<KafkaPartitionSplit> newPartitionAssign =
getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_NAME));
assertThat(newPartitionAssign)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index e0cd850..46dd61a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -84,7 +84,7 @@
assertThat(offsets).hasSameSizeAs(partitions);
assertThat(offsets.keySet()).containsAll(partitions);
for (long offset : offsets.values()) {
- assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET);
+ assertThat(offset).isEqualTo(KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
}
assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index edd4132..b592a69 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -246,8 +246,8 @@
final KafkaPartitionSplit emptySplit =
new KafkaPartitionSplit(
new TopicPartition(TOPIC2, 0),
- KafkaPartitionSplit.LATEST_OFFSET,
- KafkaPartitionSplit.LATEST_OFFSET);
+ KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION,
+ KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
final KafkaPartitionSplit emptySplitWithZeroStoppingOffset =
new KafkaPartitionSplit(new TopicPartition(TOPIC3, 0), 0, 0);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index b350d8c..f5aa7f5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -397,7 +397,9 @@
// Normal split with NUM_RECORDS_PER_SPLIT records
final KafkaPartitionSplit normalSplit =
new KafkaPartitionSplit(
- new TopicPartition(TOPIC, 0), 0, KafkaPartitionSplit.LATEST_OFFSET);
+ new TopicPartition(TOPIC, 0),
+ 0,
+ KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
// Empty split with no record
final KafkaPartitionSplit emptySplit =
new KafkaPartitionSplit(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
index 4ca5c9c..db76472 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
@@ -36,11 +36,7 @@
Long normalOffset = 1L;
TopicPartition topicPartition = new TopicPartition(topic, 1);
List<Long> stoppingOffsets =
- Lists.newArrayList(
- KafkaPartitionSplit.COMMITTED_OFFSET,
- KafkaPartitionSplit.LATEST_OFFSET,
- offsetZero,
- normalOffset);
+ Lists.newArrayList(KafkaPartitionSplit.COMMITTED_OFFSET, offsetZero, normalOffset);
KafkaPartitionSplitSerializer splitSerializer = new KafkaPartitionSplitSerializer();
for (Long stoppingOffset : stoppingOffsets) {
KafkaPartitionSplit kafkaPartitionSplit =
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 2c82fc1..1246d53 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -477,13 +477,17 @@
OffsetsInitializer offsetsInitializer =
KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
TopicPartition partition = new TopicPartition(TOPIC, 0);
+ long endOffsets = 123L;
Map<TopicPartition, Long> partitionOffsets =
offsetsInitializer.getPartitionOffsets(
Collections.singletonList(partition),
- MockPartitionOffsetsRetriever.noInteractions());
+ MockPartitionOffsetsRetriever.latest(
+ (tps) ->
+ Collections.singletonMap(
+ partition, endOffsets)));
assertThat(partitionOffsets)
.containsOnlyKeys(partition)
- .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET);
+ .containsEntry(partition, endOffsets);
});
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 2674183..409acd9 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -18,12 +18,19 @@
package org.apache.flink.streaming.connectors.kafka.table;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.EncodingUtils;
@@ -945,6 +952,138 @@
}
@Test
+ public void testLatestOffsetStrategyResume() throws Exception {
+ // we always use a different topic name for each parameterized topic,
+ // in order to make sure the topic can be created.
+ final String topic = "latest_offset_resume_topic_" + format + "_" + UUID.randomUUID();
+ createTestTopic(topic, 6, 1);
+ env.setParallelism(1);
+
+ // ---------- Produce data into Kafka's partition 0-6 -------------------
+
+ String groupId = getStandardProps().getProperty("group.id");
+ String bootstraps = getBootstrapServers();
+
+ final String createTable =
+ String.format(
+ "CREATE TABLE kafka (\n"
+ + " `partition_id` INT,\n"
+ + " `value` INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'properties.group.id' = '%s',\n"
+ + " 'scan.startup.mode' = 'latest-offset',\n"
+ + " 'sink.partitioner' = '%s',\n"
+ + " 'format' = '%s'\n"
+ + ")",
+ topic, bootstraps, groupId, TestPartitioner.class.getName(), format);
+
+ tEnv.executeSql(createTable);
+
+ String initialValues =
+ "INSERT INTO kafka VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 0)";
+ tEnv.executeSql(initialValues).await();
+
+ // ---------- Consume stream from Kafka -------------------
+
+ String createSink =
+ "CREATE TABLE MySink(\n"
+ + " `id` INT,\n"
+ + " `value` INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values'\n"
+ + ")";
+ tEnv.executeSql(createSink);
+
+ String executeInsert = "INSERT INTO MySink SELECT `partition_id`, `value` FROM kafka";
+ TableResult tableResult = tEnv.executeSql(executeInsert);
+
+ // ---------- Produce data into Kafka's partition 0-2 -------------------
+
+ String moreValues = "INSERT INTO kafka VALUES (0, 1), (1, 1), (2, 1)";
+ tEnv.executeSql(moreValues).await();
+
+ final List<String> expected = Arrays.asList("+I[0, 1]", "+I[1, 1]", "+I[2, 1]");
+ KafkaTableTestUtils.waitingExpectedResults("MySink", expected, Duration.ofSeconds(5));
+
+ // ---------- Stop the consume job with savepoint -------------------
+
+ String savepointBasePath = getTempDirPath(topic + "-savepoint");
+ assert tableResult.getJobClient().isPresent();
+ JobClient client = tableResult.getJobClient().get();
+ String savepointPath =
+ client.stopWithSavepoint(false, savepointBasePath, SavepointFormatType.DEFAULT)
+ .get();
+
+ // ---------- Produce data into Kafka's partition 0-5 -------------------
+
+ String produceValuesBeforeResume =
+ "INSERT INTO kafka VALUES (0, 2), (1, 2), (2, 2), (3, 1), (4, 1), (5, 1)";
+ tEnv.executeSql(produceValuesBeforeResume).await();
+
+ // ---------- Resume the consume job from savepoint -------------------
+
+ Configuration configuration = new Configuration();
+ configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+ configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+
+ tEnv.executeSql(createTable);
+ tEnv.executeSql(createSink);
+ tableResult = tEnv.executeSql(executeInsert);
+
+ final List<String> afterResumeExpected =
+ Arrays.asList(
+ "+I[0, 1]",
+ "+I[1, 1]",
+ "+I[2, 1]",
+ "+I[0, 2]",
+ "+I[1, 2]",
+ "+I[2, 2]",
+ "+I[3, 1]",
+ "+I[4, 1]",
+ "+I[5, 1]");
+ KafkaTableTestUtils.waitingExpectedResults(
+ "MySink", afterResumeExpected, Duration.ofSeconds(5));
+
+ // ---------- Produce data into Kafka's partition 0-5 -------------------
+
+ String produceValuesAfterResume =
+ "INSERT INTO kafka VALUES (0, 3), (1, 3), (2, 3), (3, 2), (4, 2), (5, 2)";
+ this.tEnv.executeSql(produceValuesAfterResume).await();
+
+ final List<String> afterProduceExpected =
+ Arrays.asList(
+ "+I[0, 1]",
+ "+I[1, 1]",
+ "+I[2, 1]",
+ "+I[0, 2]",
+ "+I[1, 2]",
+ "+I[2, 2]",
+ "+I[3, 1]",
+ "+I[4, 1]",
+ "+I[5, 1]",
+ "+I[0, 3]",
+ "+I[1, 3]",
+ "+I[2, 3]",
+ "+I[3, 2]",
+ "+I[4, 2]",
+ "+I[5, 2]");
+ KafkaTableTestUtils.waitingExpectedResults(
+ "MySink", afterProduceExpected, Duration.ofSeconds(5));
+
+ // ------------- cleanup -------------------
+
+ tableResult.getJobClient().ifPresent(JobClient::cancel);
+ deleteTestTopic(topic);
+ }
+
+ @Test
public void testStartFromGroupOffsetsLatest() throws Exception {
testStartFromGroupOffsets("latest");
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 41d9e7e..15c740d 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -465,13 +465,17 @@
OffsetsInitializer offsetsInitializer =
KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
TopicPartition partition = new TopicPartition(SOURCE_TOPIC, 0);
+ long endOffsets = 123L;
Map<TopicPartition, Long> partitionOffsets =
offsetsInitializer.getPartitionOffsets(
Collections.singletonList(partition),
- MockPartitionOffsetsRetriever.noInteractions());
+ MockPartitionOffsetsRetriever.latest(
+ (tps) ->
+ Collections.singletonMap(
+ partition, endOffsets)));
assertThat(partitionOffsets)
.containsOnlyKeys(partition)
- .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET);
+ .containsEntry(partition, endOffsets);
});
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java
index 175bddd..9947bc5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java
@@ -68,6 +68,17 @@
UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever);
}
+ public static MockPartitionOffsetsRetriever latest(OffsetsRetriever endOffsets) {
+ return new MockPartitionOffsetsRetriever(
+ UNSUPPORTED_RETRIEVAL,
+ endOffsets,
+ UNSUPPORTED_RETRIEVAL,
+ partitions -> {
+ throw new UnsupportedOperationException(
+ "The method was not supposed to be called");
+ });
+ }
+
private MockPartitionOffsetsRetriever(
OffsetsRetriever committedOffsets,
OffsetsRetriever endOffsets,