/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.stream.LongStream;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Tests for {@link UnboundedSourceWrapper}. */
@RunWith(Enclosed.class)
public class UnboundedSourceWrapperTest {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);
/** Parameterized tests. */
@RunWith(Parameterized.class)
public static class ParameterizedUnboundedSourceWrapperTest {
private final int numTasks;
private final int numSplits;
public ParameterizedUnboundedSourceWrapperTest(int numTasks, int numSplits) {
this.numTasks = numTasks;
this.numSplits = numSplits;
}
@Parameterized.Parameters(name = "numTasks = {0}; numSplits={1}")
public static Collection<Object[]> data() {
/*
* Parameters for initializing the tests:
* {numTasks, numSplits}
* The test currently assumes powers of two for some assertions.
*/
return Arrays.asList(
new Object[][] {
{1, 1}, {1, 2}, {1, 4},
{2, 1}, {2, 2}, {2, 4},
{4, 1}, {4, 2}, {4, 4}
});
}
/**
 * Creates an {@link UnboundedSourceWrapper} that has one or multiple readers per source. If
 * numSplits > numTasks, one source will manage multiple readers.
 */
@Test(timeout = 30_000)
public void testValueEmission() throws Exception {
final int numElementsPerShard = 20;
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
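// shut the sources down once the final watermark is emitted so that run() returns
// and the test can finish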
options.setShutdownSourcesOnFinalWatermark(true);
final long[] numElementsReceived = {0L};
final int[] numWatermarksReceived = {0};
// this source will emit exactly numElementsPerShard elements for each parallel
// reader, afterwards it will stall. We check later whether we also receive all
// of these elements.
TestCountingSource source =
new TestCountingSource(numElementsPerShard).withFixedNumSplits(numSplits);
for (int subtaskIndex = 0; subtaskIndex < numTasks; subtaskIndex++) {
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, numTasks);
// the source wrapper will only request as many splits as there are tasks and the source
// will create at most numSplits splits
assertEquals(numSplits, flinkWrapper.getSplitSources().size());
StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
sourceOperator = new StreamSource<>(flinkWrapper);
AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
testHarness =
new AbstractStreamOperatorTestHarness<>(
sourceOperator,
numTasks /* max parallelism */,
numTasks /* parallelism */,
subtaskIndex /* subtask index */);
// The testing timer service is synchronous, so we must configure a watermark interval
// > 0, otherwise we can loop infinitely because a timer always becomes ready right
// after it has been set.
testHarness.getExecutionConfig().setAutoWatermarkInterval(10L);
testHarness.setProcessingTime(System.currentTimeMillis());
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
// start a thread that advances processing time, so that we eventually get the final
// watermark which is only updated via a processing-time trigger
Thread processingTimeUpdateThread =
new Thread() {
@Override
public void run() {
while (true) {
try {
// Need to advance this so that the watermark timers in the source wrapper fire
// Synchronize is necessary because this can interfere with updating the
// PriorityQueue of the ProcessingTimeService which is accessed when setting
// timers in UnboundedSourceWrapper.
synchronized (testHarness.getCheckpointLock()) {
testHarness.setProcessingTime(System.currentTimeMillis());
}
Thread.sleep(100);
} catch (InterruptedException e) {
// this is ok: the thread is interrupted during shutdown
break;
} catch (Exception e) {
LOG.error("Unexpected error advancing processing time", e);
break;
}
}
}
};
processingTimeUpdateThread.start();
try {
testHarness.open();
sourceOperator.run(
testHarness.getCheckpointLock(),
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private boolean hasSeenMaxWatermark = false;
@Override
public void emitWatermark(Watermark watermark) {
// we get this when there is no more data. The max watermark can arrive several
// times, so guard against counting it more than once.
if (!hasSeenMaxWatermark
&& watermark.getTimestamp()
>= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
numWatermarksReceived[0]++;
hasSeenMaxWatermark = true;
}
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
collect((StreamRecord) streamRecord);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {}
@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
windowedValueStreamRecord) {
numElementsReceived[0]++;
}
@Override
public void close() {}
});
} finally {
processingTimeUpdateThread.interrupt();
processingTimeUpdateThread.join();
}
}
// verify that we get the expected count across all subtasks
assertEquals(numElementsPerShard * numSplits, numElementsReceived[0]);
// and that we get as many final watermarks as there are subtasks
assertEquals(numTasks, numWatermarksReceived[0]);
}
/**
 * Creates an {@link UnboundedSourceWrapper} that has one or multiple readers per source. If
 * numSplits > numTasks, the source will manage multiple readers.
*
* <p>This test verifies that watermarks are correctly forwarded.
*/
@Test(timeout = 30_000)
public void testWatermarkEmission() throws Exception {
final int numElements = 500;
PipelineOptions options = PipelineOptionsFactory.create();
// this source will emit exactly numElements across all parallel readers,
// afterwards it will stall. We check later whether we also receive all of
// these elements.
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
assertEquals(numSplits, flinkWrapper.getSplitSources().size());
final StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
sourceOperator = new StreamSource<>(flinkWrapper);
final AbstractStreamOperatorTestHarness<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
testHarness =
new AbstractStreamOperatorTestHarness<>(
sourceOperator,
numTasks /* max parallelism */,
numTasks /* parallelism */,
0 /* subtask index */);
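// Disable latency markers and emit watermarks at a 1 ms interval; processing time
// starts at Long.MIN_VALUE so that the test controls every advance explicitly.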
testHarness.getExecutionConfig().setLatencyTrackingInterval(0);
testHarness.getExecutionConfig().setAutoWatermarkInterval(1);
testHarness.setProcessingTime(Long.MIN_VALUE);
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
final ConcurrentLinkedQueue<Object> caughtExceptions = new ConcurrentLinkedQueue<>();
// We test emission of two watermarks here, one intermediate, one final
final CountDownLatch seenWatermarks = new CountDownLatch(2);
final int minElementsPerReader = numElements / numSplits;
final CountDownLatch minElementsCountdown = new CountDownLatch(minElementsPerReader);
// first halt the source to test auto watermark emission
source.haltEmission();
testHarness.open();
Thread sourceThread =
new Thread(
() -> {
try {
sourceOperator.run(
testHarness.getCheckpointLock(),
new TestStreamStatusMaintainer(),
new Output<
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@Override
public void emitWatermark(Watermark watermark) {
seenWatermarks.countDown();
}
@Override
public <X> void collect(
OutputTag<X> outputTag, StreamRecord<X> streamRecord) {}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {}
@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
windowedValueStreamRecord) {
minElementsCountdown.countDown();
}
@Override
public void close() {}
});
} catch (Exception e) {
LOG.info("Caught exception:", e);
caughtExceptions.add(e);
}
});
sourceThread.start();
while (flinkWrapper.getLocalReaders().stream()
.anyMatch(reader -> reader.getWatermark().getMillis() == 0)) {
// readers haven't been initialized
Thread.sleep(50);
}
// Need to advance this so that the watermark timers in the source wrapper fire
// Synchronize is necessary because this can interfere with updating the PriorityQueue
// of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
synchronized (testHarness.getCheckpointLock()) {
testHarness.setProcessingTime(0);
}
// now read the elements
source.continueEmission();
// ...and await the elements
minElementsCountdown.await();
// Need to advance this so that the watermark timers in the source wrapper fire
// Synchronize is necessary because this can interfere with updating the PriorityQueue
// of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
synchronized (testHarness.getCheckpointLock()) {
testHarness.setProcessingTime(Long.MAX_VALUE);
}
seenWatermarks.await();
if (!caughtExceptions.isEmpty()) {
fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
}
sourceOperator.cancel();
sourceThread.join();
}
/**
* Verify that snapshot/restore work as expected. We bring up a source and cancel after seeing a
* certain number of elements. Then we snapshot that source, bring up a completely new source
* that we restore from the snapshot and verify that we see all expected elements in the end.
*/
@Test
public void testRestore() throws Exception {
final int numElements = 20;
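// lock shared by both the initial run and the restored run of the source operator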
final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
// this source will emit exactly numElements across all parallel readers,
// afterwards it will stall. We check later whether we also receive all of
// these elements.
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
assertEquals(numSplits, flinkWrapper.getSplitSources().size());
StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
sourceOperator = new StreamSource<>(flinkWrapper);
AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
testHarness =
new AbstractStreamOperatorTestHarness<>(
sourceOperator,
numTasks /* max parallelism */,
numTasks /* parallelism */,
0 /* subtask index */);
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
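// collects the elements seen across both the initial and the restored run; together
// they must cover all numElements values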
final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
boolean readFirstBatchOfElements = false;
try {
testHarness.open();
sourceOperator.run(
checkpointLock,
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private int count = 0;
@Override
public void emitWatermark(Watermark watermark) {}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
collect((StreamRecord) streamRecord);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {}
@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
windowedValueStreamRecord) {
emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
count++;
if (count >= numElements / 2) {
throw new SuccessException();
}
}
@Override
public void close() {}
});
} catch (SuccessException e) {
// success
readFirstBatchOfElements = true;
}
assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
// draw a snapshot
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
// test that finalizeCheckpoint on CheckpointMark is called
final ArrayList<Integer> finalizeList = new ArrayList<>();
TestCountingSource.setFinalizeTracker(finalizeList);
testHarness.notifyOfCompletedCheckpoint(0);
assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
// create a completely new source but restore from the snapshot
TestCountingSource restoredSource = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>
restoredFlinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, restoredSource, numSplits);
assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
restoredSourceOperator = new StreamSource<>(restoredFlinkWrapper);
// set parallelism to 1 to ensure that our testing operator gets all checkpointed state
AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
restoredTestHarness =
new AbstractStreamOperatorTestHarness<>(
restoredSourceOperator,
numTasks /* max parallelism */,
1 /* parallelism */,
0 /* subtask index */);
restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
// restore snapshot
restoredTestHarness.initializeState(snapshot);
boolean readSecondBatchOfElements = false;
// run again and verify that we see the other elements
try {
restoredTestHarness.open();
restoredSourceOperator.run(
checkpointLock,
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private int count = 0;
@Override
public void emitWatermark(Watermark watermark) {}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
collect((StreamRecord) streamRecord);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {}
@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
windowedValueStreamRecord) {
emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
count++;
if (count >= numElements / 2) {
throw new SuccessException();
}
}
@Override
public void close() {}
});
} catch (SuccessException e) {
// success
readSecondBatchOfElements = true;
}
assertEquals(
Math.max(1, numSplits / numTasks), restoredFlinkWrapper.getLocalSplitSources().size());
assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
// verify that we saw all numElements elements
assertEquals(numElements, emittedElements.size());
}
@Test
public void testNullCheckpoint() throws Exception {
final int numElements = 20;
PipelineOptions options = PipelineOptionsFactory.create();
TestCountingSource source =
new TestCountingSource(numElements) {
@Override
public Coder<CounterMark> getCheckpointMarkCoder() {
return null;
}
};
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
sourceOperator = new StreamSource<>(flinkWrapper);
AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
testHarness =
new AbstractStreamOperatorTestHarness<>(
sourceOperator,
numTasks /* max parallelism */,
numTasks /* parallelism */,
0 /* subtask index */);
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
testHarness.open();
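// snapshot immediately; since getCheckpointMarkCoder() returns null, the wrapper
// cannot write any reader state into the snapshot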
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>
restoredFlinkWrapper =
new UnboundedSourceWrapper<>(
"stepName", options, new TestCountingSource(numElements), numSplits);
StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
restoredSourceOperator = new StreamSource<>(restoredFlinkWrapper);
// set parallelism to 1 to ensure that our testing operator gets all checkpointed state
AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
restoredTestHarness =
new AbstractStreamOperatorTestHarness<>(
restoredSourceOperator,
numTasks /* max parallelism */,
1 /* parallelism */,
0 /* subtask index */);
restoredTestHarness.setup();
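// restore from the snapshot taken above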
restoredTestHarness.initializeState(snapshot);
restoredTestHarness.open();
// when the source checkpointed a null, we don't re-initialize the splits; that
// is, we will have no splits.
assertEquals(0, restoredFlinkWrapper.getLocalSplitSources().size());
}
/** A special {@link RuntimeException} that we throw to signal that the test was successful. */
private static class SuccessException extends RuntimeException {}
}
/** Non-parameterized tests. */
@RunWith(JUnit4.class)
public static class BasicTest {
/** Checks serialization of an {@link UnboundedSourceWrapper}. */
@Test
public void testSerialization() throws Exception {
final int parallelism = 1;
final int numElements = 20;
PipelineOptions options = PipelineOptionsFactory.create();
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, parallelism);
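// Flink ships source functions to the task managers in serialized form, so the
// wrapper must serialize without errors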
InstantiationUtil.serializeObject(flinkWrapper);
}
@Test(timeout = 10_000)
public void testSourceWithNoReaderDoesNotShutdown() throws Exception {
testSourceDoesNotShutdown(false);
}
@Test(timeout = 10_000)
public void testSourceWithReadersDoesNotShutdown() throws Exception {
testSourceDoesNotShutdown(true);
}
private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) throws Exception {
final int parallelism = 2;
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
TestCountingSource source = new TestCountingSource(20).withoutSplitting();
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> sourceWrapper =
new UnboundedSourceWrapper<>("noReader", options, source, parallelism);
StreamingRuntimeContext mock = Mockito.mock(StreamingRuntimeContext.class);
if (shouldHaveReaders) {
// Since the source can't be split, the first subtask index will read everything
Mockito.when(mock.getIndexOfThisSubtask()).thenReturn(0);
} else {
// Set up the RuntimeContext such that this instance won't receive any readers
Mockito.when(mock.getIndexOfThisSubtask()).thenReturn(parallelism - 1);
}
Mockito.when(mock.getNumberOfParallelSubtasks()).thenReturn(parallelism);
Mockito.when(mock.getExecutionConfig()).thenReturn(new ExecutionConfig());
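// The mocked timer service never fires on its own; the test invokes
// onProcessingTime on the wrapper manually below.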
ProcessingTimeService timerService = Mockito.mock(ProcessingTimeService.class);
Mockito.when(timerService.getCurrentProcessingTime()).thenReturn(Long.MAX_VALUE);
Mockito.when(mock.getProcessingTimeService()).thenReturn(timerService);
sourceWrapper.setRuntimeContext(mock);
sourceWrapper.open(new Configuration());
SourceFunction.SourceContext sourceContext = Mockito.mock(SourceFunction.SourceContext.class);
Object checkpointLock = new Object();
Mockito.when(sourceContext.getCheckpointLock()).thenReturn(checkpointLock);
// Initialize the source context early to avoid concurrency issues between its
// initialization in the run method and the onProcessingTime call on the wrapper.
sourceWrapper.setSourceContext(sourceContext);
sourceWrapper.open(new Configuration());
assertThat(sourceWrapper.getLocalReaders().isEmpty(), is(!shouldHaveReaders));
Thread thread =
new Thread(
() -> {
try {
sourceWrapper.run(sourceContext);
} catch (Exception e) {
LOG.error("Error while running UnboundedSourceWrapper", e);
}
});
try {
thread.start();
// Wait to see if the wrapper shuts down immediately in case it doesn't have readers
if (!shouldHaveReaders) {
// The expected state is for finalizeSource to sleep instead of exiting
while (true) {
StackTraceElement[] callStack = thread.getStackTrace();
if (callStack.length >= 2
&& "sleep".equals(callStack[0].getMethodName())
&& "finalizeSource".equals(callStack[1].getMethodName())) {
break;
}
Thread.sleep(10);
}
}
// Source should still be running even if there are no readers
assertThat(sourceWrapper.isRunning(), is(true));
synchronized (checkpointLock) {
// Trigger emission of the watermark by updating processing time.
// The actual processing time value does not matter.
sourceWrapper.onProcessingTime(42);
}
// Source should still be running even when watermark is at max
assertThat(sourceWrapper.isRunning(), is(true));
assertThat(thread.isAlive(), is(true));
sourceWrapper.cancel();
} finally {
thread.interrupt();
// try to join but also don't mask exceptions with test timeout
thread.join(1000);
}
assertThat(thread.isAlive(), is(false));
}
@Test
public void testSequentialReadingFromBoundedSource() throws Exception {
UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Long> source =
new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
CountingSource.upTo(1000));
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setShutdownSourcesOnFinalWatermark(true);
UnboundedSourceWrapper<
Long, UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<Long>>
sourceWrapper = new UnboundedSourceWrapper<>("sequentialRead", options, source, 4);
StreamingRuntimeContext runtimeContextMock = Mockito.mock(StreamingRuntimeContext.class);
Mockito.when(runtimeContextMock.getIndexOfThisSubtask()).thenReturn(0);
when(runtimeContextMock.getNumberOfParallelSubtasks()).thenReturn(2);
when(runtimeContextMock.getExecutionConfig()).thenReturn(new ExecutionConfig());
TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
processingTimeService.setCurrentTime(0);
when(runtimeContextMock.getProcessingTimeService()).thenReturn(processingTimeService);
when(runtimeContextMock.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
sourceWrapper.setRuntimeContext(runtimeContextMock);
sourceWrapper.open(new Configuration());
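// the wrapper was created with 4 desired splits; with 2 parallel subtasks,
// subtask 0 is assigned 2 readers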
assertThat(sourceWrapper.getLocalReaders().size(), is(2));
List<Long> integers = new ArrayList<>();
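// run() returns once all readers are exhausted because
// shutdownSourcesOnFinalWatermark is set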
sourceWrapper.run(
new SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<Long>>>() {
private final Object checkpointLock = new Object();
@Override
public void collect(WindowedValue<ValueWithRecordId<Long>> element) {
integers.add(element.getValue().getValue());
}
@Override
public void collectWithTimestamp(
WindowedValue<ValueWithRecordId<Long>> element, long timestamp) {
throw new IllegalStateException("Should not collect with timestamp");
}
@Override
public void emitWatermark(Watermark mark) {}
@Override
public void markAsTemporarilyIdle() {}
@Override
public Object getCheckpointLock() {
return checkpointLock;
}
@Override
public void close() {}
});
// The source is effectively split into two parts: the initial splitting is performed with a
// parallelism of 4, but there are 2 parallel subtasks. This instance takes 2 of the 4
// partitions.
assertThat(integers.size(), is(500));
assertThat(
integers,
contains(
LongStream.concat(LongStream.range(0, 250), LongStream.range(500, 750))
.boxed()
.toArray()));
}
}
private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
@Override
public void toggleStreamStatus(StreamStatus streamStatus) {
if (!currentStreamStatus.equals(streamStatus)) {
currentStreamStatus = streamStatus;
}
}
@Override
public StreamStatus getStreamStatus() {
return currentStreamStatus;
}
}
}