/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.iteration.operator;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.config.IterationOptions;
import org.apache.flink.iteration.typeinfo.IterationRecordSerializer;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;

/** Test the behavior of {@link ReplayOperator}. */
public class ReplayOperatorTest extends TestLogger {

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
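
    /**
     * Verifies that the records received in round 0 are cached and then replayed once for each
     * subsequent epoch watermark.
     */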
@Test(timeout = 60000)
public void testReplaying() throws Exception {
final int numRecords = 10;
OperatorID operatorId = new OperatorID();
createHarnessAndRun(
operatorId,
null,
harness -> {
                    // Round 0: feed the original records.
for (int i = 0; i < numRecords; ++i) {
harness.processElement(
new StreamRecord<>(IterationRecord.newRecord(i, 0)), 0, 0);
}
harness.endInput(0, true);
harness.processElement(
new StreamRecord<>(IterationRecord.newEpochWatermark(0, "sender1")),
1,
0);
assertOutputAllRecordsAndEpochWatermark(
harness.getOutput(), numRecords, operatorId, 0);
harness.getOutput().clear();
                    // Round 1: the cached records should be replayed.
harness.processElement(
new StreamRecord<>(IterationRecord.newEpochWatermark(1, "sender1")),
1,
0);
                    // The output is emitted asynchronously inside the ReplayOperator.
while (harness.getOutput().size() < numRecords + 1) {
Thread.sleep(500);
}
assertOutputAllRecordsAndEpochWatermark(
harness.getOutput(), numRecords, operatorId, 1);
harness.getOutput().clear();
                    // Round 2: the cached records should be replayed again.
harness.processElement(
new StreamRecord<>(IterationRecord.newEpochWatermark(2, "sender1")),
1,
0);
                    // The output is emitted asynchronously inside the ReplayOperator.
while (harness.getOutput().size() < numRecords + 1) {
Thread.sleep(500);
}
assertOutputAllRecordsAndEpochWatermark(
harness.getOutput(), numRecords, operatorId, 2);
return null;
});
}
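
    /**
     * Takes a checkpoint in the middle of the first epoch, restores from it, and verifies that
     * both the completed first round and the replayed second round produce the full output.
     */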
@Test
public void testSnapshotAndRestoreOnFirstEpoch() throws Exception {
final int numRecords = 10;
OperatorID operatorId = new OperatorID();
List<Object> firstRoundOutput = new ArrayList<>();
List<Object> secondRoundOutput = new ArrayList<>();
TaskStateSnapshot snapshot =
createHarnessAndRun(
operatorId,
null,
harness -> {
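                            // Reset the latch so we can await the checkpoint report below.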
harness.getTaskStateManager().getWaitForReportLatch().reset();
for (int i = 0; i < numRecords / 2; ++i) {
harness.processElement(
new StreamRecord<>(IterationRecord.newRecord(i, 0)), 0, 0);
}
harness.getStreamTask()
.triggerCheckpointAsync(
new CheckpointMetaData(2, 1000),
CheckpointOptions.alignedNoTimeout(
CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference
.getDefault()));
harness.processAll();
firstRoundOutput.addAll(harness.getOutput());
harness.getTaskStateManager().getWaitForReportLatch().await();
return harness.getTaskStateManager()
.getLastJobManagerTaskStateSnapshot();
});
createHarnessAndRun(
operatorId,
snapshot,
harness -> {
for (int i = numRecords / 2; i < numRecords; ++i) {
harness.processElement(
new StreamRecord<>(IterationRecord.newRecord(i, 0)), 0, 0);
}
harness.endInput(0, true);
harness.processElement(
new StreamRecord<>(IterationRecord.newEpochWatermark(0, "send-0")),
1,
0);
harness.processAll();
firstRoundOutput.addAll(harness.getOutput());
// The second round
harness.getOutput().clear();
harness.processElement(
new StreamRecord<>(IterationRecord.newEpochWatermark(1, "send-0")),
1,
0);
                    harness.processAll();
                    secondRoundOutput.addAll(harness.getOutput());
return null;
});
assertOutputAllRecordsAndEpochWatermark(firstRoundOutput, numRecords, operatorId, 0);
assertOutputAllRecordsAndEpochWatermark(secondRoundOutput, numRecords, operatorId, 1);
}
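
    /**
     * Takes a checkpoint while the second epoch is being replayed asynchronously, restores from
     * it, and verifies that the replay resumes and completes after recovery.
     */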
@Test
public void testSnapshotAndRestoreOnSecondEpoch() throws Exception {
final int numRecords = 10;
OperatorID operatorId = new OperatorID();
List<Object> firstRoundOutput = new ArrayList<>();
List<Object> secondRoundOutput = new ArrayList<>();
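        // Fire the checkpoint hook (set inside the test body) partway through the replay.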
HookableOutput hookableOutput = new HookableOutput(numRecords + numRecords / 2);
TaskStateSnapshot snapshot =
createHarnessAndRun(
operatorId,
null,
harness -> {
harness.getTaskStateManager().getWaitForReportLatch().reset();
for (int i = 0; i < numRecords; ++i) {
harness.processElement(
new StreamRecord<>(IterationRecord.newRecord(i, 0)), 0, 0);
}
harness.endInput(0, true);
harness.processElement(
new StreamRecord<>(
IterationRecord.newEpochWatermark(0, "send-0")),
1,
0);
harness.processAll();
firstRoundOutput.addAll(harness.getOutput());
harness.getOutput().clear();
harness.getTaskStateManager().getWaitForReportLatch().reset();
                            // We have to simulate another thread inserting the checkpoint barrier.
hookableOutput.setRunnable(
() ->
harness.getStreamTask()
.triggerCheckpointAsync(
new CheckpointMetaData(2, 1000),
CheckpointOptions.alignedNoTimeout(
CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference
.getDefault())));
                            // Process the epoch watermark; the hook set above triggers the checkpoint mid-replay.
harness.processElement(
new StreamRecord<>(
IterationRecord.newEpochWatermark(1, "send-0")),
1,
0);
harness.processAll();
secondRoundOutput.addAll(harness.getOutput());
harness.getTaskStateManager().getWaitForReportLatch().await();
return harness.getTaskStateManager()
.getLastJobManagerTaskStateSnapshot();
},
hookableOutput);
createHarnessAndRun(
operatorId,
snapshot,
harness -> {
                    // In this case, we expect the input to finish immediately.
harness.endInput(0, true);
secondRoundOutput.addAll(harness.getOutput());
return null;
});
assertOutputAllRecordsAndEpochWatermark(firstRoundOutput, numRecords, operatorId, 0);
assertOutputAllRecordsAndEpochWatermark(secondRoundOutput, numRecords, operatorId, 1);
}
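
    /**
     * Builds a two-input task harness around a {@link ReplayOperator}, points its data cache at a
     * fresh temporary folder, restores the task, and passes the harness to the given function.
     */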
private <T> T createHarnessAndRun(
OperatorID operatorId,
@Nullable TaskStateSnapshot snapshot,
FunctionWithException<
StreamTaskMailboxTestHarness<IterationRecord<Integer>>, T, Exception>
runnable,
ResultPartitionWriter... additionalOutput)
throws Exception {
try (StreamTaskMailboxTestHarness<IterationRecord<Integer>> harness =
new StreamTaskMailboxTestHarnessBuilder<>(
TwoInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO), 1)
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO), 1)
.addAdditionalOutput(additionalOutput)
.setTaskStateSnapshot(
1, snapshot == null ? new TaskStateSnapshot() : snapshot)
.setupOutputForSingletonOperatorChain(new ReplayOperator<>(), operatorId)
.buildUnrestored()) {
harness.getStreamTask()
.getEnvironment()
.getTaskManagerInfo()
.getConfiguration()
.set(
IterationOptions.DATA_CACHE_PATH,
tempFolder.newFolder().toPath().toAbsolutePath().toUri().toString());
harness.getStreamTask().restore();
return runnable.apply(harness);
}
}
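
    /**
     * Asserts that, ignoring checkpoint barriers, the output consists of the records 0 to
     * numRecords - 1 for the given round, followed by that round's epoch watermark.
     */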
private void assertOutputAllRecordsAndEpochWatermark(
Collection<Object> output, int numRecords, OperatorID operatorId, int round) {
assertEquals(
Stream.concat(
IntStream.range(0, numRecords)
.boxed()
.map(
i ->
new StreamRecord<>(
IterationRecord.newRecord(
i, round))),
Stream.of(
new StreamRecord<>(
IterationRecord.newEpochWatermark(
round,
OperatorUtils.getUniqueSenderId(
operatorId, 0)))))
.collect(Collectors.toList()),
output.stream()
.filter(e -> e.getClass() != CheckpointBarrier.class)
.collect(Collectors.toList()));
}
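
    /**
     * A {@link ResultPartitionWriter} that runs a user-provided hook after a fixed number of
     * records have been written, which lets the tests trigger a checkpoint in the middle of an
     * asynchronous replay.
     */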
private static class HookableOutput
extends RecordOrEventCollectingResultPartitionWriter<StreamElement> {
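
        /** Number of emitted records remaining before the hook runs. */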
private int remainingRecordsToWait;
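
        /** Hook executed once the countdown reaches zero. */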
        @Nullable private Runnable runnable;

public HookableOutput(int triggerCount) {
super(
new ArrayDeque<>(),
new StreamElementSerializer<>(
new IterationRecordSerializer<>(IntSerializer.INSTANCE)));
this.remainingRecordsToWait = triggerCount;
        }

public void setRunnable(Runnable runnable) {
this.runnable = runnable;
        }

@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
super.emitRecord(record, targetSubpartition);
tryTrigger();
        }

@Override
public void broadcastRecord(ByteBuffer record) throws IOException {
super.broadcastRecord(record);
tryTrigger();
        }

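        /** Decrements the countdown and runs the hook exactly once when it reaches zero. */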
private void tryTrigger() {
if (remainingRecordsToWait > 0) {
remainingRecordsToWait--;
if (remainingRecordsToWait == 0 && runnable != null) {
runnable.run();
}
}
}
}
}