blob: ebea9a4f3fd6795e5ed7ae8aa72fae9f8275bb73 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.api.impl;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.tez.common.ProtoConverters;
import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.EventProtos;
import org.apache.tez.runtime.api.events.EventProtos.CompositeEventProto;
import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
import org.apache.tez.runtime.api.events.EventProtos.CompositeRoutedDataMovementEventProto;
import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto;
import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptKilledEventProto;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import static org.apache.tez.runtime.api.events.EventProtos.*;
/**
 * Envelope around an {@link Event} that additionally carries the source and
 * destination {@link EventMetaData} and the time at which the event was received,
 * and that can be written to / read from a stream via the {@link Writable} contract.
 *
 * <p>Serialized layout, as produced by {@link #write(DataOutput)}:
 * <ol>
 *   <li>boolean: whether an event payload follows;</li>
 *   <li>if present: int {@link EventType} ordinal, long received time, then the payload.
 *       For {@code TASK_STATUS_UPDATE_EVENT} the payload is written by the event itself;
 *       for every other type it is an int length followed by that many protobuf bytes;</li>
 *   <li>boolean + source metadata (metadata only when the boolean is true);</li>
 *   <li>boolean + destination metadata (metadata only when the boolean is true).</li>
 * </ol>
 *
 * <p>The event type is written as its enum ordinal, so the order of the
 * {@link EventType} constants is part of the wire format.
 */
public class TezEvent implements Writable {

  private EventType eventType;
  private Event event;
  private EventMetaData sourceInfo;
  private EventMetaData destinationInfo;
  private long eventReceivedTime;

  /**
   * Creates an empty instance, intended to be populated through
   * {@link #readFields(DataInput)}.
   */
  public TezEvent() {
  }

  /**
   * Creates an event envelope stamped with the current wall-clock time.
   *
   * @param event the event to wrap
   * @param sourceInfo metadata describing the source of the event
   * @throws TezUncheckedException if the event is not one of the recognized types
   */
  public TezEvent(Event event, EventMetaData sourceInfo) {
    this(event, sourceInfo, System.currentTimeMillis());
  }

  /**
   * Creates an event envelope with an explicit received time and derives the
   * {@link EventType} from the runtime class of the event.
   *
   * <p>A {@code null} event matches none of the type checks and reaches the final
   * branch, where {@code event.getClass()} throws {@link NullPointerException}.
   *
   * @param event the event to wrap
   * @param sourceInfo metadata describing the source of the event
   * @param time value stored as the event received time
   * @throws TezUncheckedException if the event is not one of the recognized types
   */
  public TezEvent(Event event, EventMetaData sourceInfo, long time) {
    this.event = event;
    this.eventReceivedTime = time;
    this.setSourceInfo(sourceInfo);
    if (event instanceof DataMovementEvent) {
      eventType = EventType.DATA_MOVEMENT_EVENT;
    } else if (event instanceof CustomProcessorEvent) {
      eventType = EventType.CUSTOM_PROCESSOR_EVENT;
    } else if (event instanceof CompositeDataMovementEvent) {
      eventType = EventType.COMPOSITE_DATA_MOVEMENT_EVENT;
    } else if (event instanceof CompositeRoutedDataMovementEvent) {
      eventType = EventType.COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT;
    } else if (event instanceof VertexManagerEvent) {
      eventType = EventType.VERTEX_MANAGER_EVENT;
    } else if (event instanceof InputReadErrorEvent) {
      eventType = EventType.INPUT_READ_ERROR_EVENT;
    } else if (event instanceof TaskAttemptFailedEvent) {
      eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
    } else if (event instanceof TaskAttemptKilledEvent) {
      eventType = EventType.TASK_ATTEMPT_KILLED_EVENT;
    } else if (event instanceof TaskAttemptCompletedEvent) {
      eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
    } else if (event instanceof InputFailedEvent) {
      eventType = EventType.INPUT_FAILED_EVENT;
    } else if (event instanceof TaskStatusUpdateEvent) {
      eventType = EventType.TASK_STATUS_UPDATE_EVENT;
    } else if (event instanceof InputDataInformationEvent) {
      eventType = EventType.ROOT_INPUT_DATA_INFORMATION_EVENT;
    } else if (event instanceof InputInitializerEvent) {
      eventType = EventType.ROOT_INPUT_INITIALIZER_EVENT;
    } else {
      throw new TezUncheckedException("Unknown event, event="
          + event.getClass().getName());
    }
  }

  /** @return the wrapped event, or {@code null} if none is set */
  public Event getEvent() {
    return event;
  }

  /**
   * Overrides the stored received time.
   *
   * @param eventReceivedTime the new received time
   */
  public void setEventReceivedTime(long eventReceivedTime) { // TODO save
    this.eventReceivedTime = eventReceivedTime;
  }

  /** @return the stored received time */
  public long getEventReceivedTime() {
    return eventReceivedTime;
  }

  /** @return the source metadata, or {@code null} if none is set */
  public EventMetaData getSourceInfo() {
    return sourceInfo;
  }

  /** @param sourceInfo the source metadata to store (may be {@code null}) */
  public void setSourceInfo(EventMetaData sourceInfo) {
    this.sourceInfo = sourceInfo;
  }

  /** @return the destination metadata, or {@code null} if none is set */
  public EventMetaData getDestinationInfo() {
    return destinationInfo;
  }

  /** @param destinationInfo the destination metadata to store (may be {@code null}) */
  public void setDestinationInfo(EventMetaData destinationInfo) {
    this.destinationInfo = destinationInfo;
  }

  /** @return the type derived from the wrapped event, or {@code null} if no event is set */
  public EventType getEventType() {
    return eventType;
  }

  /**
   * Writes the event portion of this object: a presence flag and, when an event is
   * present, its type ordinal, received time and payload.
   *
   * @param out destination of the serialized bytes
   * @throws IOException if writing to {@code out} fails
   * @throws TezUncheckedException if the event type has no serialization branch
   */
  private void serializeEvent(DataOutput out) throws IOException {
    if (event == null) {
      out.writeBoolean(false);
      return;
    }
    out.writeBoolean(true);
    out.writeInt(eventType.ordinal());
    out.writeLong(eventReceivedTime);
    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
      // TODO NEWTEZ convert to PB
      TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
      sEvt.write(out);
    } else {
      AbstractMessage message;
      switch (eventType) {
      case CUSTOM_PROCESSOR_EVENT:
        message =
            ProtoConverters.convertCustomProcessorEventToProto(
                (CustomProcessorEvent) event);
        break;
      case DATA_MOVEMENT_EVENT:
        message =
            ProtoConverters.convertDataMovementEventToProto(
                (DataMovementEvent) event);
        break;
      case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
        message =
            ProtoConverters.convertCompositeRoutedDataMovementEventToProto(
                (CompositeRoutedDataMovementEvent) event);
        break;
      case COMPOSITE_DATA_MOVEMENT_EVENT:
        message =
            ProtoConverters.convertCompositeDataMovementEventToProto(
                (CompositeDataMovementEvent) event);
        break;
      case VERTEX_MANAGER_EVENT:
        message = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event);
        break;
      case INPUT_READ_ERROR_EVENT:
        InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
        message = InputReadErrorEventProto.newBuilder()
            .setIndex(ideEvt.getIndex())
            .setDiagnostics(ideEvt.getDiagnostics())
            .setVersion(ideEvt.getVersion())
            .setIsLocalFetch(ideEvt.isLocalFetch())
            .setIsDiskErrorAtSource(ideEvt.isDiskErrorAtSource())
            .build();
        break;
      case TASK_ATTEMPT_FAILED_EVENT:
        TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
        message = TaskAttemptFailedEventProto.newBuilder()
            .setDiagnostics(tfEvt.getDiagnostics())
            .setTaskFailureType(TezConverterUtils.failureTypeToProto(tfEvt.getTaskFailureType()))
            .build();
        break;
      case TASK_ATTEMPT_KILLED_EVENT:
        TaskAttemptKilledEvent tkEvent = (TaskAttemptKilledEvent) event;
        message = TaskAttemptKilledEventProto.newBuilder()
            .setDiagnostics(tkEvent.getDiagnostics()).build();
        break;
      case TASK_ATTEMPT_COMPLETED_EVENT:
        // Carries no fields; an empty message is still written so the layout stays uniform.
        message = TaskAttemptCompletedEventProto.newBuilder()
            .build();
        break;
      case INPUT_FAILED_EVENT:
        InputFailedEvent ifEvt = (InputFailedEvent) event;
        message = InputFailedEventProto.newBuilder()
            .setTargetIndex(ifEvt.getTargetIndex())
            .setVersion(ifEvt.getVersion()).build();
        break;
      case ROOT_INPUT_DATA_INFORMATION_EVENT:
        message = ProtoConverters.convertRootInputDataInformationEventToProto(
            (InputDataInformationEvent) event);
        break;
      case ROOT_INPUT_INITIALIZER_EVENT:
        message = ProtoConverters
            .convertRootInputInitializerEventToProto((InputInitializerEvent) event);
        break;
      default:
        throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);
      }
      if (out instanceof OutputStream) { //DataOutputBuffer extends DataOutputStream
        // Stream the message directly into the output instead of materializing a
        // byte[] first; the coded stream's buffer is capped at the default size.
        int serializedSize = message.getSerializedSize();
        out.writeInt(serializedSize);
        int buffersize = serializedSize < CodedOutputStream.DEFAULT_BUFFER_SIZE ? serializedSize
            : CodedOutputStream.DEFAULT_BUFFER_SIZE;
        CodedOutputStream codedOut = CodedOutputStream.newInstance(
            (OutputStream) out, buffersize);
        message.writeTo(codedOut);
        codedOut.flush();
      } else {
        byte[] eventBytes = message.toByteArray();
        out.writeInt(eventBytes.length);
        out.write(eventBytes);
      }
    }
  }

  /**
   * Reads the event portion written by {@link #serializeEvent(DataOutput)} and
   * populates {@code event}, {@code eventType} and {@code eventReceivedTime}.
   *
   * <p>When the stream says no event is present, both {@code event} and
   * {@code eventType} are cleared so that a reused instance does not report the type
   * of a previously read event.
   *
   * @param in source of the serialized bytes
   * @throws IOException if reading from {@code in} or parsing the payload fails
   * @throws TezUncheckedException if the event type has no deserialization branch, or
   *     if the input could not be advanced past the payload
   */
  private void deserializeEvent(DataInput in) throws IOException {
    if (!in.readBoolean()) {
      event = null;
      eventType = null;
      return;
    }
    eventType = EventType.values()[in.readInt()];
    eventReceivedTime = in.readLong();
    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
      // TODO NEWTEZ convert to PB
      event = new TaskStatusUpdateEvent();
      ((TaskStatusUpdateEvent)event).readFields(in);
    } else {
      int eventBytesLen = in.readInt();
      byte[] eventBytes;
      CodedInputStream input;
      int startOffset = 0;
      if (in instanceof DataInputBuffer) {
        // Parse in place from the buffer's data at its current position rather than
        // copying; the position is advanced after parsing (see the skip below).
        eventBytes = ((DataInputBuffer)in).getData();
        startOffset = ((DataInputBuffer) in).getPosition();
      } else {
        eventBytes = new byte[eventBytesLen];
        in.readFully(eventBytes);
      }
      // Every branch below must parse from this bounded stream, never from eventBytes
      // directly: in the DataInputBuffer case eventBytes is the buffer's whole data
      // array, and this event's bytes start at startOffset.
      input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
      switch (eventType) {
      case CUSTOM_PROCESSOR_EVENT:
        CustomProcessorEventProto cpProto =
            CustomProcessorEventProto.parseFrom(input);
        event = ProtoConverters.convertCustomProcessorEventFromProto(cpProto);
        break;
      case DATA_MOVEMENT_EVENT:
        DataMovementEventProto dmProto =
            DataMovementEventProto.parseFrom(input);
        event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
        break;
      case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
        CompositeRoutedDataMovementEventProto edmProto =
            CompositeRoutedDataMovementEventProto.parseFrom(input);
        event = ProtoConverters.convertCompositeRoutedDataMovementEventFromProto(edmProto);
        break;
      case COMPOSITE_DATA_MOVEMENT_EVENT:
        CompositeEventProto cProto = CompositeEventProto.parseFrom(input);
        event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
        break;
      case VERTEX_MANAGER_EVENT:
        VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(input);
        event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
        break;
      case INPUT_READ_ERROR_EVENT:
        InputReadErrorEventProto ideProto = InputReadErrorEventProto.parseFrom(input);
        event = InputReadErrorEvent.create(ideProto.getDiagnostics(), ideProto.getIndex(),
            ideProto.getVersion(), ideProto.getIsLocalFetch(), ideProto.getIsDiskErrorAtSource());
        break;
      case TASK_ATTEMPT_FAILED_EVENT:
        TaskAttemptFailedEventProto tfProto =
            TaskAttemptFailedEventProto.parseFrom(input);
        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(),
            TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType()));
        break;
      case TASK_ATTEMPT_KILLED_EVENT:
        TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(input);
        event = new TaskAttemptKilledEvent(tkProto.getDiagnostics());
        break;
      case TASK_ATTEMPT_COMPLETED_EVENT:
        // No fields to read; the payload bytes are not parsed.
        event = new TaskAttemptCompletedEvent();
        break;
      case INPUT_FAILED_EVENT:
        InputFailedEventProto ifProto =
            InputFailedEventProto.parseFrom(input);
        event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion());
        break;
      case ROOT_INPUT_DATA_INFORMATION_EVENT:
        RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
            .parseFrom(input);
        event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
        break;
      case ROOT_INPUT_INITIALIZER_EVENT:
        EventProtos.RootInputInitializerEventProto riiProto =
            EventProtos.RootInputInitializerEventProto.parseFrom(input);
        event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto);
        break;
      default:
        // RootInputUpdatePayload event not wrapped in a TezEvent.
        throw new TezUncheckedException("Unexpected TezEvent"
            + ", type=" + eventType);
      }
      if (in instanceof DataInputBuffer) {
        // Skip so that position is updated
        int skipped = in.skipBytes(eventBytesLen);
        if (skipped != eventBytesLen) {
          throw new TezUncheckedException("Expected to skip " + eventBytesLen
              + " bytes. Actually skipped = " + skipped);
        }
      }
    }
  }

  /**
   * Serializes the event followed by the optional source and destination metadata,
   * each metadata section preceded by a boolean presence flag.
   *
   * @param out destination of the serialized bytes
   * @throws IOException if writing to {@code out} fails
   */
  @Override
  public void write(DataOutput out) throws IOException {
    serializeEvent(out);
    if (sourceInfo != null) {
      out.writeBoolean(true);
      sourceInfo.write(out);
    } else {
      out.writeBoolean(false);
    }
    if (destinationInfo != null) {
      out.writeBoolean(true);
      destinationInfo.write(out);
    } else {
      out.writeBoolean(false);
    }
  }

  /**
   * Restores this object from the layout produced by {@link #write(DataOutput)}.
   *
   * <p>Metadata that is flagged as absent in the stream is reset to {@code null}, so
   * an instance that is reused for several reads never keeps metadata from an
   * earlier record.
   *
   * @param in source of the serialized bytes
   * @throws IOException if reading from {@code in} fails
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    deserializeEvent(in);
    if (in.readBoolean()) {
      sourceInfo = new EventMetaData();
      sourceInfo.readFields(in);
    } else {
      sourceInfo = null;
    }
    if (in.readBoolean()) {
      destinationInfo = new EventMetaData();
      destinationInfo.readFields(in);
    } else {
      destinationInfo = null;
    }
  }

  /** @return a description containing the event type and the source/destination metadata */
  @Override
  public String toString() {
    return "TezEvent{" +
        "eventType=" + eventType +
        ", sourceInfo=" + sourceInfo +
        ", destinationInfo=" + destinationInfo +
        '}';
  }
}