blob: bcdb4af6c8df04bd99892bd7e2c33160a8b3d0ce [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
public class Edge {
private EdgeProperty edgeProperty;
private EdgeManager edgeManager;
@SuppressWarnings("rawtypes")
private EventHandler eventHandler;
private AtomicBoolean bufferEvents = new AtomicBoolean(false);
private List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
private List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
private Vertex sourceVertex;
private Vertex destinationVertex; // this may end up being a list for shared edge
@SuppressWarnings("rawtypes")
public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
this.edgeProperty = edgeProperty;
this.eventHandler = eventHandler;
switch (edgeProperty.getDataMovementType()) {
case ONE_TO_ONE:
edgeManager = new OneToOneEdgeManager();
break;
case BROADCAST:
edgeManager = new BroadcastEdgeManager();
break;
case SCATTER_GATHER:
edgeManager = new ScatterGatherEdgeManager();
break;
default:
String message = "Unknown edge data movement type: "
+ edgeProperty.getDataMovementType();
throw new TezUncheckedException(message);
}
}
public EdgeProperty getEdgeProperty() {
return this.edgeProperty;
}
public EdgeManager getEdgeManager() {
return this.edgeManager;
}
public void setEdgeManager(EdgeManager edgeManager) {
if(edgeManager == null) {
throw new TezUncheckedException("Edge manager cannot be null");
}
this.edgeManager = edgeManager;
}
public void setSourceVertex(Vertex sourceVertex) {
if (this.sourceVertex != null && this.sourceVertex != sourceVertex) {
throw new TezUncheckedException("Source vertex exists: "
+ sourceVertex.getName());
}
this.sourceVertex = sourceVertex;
}
public void setDestinationVertex(Vertex destinationVertex) {
if (this.destinationVertex != null
&& this.destinationVertex != destinationVertex) {
throw new TezUncheckedException("Destination vertex exists: "
+ destinationVertex.getName());
}
this.destinationVertex = destinationVertex;
}
public InputSpec getDestinationSpec(int destinationTaskIndex) {
return new InputSpec(sourceVertex.getName(),
edgeProperty.getEdgeDestination(),
edgeManager.getNumDestinationTaskInputs(sourceVertex.getTotalTasks(),
destinationTaskIndex));
}
public OutputSpec getSourceSpec(int sourceTaskIndex) {
return new OutputSpec(destinationVertex.getName(),
edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskOutputs(
destinationVertex.getTotalTasks(), sourceTaskIndex));
}
public void startEventBuffering() {
bufferEvents.set(true);
}
public void stopEventBuffering() {
// assume only 1 entity will start and stop event buffering
bufferEvents.set(false);
for(TezEvent event : destinationEventBuffer) {
sendTezEventToDestinationTasks(event);
}
destinationEventBuffer.clear();
for(TezEvent event : sourceEventBuffer) {
sendTezEventToSourceTasks(event);
}
sourceEventBuffer.clear();
}
@SuppressWarnings("unchecked")
public void sendTezEventToSourceTasks(TezEvent tezEvent) {
if (!bufferEvents.get()) {
switch (tezEvent.getEventType()) {
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent event = (InputReadErrorEvent) tezEvent.getEvent();
TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo()
.getTaskAttemptID();
int destTaskIndex = destAttemptId.getTaskID().getId();
int srcTaskIndex = edgeManager.routeEventToSourceTasks(destTaskIndex,
event);
int numConsumers = edgeManager.getDestinationConsumerTaskNumber(
srcTaskIndex, destinationVertex.getTotalTasks());
TezTaskID srcTaskId = sourceVertex.getTask(srcTaskIndex).getTaskId();
int taskAttemptIndex = event.getVersion();
TezTaskAttemptID srcTaskAttemptId = new TezTaskAttemptID(srcTaskId,
taskAttemptIndex);
eventHandler.handle(new TaskAttemptEventOutputFailed(srcTaskAttemptId,
tezEvent, numConsumers));
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ tezEvent.getEventType());
}
} else {
sourceEventBuffer.add(tezEvent);
}
}
public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
if (!bufferEvents.get()) {
List<Integer> destTaskIndices = new ArrayList<Integer>();
switch (tezEvent.getEventType()) {
case DATA_MOVEMENT_EVENT:
DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
TezTaskAttemptID dmSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
int dmSourceTaskIndex = dmSourceAttemptId.getTaskID().getId();
edgeManager.routeEventToDestinationTasks(dmEvent, dmSourceTaskIndex,
destinationVertex.getTotalTasks(), destTaskIndices);
for(Integer destTaskIndex : destTaskIndices) {
EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT,
destinationVertex.getName(),
sourceVertex.getName(),
null); // will be filled by Task when sending the event. Is it needed?
destMeta.setIndex(dmEvent.getTargetIndex());
tezEvent.setDestinationInfo(destMeta);
Task destTask = destinationVertex.getTask(destTaskIndex);
TezTaskID destTaskId = destTask.getTaskId();
sendEventToTask(destTaskId, tezEvent);
}
break;
case INPUT_FAILED_EVENT:
InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
TezTaskAttemptID ifSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
int ifSourceTaskIndex = ifSourceAttemptId.getTaskID().getId();
edgeManager.routeEventToDestinationTasks(ifEvent, ifSourceTaskIndex,
destinationVertex.getTotalTasks(), destTaskIndices);
for(Integer destTaskIndex : destTaskIndices) {
EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT,
destinationVertex.getName(),
sourceVertex.getName(),
null); // will be filled by Task when sending the event. Is it needed?
destMeta.setIndex(ifEvent.getTargetIndex());
tezEvent.setDestinationInfo(destMeta);
TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
sendEventToTask(destTaskId, tezEvent);
}
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ tezEvent.getEventType());
}
} else {
destinationEventBuffer.add(tezEvent);
}
}
@SuppressWarnings("unchecked")
private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
}
}