| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.kafka.internals; |
| |
| import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; |
| import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; |
| import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; |
| import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; |
| import org.apache.flink.streaming.api.watermark.Watermark; |
| import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext; |
| import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; |
| import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; |
| import org.apache.flink.util.SerializedValue; |
| |
| import org.junit.Test; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Optional; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.mock; |
| |
| /** |
| * Tests for the {@link AbstractFetcher}. |
| */ |
| @SuppressWarnings("serial") |
| public class AbstractFetcherTest { |
| |
| @Test |
| public void testIgnorePartitionStateSentinelInSnapshot() throws Exception { |
| final String testTopic = "test topic name"; |
| Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 2), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 3), KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); |
| |
| TestSourceContext<Long> sourceContext = new TestSourceContext<>(); |
| |
| TestFetcher<Long> fetcher = new TestFetcher<>( |
| sourceContext, |
| originalPartitions, |
| null, |
| null, |
| mock(TestProcessingTimeService.class), |
| 0); |
| |
| synchronized (sourceContext.getCheckpointLock()) { |
| HashMap<KafkaTopicPartition, Long> currentState = fetcher.snapshotCurrentState(); |
| fetcher.commitInternalOffsetsToKafka(currentState, new KafkaCommitCallback() { |
| @Override |
| public void onSuccess() { |
| } |
| |
| @Override |
| public void onException(Throwable cause) { |
| throw new RuntimeException("Callback failed", cause); |
| } |
| }); |
| |
| assertTrue(fetcher.getLastCommittedOffsets().isPresent()); |
| assertEquals(Collections.emptyMap(), fetcher.getLastCommittedOffsets().get()); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Record emitting tests |
| // ------------------------------------------------------------------------ |
| |
| @Test |
| public void testSkipCorruptedRecord() throws Exception { |
| final String testTopic = "test topic name"; |
| Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| |
| TestSourceContext<Long> sourceContext = new TestSourceContext<>(); |
| |
| TestFetcher<Long> fetcher = new TestFetcher<>( |
| sourceContext, |
| originalPartitions, |
| null, /* periodic watermark assigner */ |
| null, /* punctuated watermark assigner */ |
| mock(TestProcessingTimeService.class), |
| 0); |
| |
| final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0); |
| |
| fetcher.emitRecord(1L, partitionStateHolder, 1L); |
| fetcher.emitRecord(2L, partitionStateHolder, 2L); |
| assertEquals(2L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(2L, partitionStateHolder.getOffset()); |
| |
| // emit null record |
| fetcher.emitRecord(null, partitionStateHolder, 3L); |
| assertEquals(2L, sourceContext.getLatestElement().getValue().longValue()); // the null record should be skipped |
| assertEquals(3L, partitionStateHolder.getOffset()); // the offset in state still should have advanced |
| } |
| |
| @Test |
| public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception { |
| final String testTopic = "test topic name"; |
| Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| |
| TestSourceContext<Long> sourceContext = new TestSourceContext<>(); |
| |
| TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); |
| |
| TestFetcher<Long> fetcher = new TestFetcher<>( |
| sourceContext, |
| originalPartitions, |
| null, /* periodic watermark assigner */ |
| new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), /* punctuated watermark assigner */ |
| processingTimeProvider, |
| 0); |
| |
| final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0); |
| |
| // elements generate a watermark if the timestamp is a multiple of three |
| fetcher.emitRecord(1L, partitionStateHolder, 1L); |
| fetcher.emitRecord(2L, partitionStateHolder, 2L); |
| fetcher.emitRecord(3L, partitionStateHolder, 3L); |
| assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); |
| assertTrue(sourceContext.hasWatermark()); |
| assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); |
| assertEquals(3L, partitionStateHolder.getOffset()); |
| |
| // emit null record |
| fetcher.emitRecord(null, partitionStateHolder, 4L); |
| |
| // no elements or watermarks should have been collected |
| assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); |
| assertFalse(sourceContext.hasWatermark()); |
| // the offset in state still should have advanced |
| assertEquals(4L, partitionStateHolder.getOffset()); |
| } |
| |
| @Test |
| public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception { |
| final String testTopic = "test topic name"; |
| Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| |
| TestSourceContext<Long> sourceContext = new TestSourceContext<>(); |
| |
| TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); |
| |
| TestFetcher<Long> fetcher = new TestFetcher<>( |
| sourceContext, |
| originalPartitions, |
| new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), /* periodic watermark assigner */ |
| null, /* punctuated watermark assigner */ |
| processingTimeProvider, |
| 10); |
| |
| final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0); |
| |
| // elements generate a watermark if the timestamp is a multiple of three |
| fetcher.emitRecord(1L, partitionStateHolder, 1L); |
| fetcher.emitRecord(2L, partitionStateHolder, 2L); |
| fetcher.emitRecord(3L, partitionStateHolder, 3L); |
| assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); |
| assertEquals(3L, partitionStateHolder.getOffset()); |
| |
| // advance timer for watermark emitting |
| processingTimeProvider.setCurrentTime(10L); |
| assertTrue(sourceContext.hasWatermark()); |
| assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); |
| |
| // emit null record |
| fetcher.emitRecord(null, partitionStateHolder, 4L); |
| |
| // no elements should have been collected |
| assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); |
| // the offset in state still should have advanced |
| assertEquals(4L, partitionStateHolder.getOffset()); |
| |
| // no watermarks should be collected |
| processingTimeProvider.setCurrentTime(20L); |
| assertFalse(sourceContext.hasWatermark()); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Timestamps & watermarks tests |
| // ------------------------------------------------------------------------ |
| |
| @Test |
| public void testPunctuatedWatermarks() throws Exception { |
| final String testTopic = "test topic name"; |
| Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| |
| TestSourceContext<Long> sourceContext = new TestSourceContext<>(); |
| |
| TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); |
| |
| TestFetcher<Long> fetcher = new TestFetcher<>( |
| sourceContext, |
| originalPartitions, |
| null, /* periodic watermark assigner */ |
| new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), |
| processingTimeProvider, |
| 0); |
| |
| final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates().get(0); |
| final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates().get(1); |
| final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates().get(2); |
| |
| // elements generate a watermark if the timestamp is a multiple of three |
| |
| // elements for partition 1 |
| fetcher.emitRecord(1L, part1, 1L); |
| fetcher.emitRecord(2L, part1, 2L); |
| fetcher.emitRecord(3L, part1, 3L); |
| assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); |
| assertFalse(sourceContext.hasWatermark()); |
| |
| // elements for partition 2 |
| fetcher.emitRecord(12L, part2, 1L); |
| assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); |
| assertFalse(sourceContext.hasWatermark()); |
| |
| // elements for partition 3 |
| fetcher.emitRecord(101L, part3, 1L); |
| fetcher.emitRecord(102L, part3, 2L); |
| assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); |
| |
| // now, we should have a watermark |
| assertTrue(sourceContext.hasWatermark()); |
| assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); |
| |
| // advance partition 3 |
| fetcher.emitRecord(1003L, part3, 3L); |
| fetcher.emitRecord(1004L, part3, 4L); |
| fetcher.emitRecord(1005L, part3, 5L); |
| assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); |
| |
| // advance partition 1 beyond partition 2 - this bumps the watermark |
| fetcher.emitRecord(30L, part1, 4L); |
| assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); |
| assertTrue(sourceContext.hasWatermark()); |
| assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); |
| |
| // advance partition 2 again - this bumps the watermark |
| fetcher.emitRecord(13L, part2, 2L); |
| assertFalse(sourceContext.hasWatermark()); |
| fetcher.emitRecord(14L, part2, 3L); |
| assertFalse(sourceContext.hasWatermark()); |
| fetcher.emitRecord(15L, part2, 3L); |
| assertTrue(sourceContext.hasWatermark()); |
| assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp()); |
| } |
| |
| @Test |
| public void testPeriodicWatermarks() throws Exception { |
| final String testTopic = "test topic name"; |
| Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 7), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 13), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| originalPartitions.put(new KafkaTopicPartition(testTopic, 21), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); |
| |
| TestSourceContext<Long> sourceContext = new TestSourceContext<>(); |
| |
| TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); |
| |
| TestFetcher<Long> fetcher = new TestFetcher<>( |
| sourceContext, |
| originalPartitions, |
| new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), |
| null, /* punctuated watermarks assigner*/ |
| processingTimeService, |
| 10); |
| |
| final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitionStates().get(0); |
| final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitionStates().get(1); |
| final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates().get(2); |
| |
| // elements generate a watermark if the timestamp is a multiple of three |
| |
| // elements for partition 1 |
| fetcher.emitRecord(1L, part1, 1L); |
| fetcher.emitRecord(2L, part1, 2L); |
| fetcher.emitRecord(3L, part1, 3L); |
| assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); |
| |
| // elements for partition 2 |
| fetcher.emitRecord(12L, part2, 1L); |
| assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); |
| |
| // elements for partition 3 |
| fetcher.emitRecord(101L, part3, 1L); |
| fetcher.emitRecord(102L, part3, 2L); |
| assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); |
| |
| processingTimeService.setCurrentTime(10); |
| |
| // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) |
| assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); |
| |
| // advance partition 3 |
| fetcher.emitRecord(1003L, part3, 3L); |
| fetcher.emitRecord(1004L, part3, 4L); |
| fetcher.emitRecord(1005L, part3, 5L); |
| assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); |
| |
| // advance partition 1 beyond partition 2 - this bumps the watermark |
| fetcher.emitRecord(30L, part1, 4L); |
| assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); |
| assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); |
| |
| processingTimeService.setCurrentTime(20); |
| |
| // this blocks until the periodic thread emitted the watermark |
| assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); |
| |
| // advance partition 2 again - this bumps the watermark |
| fetcher.emitRecord(13L, part2, 2L); |
| fetcher.emitRecord(14L, part2, 3L); |
| fetcher.emitRecord(15L, part2, 3L); |
| |
| processingTimeService.setCurrentTime(30); |
| // this blocks until the periodic thread emitted the watermark |
| long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); |
| assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); |
| } |
| |
| @Test |
| public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks() throws Exception { |
| final String testTopic = "test topic name"; |
| Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); |
| |
| TestSourceContext<Long> sourceContext = new TestSourceContext<>(); |
| |
| TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); |
| |
| TestFetcher<Long> fetcher = new TestFetcher<>( |
| sourceContext, |
| originalPartitions, |
| new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), |
| null, /* punctuated watermarks assigner*/ |
| processingTimeProvider, |
| 10); |
| |
| processingTimeProvider.setCurrentTime(10); |
| // no partitions; when the periodic watermark emitter fires, no watermark should be emitted |
| assertFalse(sourceContext.hasWatermark()); |
| |
| // counter-test that when the fetcher does actually have partitions, |
| // when the periodic watermark emitter fires again, a watermark really is emitted |
| fetcher.addDiscoveredPartitions(Collections.singletonList(new KafkaTopicPartition(testTopic, 0))); |
| fetcher.emitRecord(100L, fetcher.subscribedPartitionStates().get(0), 3L); |
| processingTimeProvider.setCurrentTime(20); |
| assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Test mocks |
| // ------------------------------------------------------------------------ |
| |
| private static final class TestFetcher<T> extends AbstractFetcher<T, Object> { |
| protected Optional<Map<KafkaTopicPartition, Long>> lastCommittedOffsets = Optional.empty(); |
| |
| protected TestFetcher( |
| SourceContext<T> sourceContext, |
| Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets, |
| SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, |
| SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, |
| ProcessingTimeService processingTimeProvider, |
| long autoWatermarkInterval) throws Exception { |
| this(sourceContext, assignedPartitionsWithStartOffsets, null, watermarksPeriodic, |
| watermarksPunctuated, processingTimeProvider, autoWatermarkInterval); |
| } |
| |
| protected TestFetcher( |
| SourceContext<T> sourceContext, |
| Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets, |
| Map<KafkaTopicPartition, Long> partitionsToEndOffsets, |
| SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, |
| SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, |
| ProcessingTimeService processingTimeProvider, |
| long autoWatermarkInterval) throws Exception { |
| super( |
| sourceContext, |
| assignedPartitionsWithStartOffsets, |
| partitionsToEndOffsets, |
| watermarksPeriodic, |
| watermarksPunctuated, |
| processingTimeProvider, |
| autoWatermarkInterval, |
| TestFetcher.class.getClassLoader(), |
| new UnregisteredMetricsGroup(), |
| false); |
| } |
| |
| @Override |
| public void runFetchLoop(boolean dynamicDiscover) throws Exception { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void cancel() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Object createKafkaPartitionHandle(KafkaTopicPartition partition) { |
| return new Object(); |
| } |
| |
| @Override |
| protected void doCommitInternalOffsetsToKafka( |
| Map<KafkaTopicPartition, Long> offsets, |
| @Nonnull KafkaCommitCallback callback) throws Exception { |
| lastCommittedOffsets = Optional.of(offsets); |
| callback.onSuccess(); |
| } |
| |
| public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() { |
| return lastCommittedOffsets; |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> { |
| |
| private volatile long maxTimestamp = Long.MIN_VALUE; |
| |
| @Override |
| public long extractTimestamp(Long element, long previousElementTimestamp) { |
| maxTimestamp = Math.max(maxTimestamp, element); |
| return element; |
| } |
| |
| @Nullable |
| @Override |
| public Watermark getCurrentWatermark() { |
| return new Watermark(maxTimestamp); |
| } |
| } |
| |
| private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> { |
| |
| @Override |
| public long extractTimestamp(Long element, long previousElementTimestamp) { |
| return element; |
| } |
| |
| @Nullable |
| @Override |
| public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { |
| return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null; |
| } |
| |
| } |
| } |