blob: f1a320c3b89ba8261e1fd146e98b071ec5a8bb41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.iteration.operator;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.iteration.IterationID;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.event.CoordinatorCheckpointEvent;
import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
import org.apache.flink.iteration.operator.event.SubtaskAlignedEvent;
import org.apache.flink.iteration.operator.event.TerminatingOnInitializeEvent;
import org.apache.flink.iteration.operator.feedback.SpillableFeedbackChannel;
import org.apache.flink.iteration.operator.feedback.SpillableFeedbackChannelBroker;
import org.apache.flink.iteration.operator.headprocessor.RegularHeadOperatorRecordProcessor;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/** Tests the {@link HeadOperator}. */
public class HeadOperatorTest extends TestLogger {
/**
 * Verifies that the head operator forwards records arriving from both the normal input and the
 * feedback channel, and counts the feedback records it has seen per epoch.
 */
@Test
public void testForwardRecords() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    createHarnessAndRun(
            iterationId,
            operatorId,
            null,
            harness -> {
                // Interleave records from the input (epoch 0) and the feedback edge (epoch 1).
                harness.processElement(new StreamRecord<>(IterationRecord.newRecord(1, 0), 2));
                putFeedbackRecords(iterationId, IterationRecord.newRecord(3, 1), 3L);
                harness.processAll();
                harness.processElement(new StreamRecord<>(IterationRecord.newRecord(2, 0), 3));
                putFeedbackRecords(iterationId, IterationRecord.newRecord(4, 1), 4L);
                harness.processAll();

                // All four records should be forwarded downstream in processing order.
                List<StreamRecord<IterationRecord<Integer>>> expectedOutput =
                        Arrays.asList(
                                new StreamRecord<>(IterationRecord.newRecord(1, 0), 2),
                                new StreamRecord<>(IterationRecord.newRecord(3, 1), 3),
                                new StreamRecord<>(IterationRecord.newRecord(2, 0), 3),
                                new StreamRecord<>(IterationRecord.newRecord(4, 1), 4));
                assertEquals(expectedOutput, new ArrayList<>(harness.getOutput()));

                // Both feedback records belong to epoch 1 and should have been counted.
                RegularHeadOperatorRecordProcessor recordProcessor =
                        (RegularHeadOperatorRecordProcessor)
                                RecordingHeadOperatorFactory.latestHeadOperator
                                        .getRecordProcessor();
                assertEquals(2, (long) recordProcessor.getNumFeedbackRecordsPerEpoch().get(1));
                return null;
            });
}
/**
 * Verifies the epoch-watermark synchronization protocol: the head operator reports a {@link
 * SubtaskAlignedEvent} to the coordinator when a round finishes on this subtask, and only emits
 * the epoch watermark downstream after the matching {@link GloballyAlignedEvent} comes back.
 */
@Test(timeout = 60000)
public void testSynchronizingEpochWatermark() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    createHarnessAndRun(
            iterationId,
            operatorId,
            null,
            harness -> {
                harness.processElement(new StreamRecord<>(IterationRecord.newRecord(1, 0), 2));
                // We will start a new thread to simulate the operator coordinator thread
                CompletableFuture<Void> taskExecuteResult =
                        CompletableFuture.supplyAsync(
                                () -> {
                                    try {
                                        RecordingOperatorEventGateway eventGateway =
                                                (RecordingOperatorEventGateway)
                                                        RecordingHeadOperatorFactory
                                                                .latestHeadOperator
                                                                .getOperatorEventGateway();
                                        // We should get the aligned event for round 0 on
                                        // endInput
                                        assertNextOperatorEvent(
                                                new SubtaskAlignedEvent(0, 0, false),
                                                eventGateway);
                                        dispatchOperatorEvent(
                                                harness,
                                                operatorId,
                                                new GloballyAlignedEvent(0, false));
                                        // Feed one record for epoch 1, then finish epoch 1
                                        // from the "tail" side.
                                        putFeedbackRecords(
                                                iterationId,
                                                IterationRecord.newRecord(4, 1),
                                                4L);
                                        putFeedbackRecords(
                                                iterationId,
                                                IterationRecord.newEpochWatermark(1, "tail"),
                                                0L);
                                        assertNextOperatorEvent(
                                                new SubtaskAlignedEvent(1, 1, false),
                                                eventGateway);
                                        // Globally align epoch 1 with isTerminated = true to
                                        // shut the iteration down.
                                        dispatchOperatorEvent(
                                                harness,
                                                operatorId,
                                                new GloballyAlignedEvent(1, true));
                                        // Wait until the operator leaves RUNNING before
                                        // sending the final epoch watermark.
                                        while (RecordingHeadOperatorFactory.latestHeadOperator
                                                        .getStatus()
                                                == HeadOperator.HeadOperatorStatus.RUNNING) {
                                            Thread.sleep(500);
                                        }
                                        putFeedbackRecords(
                                                iterationId,
                                                IterationRecord.newEpochWatermark(
                                                        Integer.MAX_VALUE, "tail"),
                                                null);
                                        return null;
                                    } catch (Throwable e) {
                                        // Surface the failure inside the task mailbox so the
                                        // main thread does not hang waiting for progress.
                                        RecordingHeadOperatorFactory.latestHeadOperator
                                                .getMailboxExecutor()
                                                .execute(
                                                        () -> {
                                                            throw e;
                                                        },
                                                        "poison mail");
                                        throw new CompletionException(e);
                                    }
                                });
                // Mark the input as finished.
                harness.processEvent(new EndOfData(StopMode.DRAIN));
                // There should be no exception
                taskExecuteResult.get();
                // Output order: input record, epoch-0 watermark (after global alignment),
                // the feedback record of epoch 1, then the terminating epoch watermark.
                assertEquals(
                        Arrays.asList(
                                new StreamRecord<>(IterationRecord.newRecord(1, 0), 2),
                                new StreamRecord<>(
                                        IterationRecord.newEpochWatermark(
                                                0,
                                                OperatorUtils.getUniqueSenderId(operatorId, 0)),
                                        0),
                                new StreamRecord<>(IterationRecord.newRecord(4, 1), 4),
                                new StreamRecord<>(
                                        IterationRecord.newEpochWatermark(
                                                IterationRecord.END_EPOCH_WATERMARK,
                                                OperatorUtils.getUniqueSenderId(operatorId, 0)),
                                        0)),
                        new ArrayList<>(harness.getOutput()));
                return null;
            });
}
/**
 * Verifies that when the checkpoint barrier arrives before the coordinator's {@link
 * CoordinatorCheckpointEvent}, the head operator holds the checkpoint until the coordinator
 * notification (and the pending global alignment) has been processed.
 */
@Test(timeout = 60000)
public void testHoldCheckpointTillCoordinatorNotified() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    createHarnessAndRun(
            iterationId,
            operatorId,
            null,
            harness -> {
                // Simulated coordinator thread: delay, then globally align epoch 5 and
                // announce checkpoint 5 — deliberately after the barrier below arrives.
                CompletableFuture<Void> coordinatorResult =
                        CompletableFuture.supplyAsync(
                                () -> {
                                    try {
                                        // Slight postpone the notification
                                        Thread.sleep(2000);
                                        harness.getStreamTask()
                                                .dispatchOperatorEvent(
                                                        operatorId,
                                                        new SerializedValue<>(
                                                                new GloballyAlignedEvent(
                                                                        5, false)));
                                        harness.getStreamTask()
                                                .dispatchOperatorEvent(
                                                        operatorId,
                                                        new SerializedValue<>(
                                                                new CoordinatorCheckpointEvent(
                                                                        5)));
                                        return null;
                                    } catch (Throwable e) {
                                        // Surface the failure inside the task mailbox so the
                                        // main thread does not block forever.
                                        RecordingHeadOperatorFactory.latestHeadOperator
                                                .getMailboxExecutor()
                                                .execute(
                                                        () -> {
                                                            throw e;
                                                        },
                                                        "poison mail");
                                        throw new CompletionException(e);
                                    }
                                });
                CheckpointBarrier barrier =
                        new CheckpointBarrier(
                                5,
                                5000,
                                CheckpointOptions.alignedNoTimeout(
                                        CheckpointType.CHECKPOINT,
                                        CheckpointStorageLocationReference.getDefault()));
                harness.processEvent(barrier);
                // There should be no exception
                coordinatorResult.get();
                // If the task did not hold the checkpoint, it would likely snapshot its state
                // before receiving the globally aligned event; holding it means the epoch
                // watermark is emitted before the barrier.
                assertEquals(
                        Arrays.asList(
                                new StreamRecord<>(
                                        IterationRecord.newEpochWatermark(
                                                5,
                                                OperatorUtils.getUniqueSenderId(operatorId, 0)),
                                        0),
                                barrier),
                        new ArrayList<>(harness.getOutput()));
                return null;
            });
}
/**
 * Verifies the mirror case of {@link #testHoldCheckpointTillCoordinatorNotified()}: when a
 * {@link GloballyAlignedEvent} arrives after the coordinator has already announced a
 * checkpoint, its processing is postponed until after the snapshot, so the epoch watermark is
 * emitted after the barrier.
 */
@Test(timeout = 60000)
public void testPostponeGloballyAlignedEventsAfterSnapshot() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    createHarnessAndRun(
            iterationId,
            operatorId,
            null,
            harness -> {
                // Checkpoint announcement first, then the globally aligned event.
                harness.getStreamTask()
                        .dispatchOperatorEvent(
                                operatorId,
                                new SerializedValue<>(new CoordinatorCheckpointEvent(5)));
                harness.getStreamTask()
                        .dispatchOperatorEvent(
                                operatorId,
                                new SerializedValue<>(new GloballyAlignedEvent(5, false)));
                CheckpointBarrier barrier =
                        new CheckpointBarrier(
                                5,
                                5000,
                                CheckpointOptions.alignedNoTimeout(
                                        CheckpointType.CHECKPOINT,
                                        CheckpointStorageLocationReference.getDefault()));
                harness.processEvent(barrier);
                harness.processAll();
                // The barrier must precede the epoch watermark in the output.
                assertEquals(
                        Arrays.asList(
                                barrier,
                                new StreamRecord<>(
                                        IterationRecord.newEpochWatermark(
                                                5,
                                                OperatorUtils.getUniqueSenderId(operatorId, 0)),
                                        0)),
                        new ArrayList<>(harness.getOutput()));
                return null;
            });
}
/**
 * Takes a snapshot while round 0 is still in progress and verifies that restoring from it
 * yields a RUNNING operator with empty output, no pending events, and no alignment progress.
 */
@Test
public void testSnapshotAndRestoreBeforeRoundZeroFinish() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    TaskStateSnapshot taskStateSnapshot =
            createHarnessAndRun(
                    iterationId,
                    operatorId,
                    null,
                    harness -> {
                        harness.getTaskStateManager().getWaitForReportLatch().reset();
                        harness.processElement(
                                new StreamRecord<>(IterationRecord.newRecord(100, 0)));
                        // Announce checkpoint 2 and trigger it while round 0 is unfinished.
                        dispatchOperatorEvent(
                                harness, operatorId, new CoordinatorCheckpointEvent(2));
                        harness.getStreamTask()
                                .triggerCheckpointAsync(
                                        new CheckpointMetaData(2, 1000),
                                        CheckpointOptions.alignedNoTimeout(
                                                CheckpointType.CHECKPOINT,
                                                CheckpointStorageLocationReference
                                                        .getDefault()));
                        // The checkpoint completes once the barrier returns via feedback.
                        putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
                        harness.processAll();
                        harness.getTaskStateManager().getWaitForReportLatch().await();
                        return harness.getTaskStateManager()
                                .getLastJobManagerTaskStateSnapshot();
                    });
    assertNotNull(taskStateSnapshot);
    createHarnessAndRun(
            iterationId,
            operatorId,
            taskStateSnapshot,
            harness -> {
                // Nothing was aligned yet, so the restored state is completely empty.
                checkRestoredOperatorState(
                        harness,
                        HeadOperator.HeadOperatorStatus.RUNNING,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Collections.emptyMap(),
                        -1,
                        -1);
                harness.processAll();
                return null;
            });
}
/**
 * Takes a snapshot after round 0 has locally finished but before it is (globally) aligned, and
 * verifies that the restored operator re-reports the {@link SubtaskAlignedEvent} for round 0
 * when the epoch watermark is replayed.
 */
@Test
public void testSnapshotAndRestoreAfterRoundZeroFinishAndRoundOneNotAligned() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    TaskStateSnapshot taskStateSnapshot =
            createHarnessAndRun(
                    iterationId,
                    operatorId,
                    null,
                    harness -> {
                        harness.getTaskStateManager().getWaitForReportLatch().reset();
                        harness.processElement(
                                new StreamRecord<>(IterationRecord.newRecord(100, 0)));
                        // Simulates endOfInputs, but not block the main thread.
                        harness.processElement(
                                new StreamRecord<>(
                                        IterationRecord.newEpochWatermark(0, "fake")));
                        harness.processAll();
                        dispatchOperatorEvent(
                                harness, operatorId, new CoordinatorCheckpointEvent(2));
                        harness.getStreamTask()
                                .triggerCheckpointAsync(
                                        new CheckpointMetaData(2, 1000),
                                        CheckpointOptions.alignedNoTimeout(
                                                CheckpointType.CHECKPOINT,
                                                CheckpointStorageLocationReference
                                                        .getDefault()));
                        putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
                        harness.processAll();
                        harness.getTaskStateManager().getWaitForReportLatch().await();
                        return harness.getTaskStateManager()
                                .getLastJobManagerTaskStateSnapshot();
                    });
    assertNotNull(taskStateSnapshot);
    createHarnessAndRun(
            iterationId,
            operatorId,
            taskStateSnapshot,
            harness -> {
                // Round 0 was locally aligned (latestRoundAligned == 0) but no round was
                // globally aligned yet (-1).
                checkRestoredOperatorState(
                        harness,
                        HeadOperator.HeadOperatorStatus.RUNNING,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Collections.emptyMap(),
                        0,
                        -1);
                // Simulates endOfInputs, but not block the main thread.
                harness.processElement(
                        new StreamRecord<>(IterationRecord.newEpochWatermark(0, "fake")));
                // The restored operator should re-send the aligned event for round 0.
                assertEquals(
                        Collections.singletonList(new SubtaskAlignedEvent(0, 0, false)),
                        new ArrayList<>(
                                ((RecordingOperatorEventGateway)
                                                RecordingHeadOperatorFactory.latestHeadOperator
                                                        .getOperatorEventGateway())
                                        .operatorEvents));
                return null;
            });
}
/**
 * Takes a snapshot after round 0 is globally aligned and round 1 has locally finished, and
 * verifies the restored operator re-reports round 1's {@link SubtaskAlignedEvent} together with
 * its feedback-record counter.
 */
@Test
public void testSnapshotAndRestoreAfterRoundZeroFinishAndRoundOneAligned() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    TaskStateSnapshot taskStateSnapshot =
            createHarnessAndRun(
                    iterationId,
                    operatorId,
                    null,
                    harness -> {
                        harness.getTaskStateManager().getWaitForReportLatch().reset();
                        harness.processElement(
                                new StreamRecord<>(IterationRecord.newRecord(100, 0)));
                        // Simulates endOfInputs, but not block the main thread.
                        harness.processElement(
                                new StreamRecord<>(
                                        IterationRecord.newEpochWatermark(0, "fake")));
                        // Globally align round 0, then feed one record for round 1 and
                        // finish round 1 locally via the tail.
                        dispatchOperatorEvent(
                                harness, operatorId, new GloballyAlignedEvent(0, false));
                        putFeedbackRecords(
                                iterationId, IterationRecord.newRecord(100, 1), null);
                        putFeedbackRecords(
                                iterationId,
                                IterationRecord.newEpochWatermark(1, "tail"),
                                null);
                        harness.processAll();
                        dispatchOperatorEvent(
                                harness, operatorId, new CoordinatorCheckpointEvent(2));
                        harness.getStreamTask()
                                .triggerCheckpointAsync(
                                        new CheckpointMetaData(2, 1000),
                                        CheckpointOptions.alignedNoTimeout(
                                                CheckpointType.CHECKPOINT,
                                                CheckpointStorageLocationReference
                                                        .getDefault()));
                        putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
                        harness.processAll();
                        harness.getTaskStateManager().getWaitForReportLatch().await();
                        return harness.getTaskStateManager()
                                .getLastJobManagerTaskStateSnapshot();
                    });
    assertNotNull(taskStateSnapshot);
    createHarnessAndRun(
            iterationId,
            operatorId,
            taskStateSnapshot,
            harness -> {
                // Round 1 locally aligned (1 feedback record), round 0 globally aligned.
                checkRestoredOperatorState(
                        harness,
                        HeadOperator.HeadOperatorStatus.RUNNING,
                        Collections.emptyList(),
                        Collections.singletonList(new SubtaskAlignedEvent(1, 1, false)),
                        Collections.singletonMap(1, 1L),
                        1,
                        0);
                harness.processAll();
                return null;
            });
}
/**
 * Takes a snapshot while feedback records are in flight — some arriving before the feedback
 * barrier, some after — and verifies that the records after the barrier are stored in the
 * snapshot and re-emitted on restore, with the per-epoch counters reflecting all of them.
 */
@Test
public void testSnapshotAndRestoreWithFeedbackRecords() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    TaskStateSnapshot taskStateSnapshot =
            createHarnessAndRun(
                    iterationId,
                    operatorId,
                    null,
                    harness -> {
                        harness.getTaskStateManager().getWaitForReportLatch().reset();
                        // Align round 4 globally so the operator sits in round 5.
                        putFeedbackRecords(
                                iterationId,
                                IterationRecord.newEpochWatermark(4, "tail"),
                                null);
                        dispatchOperatorEvent(
                                harness, operatorId, new GloballyAlignedEvent(4, false));
                        harness.processAll();
                        // One feedback record arrives before the checkpoint is triggered...
                        putFeedbackRecords(
                                iterationId, IterationRecord.newRecord(100, 5), null);
                        dispatchOperatorEvent(
                                harness, operatorId, new CoordinatorCheckpointEvent(2));
                        harness.getStreamTask()
                                .triggerCheckpointAsync(
                                        new CheckpointMetaData(2, 1000),
                                        CheckpointOptions.alignedNoTimeout(
                                                CheckpointType.CHECKPOINT,
                                                CheckpointStorageLocationReference
                                                        .getDefault()));
                        harness.processAll();
                        // ...and these arrive after it but before the feedback barrier, so
                        // they must be captured in the snapshot.
                        putFeedbackRecords(
                                iterationId, IterationRecord.newRecord(101, 5), null);
                        putFeedbackRecords(
                                iterationId, IterationRecord.newRecord(102, 5), null);
                        putFeedbackRecords(
                                iterationId, IterationRecord.newRecord(103, 6), null);
                        putFeedbackRecords(
                                iterationId,
                                IterationRecord.newEpochWatermark(5, "tail"),
                                null);
                        putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
                        harness.processAll();
                        harness.getTaskStateManager().getWaitForReportLatch().await();
                        return harness.getTaskStateManager()
                                .getLastJobManagerTaskStateSnapshot();
                    });
    assertNotNull(taskStateSnapshot);
    createHarnessAndRun(
            iterationId,
            operatorId,
            taskStateSnapshot,
            harness -> {
                // The records fed after the trigger are replayed as output on restore.
                checkRestoredOperatorState(
                        harness,
                        HeadOperator.HeadOperatorStatus.RUNNING,
                        Arrays.asList(
                                new StreamRecord<>(IterationRecord.newRecord(101, 5)),
                                new StreamRecord<>(IterationRecord.newRecord(102, 5)),
                                new StreamRecord<>(IterationRecord.newRecord(103, 6))),
                        /* The one before checkpoint and the two after checkpoint */
                        Collections.singletonList(new SubtaskAlignedEvent(5, 3, false)),
                        new HashMap<Integer, Long>() {
                            {
                                this.put(5, 3L);
                                this.put(6, 1L);
                            }
                        },
                        5,
                        4);
                harness.processAll();
                return null;
            });
}
/**
 * Takes a snapshot while round 5 has locally finished but the terminating {@link
 * GloballyAlignedEvent} has not yet been snapshotted, and verifies the restored operator is
 * still RUNNING and re-reports round 5's aligned event.
 */
@Test
public void testCheckpointBeforeTerminated() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    TaskStateSnapshot taskStateSnapshot =
            createHarnessAndRun(
                    iterationId,
                    operatorId,
                    null,
                    harness -> {
                        harness.getTaskStateManager().getWaitForReportLatch().reset();
                        // Globally align round 4 first.
                        putFeedbackRecords(
                                iterationId,
                                IterationRecord.newEpochWatermark(4, "tail"),
                                null);
                        dispatchOperatorEvent(
                                harness, operatorId, new GloballyAlignedEvent(4, false));
                        harness.processAll();
                        // Round 5 finishes locally; checkpoint 2 is announced and triggered
                        // before the terminating global alignment arrives.
                        putFeedbackRecords(
                                iterationId,
                                IterationRecord.newEpochWatermark(5, "tail"),
                                null);
                        dispatchOperatorEvent(
                                harness, operatorId, new CoordinatorCheckpointEvent(2));
                        harness.getStreamTask()
                                .triggerCheckpointAsync(
                                        new CheckpointMetaData(2, 1000),
                                        CheckpointOptions.alignedNoTimeout(
                                                CheckpointType.CHECKPOINT,
                                                CheckpointStorageLocationReference
                                                        .getDefault()));
                        harness.processAll();
                        // The terminating alignment arrives only after the snapshot state
                        // was decided, so it must not be part of the checkpoint.
                        dispatchOperatorEvent(
                                harness, operatorId, new GloballyAlignedEvent(5, true));
                        harness.processAll();
                        putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
                        harness.processAll();
                        harness.getTaskStateManager().getWaitForReportLatch().await();
                        return harness.getTaskStateManager()
                                .getLastJobManagerTaskStateSnapshot();
                    });
    assertNotNull(taskStateSnapshot);
    createHarnessAndRun(
            iterationId,
            operatorId,
            taskStateSnapshot,
            harness -> {
                // Restored as RUNNING: round 5 locally aligned, round 4 globally aligned.
                checkRestoredOperatorState(
                        harness,
                        HeadOperator.HeadOperatorStatus.RUNNING,
                        Collections.emptyList(),
                        Collections.singletonList(new SubtaskAlignedEvent(5, 0, false)),
                        Collections.emptyMap(),
                        5,
                        4);
                harness.processAll();
                return null;
            });
}
/**
 * Takes a snapshot after the operator has entered the TERMINATING status and verifies the
 * restored operator resumes in TERMINATING, sends {@link TerminatingOnInitializeEvent} to the
 * coordinator, and can finish once the final epoch watermark is fed back.
 */
@Test
public void testCheckpointAfterTerminating() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    TaskStateSnapshot taskStateSnapshot =
            createHarnessAndRun(
                    iterationId,
                    operatorId,
                    null,
                    harness -> {
                        harness.getTaskStateManager().getWaitForReportLatch().reset();
                        // Finish round 5 and terminate the iteration before checkpointing.
                        putFeedbackRecords(
                                iterationId,
                                IterationRecord.newEpochWatermark(5, "tail"),
                                null);
                        dispatchOperatorEvent(
                                harness, operatorId, new GloballyAlignedEvent(5, true));
                        harness.processAll();
                        dispatchOperatorEvent(
                                harness, operatorId, new CoordinatorCheckpointEvent(2));
                        harness.getStreamTask()
                                .triggerCheckpointAsync(
                                        new CheckpointMetaData(2, 1000),
                                        CheckpointOptions.alignedNoTimeout(
                                                CheckpointType.CHECKPOINT,
                                                CheckpointStorageLocationReference
                                                        .getDefault()));
                        harness.processAll();
                        harness.getTaskStateManager().getWaitForReportLatch().await();
                        return harness.getTaskStateManager()
                                .getLastJobManagerTaskStateSnapshot();
                    });
    assertNotNull(taskStateSnapshot);
    createHarnessAndRun(
            iterationId,
            operatorId,
            taskStateSnapshot,
            harness -> {
                checkRestoredOperatorState(
                        harness,
                        HeadOperator.HeadOperatorStatus.TERMINATING,
                        Collections.emptyList(),
                        Collections.singletonList(TerminatingOnInitializeEvent.INSTANCE),
                        Collections.emptyMap(),
                        -1,
                        -1);
                // Feed the final epoch watermark so the operator can fully terminate.
                putFeedbackRecords(
                        iterationId,
                        IterationRecord.newEpochWatermark(Integer.MAX_VALUE, "tail"),
                        null);
                harness.processEvent(new EndOfData(StopMode.DRAIN));
                harness.finishProcessing();
                return null;
            });
}
/**
 * Verifies that a tail operator's {@code notifyCheckpointAborted} unblocks a head operator that
 * is waiting on a pending checkpoint; without the tail-side abort the head would block forever
 * (guarded by the 20s test timeout).
 */
@Test(timeout = 20000)
public void testTailAbortPendingCheckpointIfHeadBlocked() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    createHarnessAndRun(
            iterationId,
            operatorId,
            null,
            harness -> {
                harness.processElement(new StreamRecord<>(IterationRecord.newRecord(100, 0)));
                dispatchOperatorEvent(harness, operatorId, new CoordinatorCheckpointEvent(2));
                harness.getStreamTask()
                        .triggerCheckpointAsync(
                                new CheckpointMetaData(2, 1000),
                                CheckpointOptions.alignedNoTimeout(
                                        CheckpointType.CHECKPOINT,
                                        CheckpointStorageLocationReference.getDefault()));
                harness.processAll();
                // A feedback record (but no barrier) keeps the checkpoint pending.
                putFeedbackRecords(iterationId, IterationRecord.newRecord(100, 1), null);
                harness.processAll();
                // Simulates the tail operators help to abort the checkpoint
                CompletableFuture<Void> supplier =
                        CompletableFuture.supplyAsync(
                                () -> {
                                    try {
                                        // Slightly postpone the execution till the head
                                        // operator get blocked.
                                        Thread.sleep(2000);
                                        OneInputStreamOperatorTestHarness<
                                                        IterationRecord<?>, Void>
                                                testHarness =
                                                        new OneInputStreamOperatorTestHarness<>(
                                                                new TailOperator(
                                                                        iterationId, 0));
                                        testHarness.open();
                                        testHarness.getOperator().notifyCheckpointAborted(2);
                                    } catch (Exception e) {
                                        throw new CompletionException(e);
                                    }
                                    return null;
                                });
                harness.getStreamTask().notifyCheckpointAbortAsync(2, 0);
                harness.processAll();
                // Would time out here if the head stayed blocked on the checkpoint.
                supplier.get();
                return null;
            });
}
/**
 * Regression test: the feedback barrier may arrive before the {@link
 * CoordinatorCheckpointEvent} is dispatched; the head operator must still complete the
 * checkpoint instead of deadlocking (guarded by the 20s test timeout).
 */
@Test(timeout = 20000)
public void testCheckpointsWithBarrierFeedbackFirst() throws Exception {
    IterationID iterationId = new IterationID();
    OperatorID operatorId = new OperatorID();
    createHarnessAndRun(
            iterationId,
            operatorId,
            null,
            harness -> {
                harness.getTaskStateManager().getWaitForReportLatch().reset();
                harness.processElement(new StreamRecord<>(IterationRecord.newRecord(100, 0)));
                harness.processAll();
                harness.getStreamTask()
                        .triggerCheckpointAsync(
                                new CheckpointMetaData(2, 1000),
                                CheckpointOptions.alignedNoTimeout(
                                        CheckpointType.CHECKPOINT,
                                        CheckpointStorageLocationReference.getDefault()));
                // Simulates that the barrier get feed back before the
                // CoordinatorCheckpointEvent is dispatched. If we not handle this case,
                // there would be deadlock.
                putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
                dispatchOperatorEvent(harness, operatorId, new CoordinatorCheckpointEvent(2));
                harness.processAll();
                harness.getTaskStateManager().getWaitForReportLatch().await();
                return null;
            });
}
/**
 * Builds a one-input mailbox test harness whose chain consists of a single head operator
 * created by {@link RecordingHeadOperatorFactory}, runs the given test body against it, and
 * closes the operator and its feedback channel afterwards.
 *
 * @param iterationId the iteration the head operator belongs to.
 * @param operatorId the operator id used for dispatching operator events.
 * @param snapshot the task state to restore from, or {@code null} to start fresh.
 * @param runnable the test body; its return value is passed through.
 * @param <T> the type returned by the test body.
 * @return whatever {@code runnable} returns.
 */
private <T> T createHarnessAndRun(
        IterationID iterationId,
        OperatorID operatorId,
        @Nullable TaskStateSnapshot snapshot,
        FunctionWithException<
                        StreamTaskMailboxTestHarness<IterationRecord<Integer>>, T, Exception>
                runnable)
        throws Exception {
    try (StreamTaskMailboxTestHarness<IterationRecord<Integer>> harness =
            new StreamTaskMailboxTestHarnessBuilder<>(
                            OneInputStreamTask::new,
                            new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
                    .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
                    .modifyStreamConfig(
                            // Set the fraction to 0.5 to make sure there are enough pages
                            // for each feedback queue.
                            streamConfig ->
                                    streamConfig.setManagedMemoryFractionOperatorOfUseCase(
                                            ManagedMemoryUseCase.OPERATOR, 0.5))
                    .setTaskStateSnapshot(
                            1, snapshot == null ? new TaskStateSnapshot() : snapshot)
                    .setupOutputForSingletonOperatorChain(
                            new RecordingHeadOperatorFactory(
                                    iterationId,
                                    0,
                                    false,
                                    5,
                                    RecordingOperatorEventGateway::new),
                            operatorId)
                    .build()) {
        try {
            return runnable.apply(harness);
        } finally {
            // Always release the operator and the broker-held feedback channel so that
            // subsequent tests (which reuse the static broker) start clean.
            RecordingHeadOperatorFactory.latestHeadOperator.close();
            RecordingHeadOperatorFactory.latestHeadOperator.getFeedbackChannel().close();
        }
    }
}
/**
 * Serializes {@code operatorEvent} and delivers it to the operator identified by {@code
 * operatorId} through the harness' stream task.
 */
private static void dispatchOperatorEvent(
        StreamTaskMailboxTestHarness<?> harness,
        OperatorID operatorId,
        OperatorEvent operatorEvent)
        throws IOException, FlinkException {
    SerializedValue<OperatorEvent> serializedEvent = new SerializedValue<>(operatorEvent);
    harness.getStreamTask().dispatchOperatorEvent(operatorId, serializedEvent);
}
/**
 * Polls the next event the operator reported to the coordinator and asserts it equals {@code
 * expectedEvent}, failing if no event arrives within 10 seconds.
 *
 * @param expectedEvent the event expected to be reported next.
 * @param eventGateway the recording gateway to poll.
 * @throws InterruptedException if interrupted while waiting for the event.
 */
private static void assertNextOperatorEvent(
        OperatorEvent expectedEvent, RecordingOperatorEventGateway eventGateway)
        throws InterruptedException {
    // Fix: the timeout unit was SECONDS, making this wait up to ~2.8 hours; with
    // MILLISECONDS a missing event fails here with a clear assertion message well before
    // the surrounding @Test(timeout = ...) kills the test.
    OperatorEvent nextOperatorEvent =
            eventGateway.operatorEvents.poll(10000, TimeUnit.MILLISECONDS);
    assertNotNull("The expected operator event not received", nextOperatorEvent);
    assertEquals(expectedEvent, nextOperatorEvent);
}
/**
 * Injects one record into the feedback channel of subtask 0 of the given iteration, simulating
 * what the tail operator would feed back.
 *
 * @param iterationId the iteration whose feedback channel is targeted.
 * @param record the record to feed back.
 * @param timestamp the stream-record timestamp, or {@code null} for a record without one.
 */
private static void putFeedbackRecords(
        IterationID iterationId, IterationRecord<?> record, @Nullable Long timestamp)
        throws MemoryAllocationException {
    SpillableFeedbackChannel<StreamRecord<IterationRecord<?>>> channel =
            SpillableFeedbackChannelBroker.get()
                    .getChannel(
                            OperatorUtils.<StreamRecord<IterationRecord<?>>>createFeedbackKey(
                                            iterationId, 0)
                                    .withSubTaskIndex(0, 0),
                            null);
    final StreamRecord<IterationRecord<?>> streamRecord;
    if (timestamp == null) {
        streamRecord = new StreamRecord<>(record);
    } else {
        streamRecord = new StreamRecord<>(record, timestamp);
    }
    channel.put(streamRecord);
}
/**
 * Asserts the observable state of the latest head operator after restoring from a snapshot.
 *
 * @param harness the harness whose emitted output is checked.
 * @param expectedStatus the status the restored operator should be in.
 * @param expectedOutput the records the operator should have emitted since restore.
 * @param expectedOperatorEvents the events the operator should have sent to the coordinator.
 * @param expectedNumFeedbackRecords the expected per-epoch feedback-record counters.
 * @param expectedLastAligned the expected latest locally aligned round.
 * @param expectedLastGloballyAligned the expected latest globally aligned round.
 */
private static void checkRestoredOperatorState(
        StreamTaskMailboxTestHarness<?> harness,
        HeadOperator.HeadOperatorStatus expectedStatus,
        List<Object> expectedOutput,
        List<OperatorEvent> expectedOperatorEvents,
        Map<Integer, Long> expectedNumFeedbackRecords,
        int expectedLastAligned,
        int expectedLastGloballyAligned) {
    HeadOperator restoredOperator = RecordingHeadOperatorFactory.latestHeadOperator;
    assertEquals(expectedStatus, restoredOperator.getStatus());
    assertEquals(expectedOutput, new ArrayList<>(harness.getOutput()));

    RecordingOperatorEventGateway gateway =
            (RecordingOperatorEventGateway) restoredOperator.getOperatorEventGateway();
    assertEquals(expectedOperatorEvents, new ArrayList<>(gateway.operatorEvents));

    if (expectedStatus != HeadOperator.HeadOperatorStatus.RUNNING) {
        // The regular record processor only exists while the operator is running.
        return;
    }
    RegularHeadOperatorRecordProcessor processor =
            (RegularHeadOperatorRecordProcessor) restoredOperator.getRecordProcessor();
    assertEquals(expectedNumFeedbackRecords, processor.getNumFeedbackRecordsPerEpoch());
    assertEquals(expectedLastAligned, processor.getLatestRoundAligned());
    assertEquals(expectedLastGloballyAligned, processor.getLatestRoundGloballyAligned());
}
/**
 * An {@link OperatorEventGateway} that records every event sent to the coordinator so tests can
 * assert on them; the queue is polled from the simulated coordinator threads, hence blocking.
 */
private static class RecordingOperatorEventGateway implements OperatorEventGateway {

    // Accessed both by the operator (producer) and by test threads (consumers).
    final BlockingQueue<OperatorEvent> operatorEvents = new LinkedBlockingQueue<>();

    @Override
    public void sendEventToCoordinator(OperatorEvent event) {
        operatorEvents.add(event);
    }
}
/**
 * Serializable factory for the {@link OperatorEventGateway} used by the head operator; it has
 * to be serializable because the operator factory holding it is shipped with the job graph.
 */
private interface OperatorEventGatewayFactory extends Serializable {
    OperatorEventGateway create();
}
/**
 * A {@link HeadOperatorFactory} that exposes the most recently created head operator through a
 * static field and injects a test-provided {@link OperatorEventGateway}.
 *
 * <p>NOTE: {@code latestHeadOperator} is static mutable state, so tests relying on it must not
 * run concurrently within the same JVM.
 */
private static class RecordingHeadOperatorFactory extends HeadOperatorFactory {

    // Creates the gateway on the task side; must be serializable (see factory interface).
    private final OperatorEventGatewayFactory operatorEventGatewayFactory;

    // The last operator produced by createStreamOperator, inspected by the tests.
    static HeadOperator latestHeadOperator;

    public RecordingHeadOperatorFactory(
            IterationID iterationId,
            int feedbackIndex,
            boolean isCriteriaStream,
            int totalHeadParallelism,
            OperatorEventGatewayFactory operatorEventGatewayFactory) {
        super(iterationId, feedbackIndex, isCriteriaStream, totalHeadParallelism);
        this.operatorEventGatewayFactory = operatorEventGatewayFactory;
    }

    @Override
    public <T extends StreamOperator<IterationRecord<?>>> T createStreamOperator(
            StreamOperatorParameters<IterationRecord<?>> streamOperatorParameters) {
        // Record the created operator before handing it back to the chain.
        latestHeadOperator = super.createStreamOperator(streamOperatorParameters);
        // Unchecked cast mirrors the factory contract; safe for the operators used here.
        return (T) latestHeadOperator;
    }

    @Override
    OperatorEventGateway createOperatorEventGateway(
            StreamOperatorParameters<IterationRecord<?>> streamOperatorParameters) {
        return operatorEventGatewayFactory.create();
    }
}
}