| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nemo.runtime.executor.task; |
| |
| import com.google.common.collect.Lists; |
| import org.apache.commons.lang3.SerializationUtils; |
| import org.apache.commons.lang3.exception.ExceptionUtils; |
| import org.apache.nemo.common.Pair; |
| import org.apache.nemo.common.dag.DAG; |
| import org.apache.nemo.common.dag.Edge; |
| import org.apache.nemo.common.ir.OutputCollector; |
| import org.apache.nemo.common.ir.Readable; |
| import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty; |
| import org.apache.nemo.common.ir.vertex.IRVertex; |
| import org.apache.nemo.common.ir.vertex.OperatorVertex; |
| import org.apache.nemo.common.ir.vertex.SourceVertex; |
| import org.apache.nemo.common.ir.vertex.transform.MessageAggregatorTransform; |
| import org.apache.nemo.common.ir.vertex.transform.Transform; |
| import org.apache.nemo.common.punctuation.Finishmark; |
| import org.apache.nemo.common.punctuation.Watermark; |
| import org.apache.nemo.runtime.common.RuntimeIdManager; |
| import org.apache.nemo.runtime.common.comm.ControlMessage; |
| import org.apache.nemo.runtime.common.message.MessageEnvironment; |
| import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap; |
| import org.apache.nemo.runtime.common.plan.RuntimeEdge; |
| import org.apache.nemo.runtime.common.plan.StageEdge; |
| import org.apache.nemo.runtime.common.plan.Task; |
| import org.apache.nemo.runtime.common.state.TaskState; |
| import org.apache.nemo.runtime.executor.MetricMessageSender; |
| import org.apache.nemo.runtime.executor.TaskStateManager; |
| import org.apache.nemo.runtime.executor.TransformContextImpl; |
| import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker; |
| import org.apache.nemo.runtime.executor.datatransfer.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.concurrent.NotThreadSafe; |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Executes a task. |
| * Should be accessed by a single thread. |
| */ |
| @NotThreadSafe |
| public final class TaskExecutor { |
private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());

// Essential information
private boolean isExecuted; // intended to guard against executing the same task twice (checked in doExecute)
private final String taskId;
private final TaskStateManager taskStateManager;
private final List<DataFetcher> dataFetchers; // one fetcher per task-external input (source read or parent-task read)
private final BroadcastManagerWorker broadcastManagerWorker;
private final List<VertexHarness> sortedHarnesses; // harnesses in topological order of the vertex DAG

// Metrics information, accumulated as data fetchers finish and reported to the master in doExecute
private long boundedSourceReadTime = 0;
private long serializedReadBytes = 0;
private long encodedReadBytes = 0;
private final MetricMessageSender metricMessageSender;

// Dynamic optimization
private String idOfVertexPutOnHold; // non-null iff a vertex requested the ON_HOLD state via setIRVertexPutOnHold

private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
| |
/**
 * Constructor. Builds the data fetchers and vertex harnesses for this task up front,
 * so that {@link #execute()} only has to drive them.
 *
 * @param task Task with information needed during execution.
 * @param irVertexDag A DAG of vertices.
 * @param taskStateManager State manager for this Task.
 * @param intermediateDataIOFactory For reading from/writing to data to other tasks.
 * @param broadcastManagerWorker For broadcasts.
 * @param metricMessageSender For sending metric with execution stats to the master.
 * @param persistentConnectionToMasterMap For sending messages to the master.
 */
public TaskExecutor(final Task task,
                    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                    final TaskStateManager taskStateManager,
                    final IntermediateDataIOFactory intermediateDataIOFactory,
                    final BroadcastManagerWorker broadcastManagerWorker,
                    final MetricMessageSender metricMessageSender,
                    final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
  // Essential information
  this.isExecuted = false;
  this.taskId = task.getTaskId();
  this.taskStateManager = taskStateManager;
  this.broadcastManagerWorker = broadcastManagerWorker;

  // Metric sender
  this.metricMessageSender = metricMessageSender;

  // Dynamic optimization
  // null until a vertex requests ON_HOLD; assigning null is not ideal, but we are keeping this for now
  this.idOfVertexPutOnHold = null;

  this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;

  // Prepare data structures (fetchers for task-external inputs, harnesses for intra-task processing)
  final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
  this.dataFetchers = pair.left();
  this.sortedHarnesses = pair.right();
}
| |
| // Get all of the intra-task edges + inter-task edges |
| private List<Edge> getAllIncomingEdges( |
| final Task task, |
| final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag, |
| final IRVertex childVertex) { |
| final List<Edge> edges = new ArrayList<>(); |
| edges.addAll(irVertexDag.getIncomingEdgesOf(childVertex)); |
| final List<StageEdge> taskEdges = task.getTaskIncomingEdges().stream() |
| .filter(edge -> edge.getDstIRVertex().getId().equals(childVertex.getId())) |
| .collect(Collectors.toList()); |
| edges.addAll(taskEdges); |
| return edges; |
| } |
| |
| |
/**
 * Converts the DAG of vertices into pointer-based DAG of vertex harnesses.
 * This conversion is necessary for constructing concrete data channels for each vertex's inputs and outputs.
 * <p>
 * - Source vertex read: Explicitly handled (SourceVertexDataFetcher)
 * - Sink vertex write: Implicitly handled within the vertex
 * <p>
 * - Parent-task read: Explicitly handled (ParentTaskDataFetcher)
 * - Children-task write: Explicitly handled (VertexHarness)
 * <p>
 * - Intra-task read: Implicitly handled when performing Intra-task writes
 * - Intra-task write: Explicitly handled (VertexHarness)
 * <p>
 * For element-wise data processing, we traverse vertex harnesses from the roots to the leaves for each element.
 * This means that overheads associated with jumping from one harness to the other should be minimal.
 * For example, we should never perform an expensive hash operation to traverse the harnesses.
 *
 * @param task task.
 * @param irVertexDag dag.
 * @param intermediateDataIOFactory intermediate IO.
 * @return fetchers and harnesses.
 */
private Pair<List<DataFetcher>, List<VertexHarness>> prepare(
  final Task task,
  final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
  final IntermediateDataIOFactory intermediateDataIOFactory) {
  final int taskIndex = RuntimeIdManager.getIndexFromTaskId(task.getTaskId());

  // Traverse in a reverse-topological order to ensure that each visited vertex's children vertices exist.
  final List<IRVertex> reverseTopologicallySorted = Lists.reverse(irVertexDag.getTopologicalSort());

  // Build a map for edge as a key and edge index as a value.
  // The index identifies which incoming edge of the destination vertex an element arrived on;
  // it is used for creating NextIntraTaskOperatorInfo below.
  final Map<Edge, Integer> edgeIndexMap = new HashMap<>();
  reverseTopologicallySorted.forEach(childVertex -> {
    final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
    for (int edgeIndex = 0; edgeIndex < edges.size(); edgeIndex++) {
      final Edge edge = edges.get(edgeIndex);
      edgeIndexMap.putIfAbsent(edge, edgeIndex);
    }
  });

  // Build a map for InputWatermarkManager for each operator vertex.
  // A single-input operator gets the cheaper SingleInputWatermarkManager; a multi-input operator
  // gets a MultiInputWatermarkManager that tracks the minimum watermark across its inputs.
  final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap = new HashMap<>();
  reverseTopologicallySorted.forEach(childVertex -> {
    if (childVertex instanceof OperatorVertex) {
      final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
      if (edges.size() == 1) {
        operatorWatermarkManagerMap.putIfAbsent(childVertex,
          new SingleInputWatermarkManager(
            new OperatorWatermarkCollector((OperatorVertex) childVertex)));
      } else {
        operatorWatermarkManagerMap.putIfAbsent(childVertex,
          new MultiInputWatermarkManager(edges.size(),
            new OperatorWatermarkCollector((OperatorVertex) childVertex)));
      }
    }
  });

  // Create a harness for each vertex
  final List<DataFetcher> dataFetcherList = new ArrayList<>();
  final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();

  reverseTopologicallySorted.forEach(irVertex -> {
    // Invariant: a vertex has a Readable if and only if it is a SourceVertex.
    final Optional<Readable> sourceReader = getSourceVertexReader(irVertex, task.getIrVertexIdToReadable());
    if (sourceReader.isPresent() != irVertex instanceof SourceVertex) {
      throw new IllegalStateException(irVertex.toString());
    }

    // Additional outputs
    final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputMap =
      getInternalOutputMap(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
    final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
      getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);

    // Main outputs: pull the main-tag entry out of the internal map so that only
    // genuinely "additional" (tagged) outputs remain in it.
    final List<NextIntraTaskOperatorInfo> internalMainOutputs;
    if (internalAdditionalOutputMap.containsKey(AdditionalOutputTagProperty.getMainOutputTag())) {
      internalMainOutputs = internalAdditionalOutputMap.remove(AdditionalOutputTagProperty.getMainOutputTag());
    } else {
      internalMainOutputs = new ArrayList<>();
    }
    final List<OutputWriter> externalMainOutputs =
      getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);

    final OutputCollector outputCollector;

    // A MessageAggregatorTransform's output is a run-time optimization message for the master,
    // not data for downstream vertices, so it gets a special collector.
    if (irVertex instanceof OperatorVertex
      && ((OperatorVertex) irVertex).getTransform() instanceof MessageAggregatorTransform) {
      outputCollector = new RunTimeMessageOutputCollector(
        taskId, irVertex, persistentConnectionToMasterMap, this);
    } else {
      outputCollector = new OperatorVertexOutputCollector(
        irVertex, internalMainOutputs, internalAdditionalOutputMap,
        externalMainOutputs, externalAdditionalOutputMap);
    }

    // Create VERTEX HARNESS
    final VertexHarness vertexHarness = new VertexHarness(
      irVertex, outputCollector, new TransformContextImpl(broadcastManagerWorker),
      externalMainOutputs, externalAdditionalOutputMap);

    prepareTransform(vertexHarness);
    vertexIdToHarness.put(irVertex.getId(), vertexHarness);

    // Prepare data READ
    // Source read
    if (irVertex instanceof SourceVertex) {
      // Source vertex read
      dataFetcherList.add(new SourceVertexDataFetcher(
        (SourceVertex) irVertex,
        sourceReader.get(),
        outputCollector));
    }

    // Parent-task read
    // TODO #285: Cache broadcasted data
    task.getTaskIncomingEdges()
      .stream()
      .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId())) // edge to this vertex
      .map(incomingEdge ->
        Pair.of(incomingEdge, intermediateDataIOFactory
          .createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge)))
      .forEach(pair -> {
        if (irVertex instanceof OperatorVertex) {
          final StageEdge edge = pair.left();
          final int edgeIndex = edgeIndexMap.get(edge);
          final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex);
          final InputReader parentTaskReader = pair.right();
          final OutputCollector dataFetcherOutputCollector =
            new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager);

          // Pipe (streaming) input is drained by a multi-threaded fetcher; block input by a single-threaded one.
          if (parentTaskReader instanceof PipeInputReader) {
            dataFetcherList.add(
              new MultiThreadParentTaskDataFetcher(
                parentTaskReader.getSrcIrVertex(),
                parentTaskReader,
                dataFetcherOutputCollector));
          } else {
            dataFetcherList.add(
              new ParentTaskDataFetcher(
                parentTaskReader.getSrcIrVertex(),
                parentTaskReader,
                dataFetcherOutputCollector));
          }
        }
      });
  });

  // Harnesses are returned in (forward) topological order; see sortedHarnesses usage in doExecute.
  final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
    .stream()
    .map(vertex -> vertexIdToHarness.get(vertex.getId()))
    .collect(Collectors.toList());

  return Pair.of(dataFetcherList, sortedHarnessList);
}
| |
| /** |
| * Process a data element down the DAG dependency. |
| */ |
| private void processElement(final OutputCollector outputCollector, final Object dataElement) { |
| outputCollector.emit(dataElement); |
| } |
| |
| private void processWatermark(final OutputCollector outputCollector, |
| final Watermark watermark) { |
| outputCollector.emitWatermark(watermark); |
| } |
| |
| /** |
| * Execute a task, while handling unrecoverable errors and exceptions. |
| */ |
| public void execute() { |
| try { |
| doExecute(); |
| } catch (Throwable throwable) { |
| // ANY uncaught throwable is reported to the master |
| taskStateManager.onTaskStateChanged(TaskState.State.FAILED, Optional.empty(), Optional.empty()); |
| LOG.error(ExceptionUtils.getStackTrace(throwable)); |
| } |
| } |
| |
| /** |
| * The task is executed in the following two phases. |
| * - Phase 1: Consume task-external input data |
| * - Phase 2: Finalize task-internal states and data elements |
| */ |
| private void doExecute() { |
| // Housekeeping stuff |
| if (isExecuted) { |
| throw new RuntimeException("Task {" + taskId + "} execution called again"); |
| } |
| LOG.info("{} started", taskId); |
| taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty()); |
| |
| // Phase 1: Consume task-external input data. |
| if (!handleDataFetchers(dataFetchers)) { |
| return; |
| } |
| |
| metricMessageSender.send("TaskMetric", taskId, |
| "boundedSourceReadTime", SerializationUtils.serialize(boundedSourceReadTime)); |
| metricMessageSender.send("TaskMetric", taskId, |
| "serializedReadBytes", SerializationUtils.serialize(serializedReadBytes)); |
| metricMessageSender.send("TaskMetric", taskId, |
| "encodedReadBytes", SerializationUtils.serialize(encodedReadBytes)); |
| |
| // Phase 2: Finalize task-internal states and elements |
| for (final VertexHarness vertexHarness : sortedHarnesses) { |
| finalizeVertex(vertexHarness); |
| } |
| |
| if (idOfVertexPutOnHold == null) { |
| taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty()); |
| LOG.info("{} completed", taskId); |
| } else { |
| taskStateManager.onTaskStateChanged(TaskState.State.ON_HOLD, |
| Optional.of(idOfVertexPutOnHold), |
| Optional.empty()); |
| LOG.info("{} on hold", taskId); |
| } |
| } |
| |
// Closes the vertex's transform, then flushes and commits its output writers.
private void finalizeVertex(final VertexHarness vertexHarness) {
  closeTransform(vertexHarness);
  finalizeOutputWriters(vertexHarness);
}
| |
| /** |
| * Process an event generated from the dataFetcher. |
| * If the event is an instance of Finishmark, we remove the dataFetcher from the current list. |
| * |
| * @param event event |
| * @param dataFetcher current data fetcher |
| */ |
| private void onEventFromDataFetcher(final Object event, |
| final DataFetcher dataFetcher) { |
| if (event instanceof Finishmark) { |
| // We've consumed all the data from this data fetcher. |
| if (dataFetcher instanceof SourceVertexDataFetcher) { |
| boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime(); |
| } else if (dataFetcher instanceof ParentTaskDataFetcher) { |
| serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes(); |
| encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes(); |
| } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) { |
| serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes(); |
| encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes(); |
| } |
| } else if (event instanceof Watermark) { |
| // Watermark |
| processWatermark(dataFetcher.getOutputCollector(), (Watermark) event); |
| } else { |
| // Process data element |
| processElement(dataFetcher.getOutputCollector(), event); |
| } |
| } |
| |
/**
 * Check if it is time to poll pending fetchers' data.
 *
 * @param pollingPeriod polling period in milliseconds
 * @param currentTime current time (epoch millis)
 * @param prevTime time of the previous poll (epoch millis)
 * @return true if at least {@code pollingPeriod} ms have elapsed since {@code prevTime}
 */
private boolean isPollingTime(final long pollingPeriod,
                              final long currentTime,
                              final long prevTime) {
  return (currentTime - prevTime) >= pollingPeriod;
}
| |
| /** |
| * This retrieves data from data fetchers and process them. |
| * It maintains two lists: |
| * -- availableFetchers: maintain data fetchers that currently have data elements to retreive |
| * -- pendingFetchers: maintain data fetchers that currently do not have available elements. |
| * This can become available in the future, and therefore we check the pending fetchers every pollingInterval. |
| * <p> |
| * If a data fetcher finishes, we remove it from the two lists. |
| * If a data fetcher has no available element, we move the data fetcher to pendingFetchers |
| * If a pending data fetcher has element, we move it to availableFetchers |
| * If there are no available fetchers but pending fetchers, sleep for pollingPeriod |
| * and retry fetching data from the pendingFetchers. |
| * |
| * @param fetchers to handle. |
| * @return false if IOException. |
| */ |
| private boolean handleDataFetchers(final List<DataFetcher> fetchers) { |
| final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers); |
| final List<DataFetcher> pendingFetchers = new LinkedList<>(); |
| |
| // Polling interval. |
| final long pollingInterval = 100; // ms |
| |
| // Previous polling time |
| long prevPollingTime = System.currentTimeMillis(); |
| |
| // empty means we've consumed all task-external input data |
| while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) { |
| // We first fetch data from available data fetchers |
| final Iterator<DataFetcher> availableIterator = availableFetchers.iterator(); |
| |
| while (availableIterator.hasNext()) { |
| final DataFetcher dataFetcher = availableIterator.next(); |
| try { |
| final Object element = dataFetcher.fetchDataElement(); |
| onEventFromDataFetcher(element, dataFetcher); |
| if (element instanceof Finishmark) { |
| availableIterator.remove(); |
| } |
| } catch (final NoSuchElementException e) { |
| // No element in current data fetcher, fetch data from next fetcher |
| // move current data fetcher to pending. |
| availableIterator.remove(); |
| pendingFetchers.add(dataFetcher); |
| } catch (final IOException e) { |
| // IOException means that this task should be retried. |
| taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY, |
| Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE)); |
| LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e); |
| return false; |
| } |
| } |
| |
| final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator(); |
| final long currentTime = System.currentTimeMillis(); |
| |
| |
| if (isPollingTime(pollingInterval, currentTime, prevPollingTime)) { |
| // We check pending data every polling interval |
| prevPollingTime = currentTime; |
| |
| while (pendingIterator.hasNext()) { |
| final DataFetcher dataFetcher = pendingIterator.next(); |
| try { |
| final Object element = dataFetcher.fetchDataElement(); |
| onEventFromDataFetcher(element, dataFetcher); |
| |
| // We processed data. This means the data fetcher is now available. |
| // Add current data fetcher to available |
| pendingIterator.remove(); |
| if (!(element instanceof Finishmark)) { |
| availableFetchers.add(dataFetcher); |
| } |
| |
| } catch (final NoSuchElementException e) { |
| // The current data fetcher is still pending.. try next data fetcher |
| } catch (final IOException e) { |
| // IOException means that this task should be retried. |
| taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY, |
| Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE)); |
| LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e); |
| return false; |
| } |
| } |
| } |
| |
| // If there are no available fetchers, |
| // Sleep and retry fetching element from pending fetchers every polling interval |
| if (availableFetchers.isEmpty() && !pendingFetchers.isEmpty()) { |
| try { |
| Thread.sleep(pollingInterval); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| // Close all data fetchers |
| fetchers.forEach(fetcher -> { |
| try { |
| fetcher.close(); |
| } catch (final Exception e) { |
| e.printStackTrace(); |
| throw new RuntimeException(e); |
| } |
| }); |
| |
| return true; |
| } |
| |
| ////////////////////////////////////////////// Helper methods for setting up initial data structures |
| private Map<String, List<OutputWriter>> getExternalAdditionalOutputMap( |
| final IRVertex irVertex, |
| final List<StageEdge> outEdgesToChildrenTasks, |
| final IntermediateDataIOFactory intermediateDataIOFactory) { |
| // Add all inter-task additional tags to additional output map. |
| final Map<String, List<OutputWriter>> map = new HashMap<>(); |
| |
| outEdgesToChildrenTasks |
| .stream() |
| .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId())) |
| .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent()) |
| .map(edge -> |
| Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), |
| intermediateDataIOFactory.createWriter(taskId, edge))) |
| .forEach(pair -> { |
| map.putIfAbsent(pair.left(), new ArrayList<>()); |
| map.get(pair.left()).add(pair.right()); |
| }); |
| |
| return map; |
| } |
| |
| /** |
| * Return a map of Internal Outputs associated with their output tag. |
| * If an edge has no output tag, its info are added to the mainOutputTag. |
| * |
| * @param irVertex source irVertex |
| * @param irVertexDag DAG of IRVertex and RuntimeEdge |
| * @param edgeIndexMap Map of edge and index |
| * @param operatorWatermarkManagerMap Map of irVertex and InputWatermarkManager |
| * @return The map of output tag to the list of next intra-task operator information. |
| */ |
| private Map<String, List<NextIntraTaskOperatorInfo>> getInternalOutputMap( |
| final IRVertex irVertex, |
| final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag, |
| final Map<Edge, Integer> edgeIndexMap, |
| final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) { |
| // Add all intra-task tags to additional output map. |
| final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>(); |
| |
| irVertexDag.getOutgoingEdgesOf(irVertex.getId()) |
| .stream() |
| .map(edge -> { |
| final boolean isPresent = edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent(); |
| final String outputTag; |
| if (isPresent) { |
| outputTag = edge.getPropertyValue(AdditionalOutputTagProperty.class).get(); |
| } else { |
| outputTag = AdditionalOutputTagProperty.getMainOutputTag(); |
| } |
| final int index = edgeIndexMap.get(edge); |
| final OperatorVertex nextOperator = (OperatorVertex) edge.getDst(); |
| final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator); |
| return Pair.of(outputTag, new NextIntraTaskOperatorInfo(index, nextOperator, inputWatermarkManager)); |
| }) |
| .forEach(pair -> { |
| map.putIfAbsent(pair.left(), new ArrayList<>()); |
| map.get(pair.left()).add(pair.right()); |
| }); |
| |
| return map; |
| } |
| |
| /** |
| * Return inter-task OutputWriters, for single output or output associated with main tag. |
| * |
| * @param irVertex source irVertex |
| * @param outEdgesToChildrenTasks outgoing edges to child tasks |
| * @param intermediateDataIOFactory intermediateDataIOFactory |
| * @return OutputWriters for main children tasks |
| */ |
| private List<OutputWriter> getExternalMainOutputs(final IRVertex irVertex, |
| final List<StageEdge> outEdgesToChildrenTasks, |
| final IntermediateDataIOFactory intermediateDataIOFactory) { |
| return outEdgesToChildrenTasks |
| .stream() |
| .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId())) |
| .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent()) |
| .map(outEdgeForThisVertex -> intermediateDataIOFactory |
| .createWriter(taskId, outEdgeForThisVertex)) |
| .collect(Collectors.toList()); |
| } |
| |
| |
| private Optional<Readable> getSourceVertexReader(final IRVertex irVertex, |
| final Map<String, Readable> irVertexIdToReadable) { |
| if (irVertex instanceof SourceVertex) { |
| final Readable readable = irVertexIdToReadable.get(irVertex.getId()); |
| if (readable == null) { |
| throw new IllegalStateException(irVertex.toString()); |
| } |
| return Optional.of(readable); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| private List<InputReader> getParentTaskReaders(final int taskIndex, |
| final List<StageEdge> inEdgesFromParentTasks, |
| final IntermediateDataIOFactory intermediateDataIOFactory) { |
| return inEdgesFromParentTasks |
| .stream() |
| .map(inEdgeForThisVertex -> intermediateDataIOFactory |
| .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), inEdgeForThisVertex)) |
| .collect(Collectors.toList()); |
| } |
| |
| ////////////////////////////////////////////// Transform-specific helper methods |
| |
| private void prepareTransform(final VertexHarness vertexHarness) { |
| final IRVertex irVertex = vertexHarness.getIRVertex(); |
| final Transform transform; |
| if (irVertex instanceof OperatorVertex) { |
| transform = ((OperatorVertex) irVertex).getTransform(); |
| transform.prepare(vertexHarness.getContext(), vertexHarness.getOutputCollector()); |
| } |
| } |
| |
| private void closeTransform(final VertexHarness vertexHarness) { |
| final IRVertex irVertex = vertexHarness.getIRVertex(); |
| final Transform transform; |
| if (irVertex instanceof OperatorVertex) { |
| transform = ((OperatorVertex) irVertex).getTransform(); |
| transform.close(); |
| } |
| |
| vertexHarness.getContext().getSerializedData().ifPresent(data -> |
| persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send( |
| ControlMessage.Message.newBuilder() |
| .setId(RuntimeIdManager.generateMessageId()) |
| .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID) |
| .setType(ControlMessage.MessageType.ExecutorDataCollected) |
| .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(data).build()) |
| .build())); |
| } |
| |
| ////////////////////////////////////////////// Misc |
| |
/**
 * Records the id of a vertex that requests the ON_HOLD task state
 * (checked at the end of doExecute to report ON_HOLD instead of COMPLETE).
 *
 * @param irVertex the vertex to put on hold
 */
public void setIRVertexPutOnHold(final IRVertex irVertex) {
  idOfVertexPutOnHold = irVertex.getId();
}
| |
| /** |
| * Finalize the output write of this vertex. |
| * As element-wise output write is done and the block is in memory, |
| * flush the block into the designated data store and commit it. |
| * |
| * @param vertexHarness harness. |
| */ |
| private void finalizeOutputWriters(final VertexHarness vertexHarness) { |
| final List<Long> writtenBytesList = new ArrayList<>(); |
| |
| // finalize OutputWriters for main children |
| vertexHarness.getWritersToMainChildrenTasks().forEach(outputWriter -> { |
| outputWriter.close(); |
| final Optional<Long> writtenBytes = outputWriter.getWrittenBytes(); |
| writtenBytes.ifPresent(writtenBytesList::add); |
| }); |
| |
| // finalize OutputWriters for additional tagged children |
| vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriters -> { |
| outputWriters.forEach(outputWriter -> { |
| outputWriter.close(); |
| final Optional<Long> writtenBytes = outputWriter.getWrittenBytes(); |
| writtenBytes.ifPresent(writtenBytesList::add); |
| }); |
| }); |
| |
| long totalWrittenBytes = 0; |
| for (final Long writtenBytes : writtenBytesList) { |
| totalWrittenBytes += writtenBytes; |
| } |
| |
| // TODO #236: Decouple metric collection and sending logic |
| metricMessageSender.send("TaskMetric", taskId, |
| "writtenBytes", SerializationUtils.serialize(totalWrittenBytes)); |
| } |
| } |