blob: 89e1fcc267a5389736ad1235601efe8fe836d21f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.executor.task;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.BoundedIteratorReadable;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.InMemorySourceVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.executor.MetricMessageSender;
import org.apache.nemo.runtime.executor.TaskStateManager;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
import org.apache.nemo.runtime.executor.datatransfer.OutputWriter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/**
* Tests {@link TaskExecutor}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({InputReader.class, OutputWriter.class, IntermediateDataIOFactory.class, BroadcastManagerWorker.class,
TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class, Stage.class, IREdge.class})
public final class TaskExecutorTest {
// Source of unique numeric suffixes for the ids of outgoing stage edges built by mockStageEdgeFrom.
private static final AtomicInteger RUNTIME_EDGE_ID = new AtomicInteger(0);
// Number of integers in the input data set built in setUp.
private static final int DATA_SIZE = 100;
// Property map handed to every Task built in this class; nothing is put into it in this file.
private static final ExecutionPropertyMap<VertexExecutionProperty> TASK_EXECUTION_PROPERTY_MAP
= new ExecutionPropertyMap<>("TASK_EXECUTION_PROPERTY_MAP");
// Number of partitions the fake parent-task reader splits the input data into.
private static final int SOURCE_PARALLELISM = 5;
// Attempt number used for every generated task id.
private static final int FIRST_ATTEMPT = 0;
// Input data: the integers in [0, DATA_SIZE), rebuilt before each test.
private List<Integer> elements;
// Everything written through the mocked OutputWriters, keyed by the id of the edge written to.
private Map<String, List> runtimeEdgeToOutputData;
// Mocked collaborators passed to the TaskExecutor constructor (see getTaskExecutor).
private IntermediateDataIOFactory intermediateDataIOFactory;
private BroadcastManagerWorker broadcastManagerWorker;
private TaskStateManager taskStateManager;
private MetricMessageSender metricMessageSender;
private PersistentConnectionToMasterMap persistentConnectionToMasterMap;
// Next stage number consumed by generateTaskId; reset to 1 before each test.
private AtomicInteger stageId;
/**
 * Generates a task id that belongs to a not-yet-used stage.
 * Each call consumes the next value of {@code stageId}; the task index is always 0
 * and the attempt is always {@code FIRST_ATTEMPT}.
 *
 * @return the generated task id.
 */
private String generateTaskId() {
  final String nextStageId = RuntimeIdManager.generateStageId(stageId.getAndIncrement());
  return RuntimeIdManager.generateTaskId(nextStageId, 0, FIRST_ATTEMPT);
}
/**
 * Rebuilds the shared fixtures before every test: the input data, the stage id counter,
 * the map that records written output, and the mocked collaborators of {@link TaskExecutor}.
 *
 * @throws Exception if stubbing a mocked method fails.
 */
@Before
public void setUp() throws Exception {
  // Plain (non-mock) fixtures.
  stageId = new AtomicInteger(1);
  elements = getRangedNumList(0, DATA_SIZE);
  runtimeEdgeToOutputData = new HashMap<>();

  // Collaborators that only need to exist: nothing is stubbed on these mocks here.
  taskStateManager = mock(TaskStateManager.class);
  persistentConnectionToMasterMap = mock(PersistentConnectionToMasterMap.class);
  broadcastManagerWorker = mock(BroadcastManagerWorker.class);

  // The I/O factory hands out fake parent-task readers and output-recording writers.
  intermediateDataIOFactory = mock(IntermediateDataIOFactory.class);
  when(intermediateDataIOFactory.createReader(anyInt(), any(), any())).thenAnswer(new ParentTaskReaderAnswer());
  when(intermediateDataIOFactory.createWriter(any(), any())).thenAnswer(new ChildTaskWriterAnswer());

  // The metric sender silently accepts everything.
  metricMessageSender = mock(MetricMessageSender.class);
  doNothing().when(metricMessageSender).send(anyString(), anyString(), anyString(), any());
  doNothing().when(metricMessageSender).close();
}
/**
 * Checks whether two lists hold the same integers, ignoring their order.
 * <p>
 * The comparison works on sorted copies, so neither argument is reordered. This keeps the
 * shared input fixture and the recorded outputs intact for later assertions, and it also
 * allows unmodifiable lists to be compared.
 *
 * @param left  the first list, may be {@code null}.
 * @param right the second list, may be {@code null} (e.g. when nothing was written to an edge).
 * @return {@code true} if both lists contain the same elements with the same multiplicities,
 * or if both are {@code null}; {@code false} otherwise.
 */
private static boolean checkEqualElements(final List<Integer> left, final List<Integer> right) {
  if (left == null || right == null) {
    // A missing list only matches another missing list; never throw from an assertion helper.
    return left == right;
  }
  if (left.size() != right.size()) {
    return false;
  }
  final List<Integer> sortedLeft = new ArrayList<>(left);
  final List<Integer> sortedRight = new ArrayList<>(right);
  Collections.sort(sortedLeft);
  Collections.sort(sortedRight);
  return sortedLeft.equals(sortedRight);
}
/**
 * Runs a task that consists of a single bounded source vertex and verifies that every
 * element supplied by the vertex's readable is written to the outgoing stage edge.
 */
@Test
public void testSourceVertexDataFetching() throws Exception {
  final IRVertex sourceIRVertex = new InMemorySourceVertex<>(elements);

  // Readable that simply iterates over the input data; watermarks and locations are unsupported.
  final Readable readable = new BoundedIteratorReadable() {
    @Override
    protected Iterator initializeIterator() {
      return elements.iterator();
    }

    @Override
    public long readWatermark() {
      throw new UnsupportedOperationException();
    }

    @Override
    public List<String> getLocations() {
      throw new UnsupportedOperationException();
    }

    @Override
    public void close() throws IOException {
    }
  };

  // The task DAG holds nothing but the source vertex.
  final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> dagBuilder = new DAGBuilder<>();
  final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
    dagBuilder.addVertex(sourceIRVertex).buildWithoutSourceSinkCheck();

  final Map<String, Readable> vertexIdToReadable = new HashMap<>();
  vertexIdToReadable.put(sourceIRVertex.getId(), readable);

  final StageEdge taskOutEdge = mockStageEdgeFrom(sourceIRVertex);
  final Task task = new Task(
    "testSourceVertexDataFetching",
    generateTaskId(),
    TASK_EXECUTION_PROPERTY_MAP,
    new byte[0],
    Collections.emptyList(),
    Collections.singletonList(taskOutEdge),
    vertexIdToReadable);

  // Execute the task.
  getTaskExecutor(task, taskDag).execute();

  // Check the output.
  final List<Integer> written = runtimeEdgeToOutputData.get(taskOutEdge.getId());
  assertTrue(checkEqualElements(elements, written));
}
/**
 * Test invalid parameter failure: building and running a {@link TaskExecutor} with a
 * {@code null} task and a {@code null} DAG must fail with a {@link NullPointerException}.
 * <p>
 * The expectation is declared on the annotation, so the test fails if no exception,
 * or any other exception, is thrown.
 */
@Test(expected = NullPointerException.class)
public void testInvalidInputData() throws Exception {
  // Either the construction or the execution is expected to throw.
  final TaskExecutor taskExecutor = getTaskExecutor(null, null);
  taskExecutor.execute();
}
/**
 * Emulates an unbounded source that emits data and one watermark.
 * The readable stops halfway through the data until its single watermark has been read and
 * then resumes. The test verifies the watermark observed by the downstream operator and the
 * data written to the outgoing stage edge.
 */
@Test
public void testUnboundedSourceVertexDataFetching() throws Exception {
  // The only watermark the source will hand out.
  final Long watermark = 1234567L;
  final BlockingQueue<Long> watermarkQueue = new LinkedBlockingQueue<>();
  watermarkQueue.add(watermark);

  final IRVertex sourceIRVertex = new TestUnboundedSourceVertex();
  final Map<String, Readable> vertexIdToReadable = new HashMap<>();
  vertexIdToReadable.put(sourceIRVertex.getId(), new TestUnboundedSourceReadable(watermarkQueue, 1));

  // The downstream operator records each watermark it receives instead of forwarding it.
  final List<Watermark> emittedWatermarks = new LinkedList<>();
  final OperatorVertex operatorVertex =
    new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks));

  final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
    new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
      .addVertex(sourceIRVertex)
      .addVertex(operatorVertex)
      .connectVertices(createEdge(sourceIRVertex, operatorVertex, "edge1"))
      .buildWithoutSourceSinkCheck();

  final StageEdge taskOutEdge = mockStageEdgeFrom(operatorVertex);
  final Task task = new Task(
    "testSourceVertexDataFetching",
    generateTaskId(),
    TASK_EXECUTION_PROPERTY_MAP,
    new byte[0],
    Collections.emptyList(),
    Collections.singletonList(taskOutEdge),
    vertexIdToReadable);

  // Execute the task.
  getTaskExecutor(task, taskDag).execute();

  // Check whether the watermark is emitted.
  assertEquals(Collections.singletonList(new Watermark(watermark)), emittedWatermarks);

  // Check the output (order-sensitive).
  assertEquals(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId()));
}
/**
 * Runs a single-operator task whose input comes from a (mocked) parent task and verifies
 * that all of the parent's data reaches the outgoing stage edge.
 */
@Test(timeout = 5000)
public void testParentTaskDataFetching() throws Exception {
  final IRVertex vertex = new OperatorVertex(new StreamTransform());

  final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> dagBuilder = new DAGBuilder<>();
  final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
    dagBuilder.addVertex(vertex).buildWithoutSourceSinkCheck();

  final StageEdge taskOutEdge = mockStageEdgeFrom(vertex);
  final StageEdge taskInEdge = mockStageEdgeTo(vertex);
  final Task task = new Task(
    "testSourceVertexDataFetching",
    generateTaskId(),
    TASK_EXECUTION_PROPERTY_MAP,
    new byte[0],
    Collections.singletonList(taskInEdge),
    Collections.singletonList(taskOutEdge),
    Collections.emptyMap());

  // Execute the task.
  getTaskExecutor(task, taskDag).execute();

  // Check the output.
  final List<Integer> written = runtimeEdgeToOutputData.get(taskOutEdge.getId());
  assertTrue(checkEqualElements(elements, written));
}
/**
 * Blocks the calling thread until the given queue is empty, re-checking every 10 ms.
 * <p>
 * If the calling thread is interrupted while waiting, the interrupt status is restored and
 * the method returns immediately, even though the queue may still hold elements. This avoids
 * swallowing the interrupt and looping on.
 *
 * @param watermarkQueue the queue that another thread is expected to drain.
 */
private static void waitUntilWatermarkEmitted(final Queue<Long> watermarkQueue) {
  while (!watermarkQueue.isEmpty()) {
    try {
      Thread.sleep(10);
    } catch (final InterruptedException e) {
      // Preserve the interruption for the caller and stop waiting.
      Thread.currentThread().interrupt();
      return;
    }
  }
}
/**
* The DAG of the task under test looks like:
* source1 -> vertex1 -> vertex2
* source2 -> vertex3 ->
* <p>
* The vertex2 has two incoming edges (from vertex1 and vertex3),
* and we test whether TaskExecutor handles data and watermarks correctly in this situation.
* <p>
* source1 emits watermarks: 500 (ts) 600 (ts) 1400 (ts) 1800 (ts) 2500 (ts)
* source2 emits watermarks: 1000(ts) 2200 (ts)
* <p>
* The vertex2 should receive and emit watermarks 500, 600, 1000, 1800, and 2200.
* NOTE(review): this expected sequence is consistent with vertex2 observing the minimum of the
* latest watermarks of its two inputs; confirm against the TaskExecutor implementation.
*/
@Test()
public void testMultipleIncomingEdges() throws Exception {
// Watermarks observed by vertex2 (recorded by its transform instead of being forwarded).
final List<Watermark> emittedWatermarks = new ArrayList<>();
final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks));
final IRVertex operatorIRVertex3 = new OperatorVertex(new StreamTransform());
final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
final IRVertex sourceIRVertex2 = new TestUnboundedSourceVertex();
// Each source starts with one watermark queued; the remaining ones are fed in by the emitter thread below.
final Queue<Long> watermarks1 = new ConcurrentLinkedQueue<>();
watermarks1.add(500L);
final Queue<Long> watermarks2 = new ConcurrentLinkedQueue<>();
watermarks2.add(1000L);
// The second argument is the total number of watermarks each readable hands out: 5 for source1, 2 for source2.
final Readable readable1 = new TestUnboundedSourceReadable(watermarks1, 5);
final Readable readable2 = new TestUnboundedSourceReadable(watermarks2, 2);
final Map<String, Readable> vertexIdToReadable = new HashMap<>();
vertexIdToReadable.put(sourceIRVertex1.getId(), readable1);
vertexIdToReadable.put(sourceIRVertex2.getId(), readable2);
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
.addVertex(sourceIRVertex1)
.addVertex(sourceIRVertex2)
.addVertex(operatorIRVertex1)
.addVertex(operatorIRVertex2)
.addVertex(operatorIRVertex3)
.connectVertices(createEdge(sourceIRVertex1, operatorIRVertex1, "edge1"))
.connectVertices(createEdge(operatorIRVertex1, operatorIRVertex2, "edge2"))
.connectVertices(createEdge(sourceIRVertex2, operatorIRVertex3, "edge3"))
.connectVertices(createEdge(operatorIRVertex3, operatorIRVertex2, "edge4"))
.buildWithoutSourceSinkCheck();
final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
final Task task =
new Task(
"testSourceVertexDataFetching",
generateTaskId(),
TASK_EXECUTION_PROPERTY_MAP,
new byte[0],
Collections.emptyList(),
Collections.singletonList(taskOutEdge),
vertexIdToReadable);
// Execute the task.
// The emitter thread feeds the remaining watermarks in a fixed order, waiting for the
// relevant queue to be drained by its readable before queuing the next batch.
final Thread watermarkEmitThread = new Thread(new Runnable() {
@Override
public void run() {
// Wait until source2's initial watermark (1000) has been taken.
waitUntilWatermarkEmitted(watermarks2);
watermarks1.add(600L);
watermarks1.add(1400L);
watermarks1.add(1800L);
waitUntilWatermarkEmitted(watermarks1);
watermarks2.add(2200L);
waitUntilWatermarkEmitted(watermarks2);
watermarks1.add(2500L);
waitUntilWatermarkEmitted(watermarks1);
}
});
watermarkEmitThread.start();
final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
taskExecutor.execute();
// Make sure the emitter thread has finished before asserting.
watermarkEmitThread.join();
// Check whether the watermark is emitted
assertEquals(Arrays.asList(
new Watermark(500), new Watermark(600), new Watermark(1000),
new Watermark(1800), new Watermark(2200)), emittedWatermarks);
// Check the output.
// Both sources read the same input data, so every element is expected twice.
final List<Integer> doubledElements = new ArrayList<>(elements.size() * 2);
doubledElements.addAll(elements);
doubledElements.addAll(elements);
assertTrue(checkEqualElements(doubledElements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
}
/**
 * The DAG of the task to test looks like:
 * parent task -> task (vertex 1 -> vertex 2) -> child task
 * <p>
 * The parent task's data is split into several partitions by {@link ParentTaskReaderAnswer},
 * so vertex 1 receives its input in several pieces. The test verifies that, after passing
 * through both operators, all elements are written to the outgoing stage edge.
 */
@Test(timeout = 5000)
public void testTwoOperators() throws Exception {
  final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
  final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransform());

  final RuntimeEdge<IRVertex> internalEdge = createEdge(operatorIRVertex1, operatorIRVertex2, "edge");
  final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
    .addVertex(operatorIRVertex1)
    .addVertex(operatorIRVertex2)
    .connectVertices(internalEdge)
    .buildWithoutSourceSinkCheck();

  final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
  final StageEdge taskInEdge = mockStageEdgeTo(operatorIRVertex1);
  final Task task = new Task(
    "testSourceVertexDataFetching",
    generateTaskId(),
    TASK_EXECUTION_PROPERTY_MAP,
    new byte[0],
    Collections.singletonList(taskInEdge),
    Collections.singletonList(taskOutEdge),
    Collections.emptyMap());

  // Execute the task.
  getTaskExecutor(task, taskDag).execute();

  // Check the output.
  final List<Integer> written = runtimeEdgeToOutputData.get(taskOutEdge.getId());
  assertTrue(checkEqualElements(elements, written));
}
/**
 * The DAG of the task to test looks like:
 * parent task -> task (vertex 1 -> vertex 2) -> child task
 * <p>
 * Vertex 2 pairs every incoming element with the broadcast variable obtained from the mocked
 * {@link BroadcastManagerWorker}. The test verifies that every input element shows up as the
 * right side of a pair and that the left side of every pair holds the same elements.
 */
@Test(timeout = 5000)
public void testTwoOperatorsWithBroadcastVariable() {
  final long broadcastId = 0;
  final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
  final IRVertex operatorIRVertex2 = new OperatorVertex(new BroadcastVariablePairingTransform(broadcastId));
  final String edgeId = "edge";
  final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
    .addVertex(operatorIRVertex1)
    .addVertex(operatorIRVertex2)
    .connectVertices(createEdge(operatorIRVertex1, operatorIRVertex2, edgeId))
    .buildWithoutSourceSinkCheck();
  final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
  final StageEdge taskInEdge = mockStageEdgeTo(operatorIRVertex1);

  // The broadcast variable is a copy of the input data.
  when(broadcastManagerWorker.get(broadcastId)).thenReturn(new ArrayList<>(elements));

  final Task task = new Task(
    "testSourceVertexDataFetching",
    generateTaskId(),
    TASK_EXECUTION_PROPERTY_MAP,
    new byte[0],
    Collections.singletonList(taskInEdge),
    Collections.singletonList(taskOutEdge),
    Collections.emptyMap());

  // Execute the task.
  final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
  taskExecutor.execute();

  // Check the output: each written record is a (broadcast variable, element) pair.
  final List<Pair<List<Integer>, Integer>> pairs = runtimeEdgeToOutputData.get(taskOutEdge.getId());
  final List<Integer> values = pairs.stream().map(Pair::right).collect(Collectors.toList());
  assertTrue(checkEqualElements(elements, values));
  assertTrue(pairs.stream().map(Pair::left).allMatch(broadcastVar -> checkEqualElements(broadcastVar, values)));
}
/**
 * The DAG of the task to test looks like:
 * router vertex --+-- main vertex    (main output)
 *                 +-- bonus vertex 1 (additional tag "bonus1")
 *                 +-- bonus vertex 2 (additional tag "bonus2")
 * <p>
 * The router uses emit(element) and emit(tag, element) together: even elements go to the main
 * output, odd elements go to every additional output. The test verifies that each outgoing
 * stage edge received exactly the expected half of the data.
 */
@Test(timeout = 5000)
public void testAdditionalOutputs() throws Exception {
  final String additionalTag1 = "bonus1";
  final String additionalTag2 = "bonus2";

  final IRVertex routerVertex = new OperatorVertex(
    new RoutingTransform(Arrays.asList(additionalTag1, additionalTag2)));
  final IRVertex mainVertex = new OperatorVertex(new StreamTransform());
  final IRVertex bonusVertex1 = new OperatorVertex(new StreamTransform());
  final IRVertex bonusVertex2 = new OperatorVertex(new StreamTransform());

  // Only the edges towards the bonus vertices carry an additional output tag.
  final RuntimeEdge<IRVertex> mainEdge = createEdge(routerVertex, mainVertex, "edge-1");
  final RuntimeEdge<IRVertex> bonusEdge1 = createEdge(routerVertex, bonusVertex1, "edge-2");
  final RuntimeEdge<IRVertex> bonusEdge2 = createEdge(routerVertex, bonusVertex2, "edge-3");
  bonusEdge1.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag1));
  bonusEdge2.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag2));

  final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
    .addVertex(routerVertex)
    .addVertex(mainVertex)
    .addVertex(bonusVertex1)
    .addVertex(bonusVertex2)
    .connectVertices(mainEdge)
    .connectVertices(bonusEdge1)
    .connectVertices(bonusEdge2)
    .buildWithoutSourceSinkCheck();

  final StageEdge outEdge1 = mockStageEdgeFrom(mainVertex);
  final StageEdge outEdge2 = mockStageEdgeFrom(bonusVertex1);
  final StageEdge outEdge3 = mockStageEdgeFrom(bonusVertex2);
  final StageEdge taskInEdge = mockStageEdgeTo(routerVertex);
  final Task task = new Task(
    "testAdditionalOutputs",
    generateTaskId(),
    TASK_EXECUTION_PROPERTY_MAP,
    new byte[0],
    Collections.singletonList(taskInEdge),
    Arrays.asList(outEdge1, outEdge2, outEdge3),
    Collections.emptyMap());

  // Execute the task.
  getTaskExecutor(task, taskDag).execute();

  // Check the output.
  final List<Integer> mainOutputs = runtimeEdgeToOutputData.get(outEdge1.getId());
  final List<Integer> bonusOutputs1 = runtimeEdgeToOutputData.get(outEdge2.getId());
  final List<Integer> bonusOutputs2 = runtimeEdgeToOutputData.get(outEdge3.getId());

  // Key true holds the even elements, key false the odd ones.
  final Map<Boolean, List<Integer>> byParity =
    elements.stream().collect(Collectors.partitioningBy(i -> i % 2 == 0));
  final List<Integer> even = byParity.get(true);
  final List<Integer> odd = byParity.get(false);

  assertTrue(checkEqualElements(even, mainOutputs));
  assertTrue(checkEqualElements(odd, bonusOutputs1));
  assertTrue(checkEqualElements(odd, bonusOutputs2));
}
/**
 * Creates an intra-task runtime edge whose only execution property is a memory data store.
 *
 * @param src             the source vertex of the edge.
 * @param dst             the destination vertex of the edge.
 * @param runtimeIREdgeId the id of the edge, also used as the id of its property map.
 * @return the new edge.
 */
private RuntimeEdge<IRVertex> createEdge(final IRVertex src,
                                         final IRVertex dst,
                                         final String runtimeIREdgeId) {
  final ExecutionPropertyMap<EdgeExecutionProperty> memoryStoreOnly =
    new ExecutionPropertyMap<>(runtimeIREdgeId);
  memoryStoreOnly.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));

  return new RuntimeEdge<>(runtimeIREdgeId, memoryStoreOnly, src, dst);
}
/**
 * Creates a one-to-one stage edge that leaves the task from the given vertex.
 * The edge gets a unique id of the form "SEdge&lt;n&gt;"; its destination vertex and both
 * stages are throw-away placeholders.
 *
 * @param irVertex the vertex inside the task that the edge starts from.
 * @return the outgoing stage edge.
 */
private StageEdge mockStageEdgeFrom(final IRVertex irVertex) {
  final String edgeId = "SEdge" + RUNTIME_EDGE_ID.getAndIncrement();
  final ExecutionPropertyMap oneToOneProperties =
    ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne);
  final IRVertex placeholderDst = new OperatorVertex(new StreamTransform());
  final Stage srcStage = mock(Stage.class);
  final Stage dstStage = mock(Stage.class);

  return new StageEdge(edgeId, oneToOneProperties, irVertex, placeholderDst, srcStage, dstStage);
}
/**
 * Creates a one-to-one stage edge that enters the task at the given vertex.
 * Every edge built here shares the same fixed id; its source vertex and both stages are
 * throw-away placeholders.
 *
 * @param irVertex the vertex inside the task that the edge ends at.
 * @return the incoming stage edge.
 */
private StageEdge mockStageEdgeTo(final IRVertex irVertex) {
  final ExecutionPropertyMap oneToOneProperties =
    ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne);
  final IRVertex placeholderSrc = new OperatorVertex(new StreamTransform());
  final Stage srcStage = mock(Stage.class);
  final Stage dstStage = mock(Stage.class);

  return new StageEdge(
    "runtime outgoing edge id", oneToOneProperties, placeholderSrc, irVertex, srcStage, dstStage);
}
/**
 * Answer that produces a mocked inter-stage {@link InputReader}.
 * Its {@code read()} returns {@code SOURCE_PARALLELISM} already-completed futures, each
 * iterating over one consecutive slice of the input data. The source vertex passed to the
 * factory call is given a matching parallelism property.
 */
private class ParentTaskReaderAnswer implements Answer<InputReader> {
  @Override
  public InputReader answer(final InvocationOnMock invocationOnMock) throws Throwable {
    // Slice the input data into equally sized, already-available partitions.
    final int sliceSize = DATA_SIZE / SOURCE_PARALLELISM;
    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> inputFutures =
      new ArrayList<>(SOURCE_PARALLELISM);
    for (int partition = 0; partition < SOURCE_PARALLELISM; partition++) {
      final int from = partition * sliceSize;
      final int to = (partition + 1) * sliceSize;
      final Iterator<Integer> slice = elements.subList(from, to).iterator();
      inputFutures.add(CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(slice)));
    }

    final InputReader inputReader = mock(InputReader.class);

    // The second argument of the factory call is the source vertex of the reader.
    final IRVertex srcVertex = invocationOnMock.getArgument(1);
    srcVertex.setProperty(ParallelismProperty.of(SOURCE_PARALLELISM));

    when(inputReader.getSrcIrVertex()).thenReturn(srcVertex);
    when(inputReader.read()).thenReturn(inputFutures);
    return inputReader;
  }
}
/**
 * Answer that produces a mocked {@link OutputWriter}.
 * Everything passed to its {@code write} method is appended to the list stored in
 * {@code runtimeEdgeToOutputData} under the id of the runtime edge the writer was created for.
 */
private class ChildTaskWriterAnswer implements Answer<OutputWriter> {
  @Override
  public OutputWriter answer(final InvocationOnMock invocationOnMock) throws Throwable {
    // The second argument of the factory call is the edge the writer writes to.
    final RuntimeEdge runtimeEdge = invocationOnMock.getArgument(1);
    final OutputWriter recordingWriter = mock(OutputWriter.class);

    doAnswer(writeInvocation -> {
      // Record the written element under the edge id, creating the list on first use.
      runtimeEdgeToOutputData
        .computeIfAbsent(runtimeEdge.getId(), edgeId -> new ArrayList<>())
        .add(writeInvocation.getArgument(0));
      return null;
    }).when(recordingWriter).write(any());

    return recordingWriter;
  }
}
/**
 * Identity transform that records watermarks instead of forwarding them.
 * Watermarks are not emitted to the OutputWriter because OutputWriter currently does not
 * support watermarks (TODO #245); each received watermark is appended to the list given at
 * construction so that a test can inspect it.
 *
 * @param <T> type
 */
private class StreamTransformNoWatermarkEmit<T> implements Transform<T, T> {
  private final List<Watermark> emittedWatermarks;
  private OutputCollector<T> collector;

  StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
    this.emittedWatermarks = emittedWatermarks;
  }

  @Override
  public void prepare(final Context context, final OutputCollector<T> outputCollector) {
    collector = outputCollector;
  }

  @Override
  public void onData(final Object element) {
    // Pass the element through unchanged.
    collector.emit((T) element);
  }

  @Override
  public void onWatermark(Watermark watermark) {
    // Record only; nothing is emitted downstream.
    emittedWatermarks.add(watermark);
  }

  @Override
  public void close() {
    // Do nothing.
  }
}
/**
* Source vertex for unbounded source test.
* <p>
* Only {@link #isBounded()} carries meaning here: it reports the source as unbounded.
* The remaining overrides are stubs that return {@code null} or {@code 0}, or do nothing.
* The tests in this class supply the readable for such a vertex directly through the
* vertex-id-to-readable map passed to the task.
*/
private final class TestUnboundedSourceVertex extends SourceVertex {
@Override
public boolean isBounded() {
return false;
}
// Stub: no readables are produced by the vertex itself.
@Override
public List<Readable> getReadables(int desiredNumOfSplits) throws Exception {
return null;
}
// Stub: always reports a size of zero.
@Override
public long getEstimatedSizeBytes() {
return 0L;
}
// Stub: there is no internal state to clear.
@Override
public void clearInternalStates() {
}
// Stub: cloning is not supported; returns null.
@Override
public IRVertex getClone() {
return null;
}
}
/**
 * Emulates an unbounded source that throws NoSuchElementException.
 * It reads the input data up to the middle point and throws NoSuchElementException there
 * until the expected number of watermarks has been handed out. After that it resumes
 * reading and finishes at the end of the data.
 */
private final class TestUnboundedSourceReadable implements Readable {
  // Watermarks waiting to be handed out, and how many must be handed out in total.
  final Queue<Long> watermarks;
  final int expectedNumWatermarks;
  int numEmittedWatermarks = 0;
  // Last watermark handed out; -1 until the first one has been read.
  long currWatermark = -1;

  // Reading position, and the two positions that matter: the pause point and the end.
  int pointer = 0;
  final int middle = elements.size() / 2;
  final int end = elements.size();

  public TestUnboundedSourceReadable(final Queue<Long> watermarks,
                                     final int expectedNumWatermarks) {
    this.watermarks = watermarks;
    this.expectedNumWatermarks = expectedNumWatermarks;
  }

  @Override
  public void prepare() {
  }

  @Override
  public Object readCurrent() throws NoSuchElementException {
    final boolean watermarksPending = numEmittedWatermarks < expectedNumWatermarks;
    if (watermarksPending && pointer == middle) {
      // Pause at the middle point until all watermarks have been read.
      throw new NoSuchElementException();
    }
    final Object current = elements.get(pointer);
    pointer++;
    return current;
  }

  @Override
  public long readWatermark() {
    if (numEmittedWatermarks >= expectedNumWatermarks) {
      // Every expected watermark has been handed out.
      return Long.MAX_VALUE;
    }
    final Long next = watermarks.poll();
    if (next != null) {
      currWatermark = next;
      numEmittedWatermarks++;
    }
    // Either the freshly polled watermark or, if none was queued, the previous one.
    return currWatermark;
  }

  @Override
  public boolean isFinished() {
    return pointer == end;
  }

  @Override
  public List<String> getLocations() throws Exception {
    return null;
  }

  @Override
  public void close() throws IOException {
  }
}
/**
 * Identity transform for testing: forwards every element and every watermark unchanged.
 *
 * @param <T> input/output type.
 */
private class StreamTransform<T> implements Transform<T, T> {
  private OutputCollector<T> collector;

  @Override
  public void prepare(final Context context, final OutputCollector<T> outputCollector) {
    collector = outputCollector;
  }

  @Override
  public void onData(final Object element) {
    collector.emit((T) element);
  }

  @Override
  public void onWatermark(Watermark watermark) {
    collector.emitWatermark(watermark);
  }

  @Override
  public void close() {
    // Do nothing.
  }
}
/**
 * Creates a view: buffers every incoming element and emits them as one list on close.
 * Watermarks are ignored.
 *
 * @param <T> input type.
 */
private class CreateSingleListTransform<T> implements Transform<T, List<T>> {
  private OutputCollector<List<T>> collector;
  private List<T> buffer;

  @Override
  public void prepare(final Context context, final OutputCollector<List<T>> outputCollector) {
    collector = outputCollector;
    buffer = new ArrayList<>();
  }

  @Override
  public void onData(final Object element) {
    buffer.add((T) element);
  }

  @Override
  public void onWatermark(Watermark watermark) {
    // do nothing
  }

  @Override
  public void close() {
    // The whole buffer is emitted as a single output element.
    collector.emit(buffer);
  }
}
/**
 * Pairs each data element with a broadcast variable: for every element, the broadcast variable
 * is looked up through the transform context and a (broadcast variable, element) pair is
 * emitted. Watermarks are forwarded unchanged.
 *
 * @param <T> input/output type.
 */
private class BroadcastVariablePairingTransform<T> implements Transform<T, T> {
  private final Serializable broadcastVariableId;
  private OutputCollector<T> collector;
  private Context transformContext;

  public BroadcastVariablePairingTransform(final Serializable broadcastVariableId) {
    this.broadcastVariableId = broadcastVariableId;
  }

  @Override
  public void prepare(final Context context, final OutputCollector<T> outputCollector) {
    transformContext = context;
    collector = outputCollector;
  }

  @Override
  public void onData(final Object element) {
    // The variable is fetched again for each element.
    final Object broadcastVariable = transformContext.getBroadcastVariable(broadcastVariableId);
    collector.emit((T) Pair.of(broadcastVariable, element));
  }

  @Override
  public void onWatermark(Watermark watermark) {
    collector.emitWatermark(watermark);
  }

  @Override
  public void close() {
    // Do nothing.
  }
}
/**
 * Conditional identity transform for testing additional outputs: even elements are emitted
 * to the main output, odd elements are emitted once per additional tag. Watermarks are
 * forwarded unchanged.
 */
private class RoutingTransform implements Transform<Integer, Integer> {
  private final Collection<String> additionalTags;
  private OutputCollector<Integer> collector;

  public RoutingTransform(final Collection<String> additionalTags) {
    this.additionalTags = additionalTags;
  }

  @Override
  public void prepare(final Context context, OutputCollector<Integer> outputCollector) {
    collector = outputCollector;
  }

  @Override
  public void onData(final Integer element) {
    final int value = element;
    final boolean isEven = value % 2 == 0;
    if (isEven) {
      // route to all main outputs. Invoked if user calls c.output(element)
      collector.emit(value);
      return;
    }
    // route to all additional outputs. Invoked if user calls c.output(tupleTag, element)
    for (final String tag : additionalTags) {
      collector.emit(tag, value);
    }
  }

  @Override
  public void onWatermark(Watermark watermark) {
    collector.emitWatermark(watermark);
  }

  @Override
  public void close() {
    // Do nothing.
  }
}
/**
 * Gets a mutable list of the consecutive integers in the given range.
 * An empty or inverted range ({@code end <= start}) yields an empty list.
 *
 * @param start value of the range (inclusive).
 * @param end value of the range (exclusive).
 * @return a new, modifiable list holding {@code start, start + 1, ..., end - 1} in ascending order.
 */
private static List<Integer> getRangedNumList(final int start, final int end) {
  // Collect straight from the stream; an explicit ArrayList guarantees a mutable result.
  return IntStream.range(start, end)
    .boxed()
    .collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Builds the {@link TaskExecutor} under test, wiring in the mocked collaborators
 * prepared for the current test.
 *
 * @param task    the task to execute.
 * @param taskDag the DAG of the vertices inside the task.
 * @return a new task executor.
 */
private TaskExecutor getTaskExecutor(final Task task, final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag) {
  return new TaskExecutor(
    task,
    taskDag,
    taskStateManager,
    intermediateDataIOFactory,
    broadcastManagerWorker,
    metricMessageSender,
    persistentConnectionToMasterMap);
}
}