blob: 2b21c897bf5dcb4ba3209c17b4f5c02f75574c25 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
/**
 * History event describing a finished task attempt.
 *
 * <p>The event holds the attempt id, the vertex name, start and finish times,
 * the final state, an optional termination cause, optional diagnostics and
 * optional counters.
 *
 * <p>Only the attempt id, state, finish time, diagnostics, termination cause
 * and counters are written by {@link #toProto()} and read back by
 * {@link #fromProto(TaskAttemptFinishedProto)}. {@code vertexName} and
 * {@code startTime} are not part of that serialized form, so an instance
 * populated purely from a proto keeps whatever values those two fields had
 * before (the defaults for a freshly constructed instance).
 */
public class TaskAttemptFinishedEvent implements HistoryEvent {

  private static final Log LOG = LogFactory.getLog(TaskAttemptFinishedEvent.class);

  private TezTaskAttemptID taskAttemptId;
  private String vertexName;
  private long startTime;
  private long finishTime;
  private TaskAttemptState state;
  private String diagnostics;
  private TezCounters tezCounters;
  private TaskAttemptTerminationCause error;

  /**
   * Creates a fully populated event.
   *
   * @param taId        id of the finished task attempt
   * @param vertexName  name of the vertex the attempt belongs to
   * @param startTime   start time of the attempt
   * @param finishTime  finish time of the attempt
   * @param state       final state of the attempt
   * @param error       termination cause, may be {@code null}
   * @param diagnostics diagnostics text, may be {@code null}
   * @param counters    counters of the attempt, may be {@code null}
   */
  public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
      String vertexName,
      long startTime,
      long finishTime,
      TaskAttemptState state,
      TaskAttemptTerminationCause error,
      String diagnostics, TezCounters counters) {
    this.taskAttemptId = taId;
    this.vertexName = vertexName;
    this.startTime = startTime;
    this.finishTime = finishTime;
    this.state = state;
    this.diagnostics = diagnostics;
    this.tezCounters = counters;
    this.error = error;
  }

  /**
   * Creates an empty event, intended to be populated afterwards through
   * {@link #fromProto(TaskAttemptFinishedProto)} or
   * {@link #fromProtoStream(InputStream)}.
   */
  public TaskAttemptFinishedEvent() {
  }

  /** @return {@link HistoryEventType#TASK_ATTEMPT_FINISHED} */
  @Override
  public HistoryEventType getEventType() {
    return HistoryEventType.TASK_ATTEMPT_FINISHED;
  }

  /** @return always {@code true} */
  @Override
  public boolean isRecoveryEvent() {
    return true;
  }

  /** @return always {@code true} */
  @Override
  public boolean isHistoryEvent() {
    return true;
  }

  /**
   * Converts this event into its protobuf form.
   *
   * <p>The attempt id and the state must be set; diagnostics, termination
   * cause and counters are only written when they are non-null.
   *
   * @return the protobuf message for this event
   */
  public TaskAttemptFinishedProto toProto() {
    TaskAttemptFinishedProto.Builder builder =
        TaskAttemptFinishedProto.newBuilder();
    // The state is encoded by ordinal, so the serialized form depends on the
    // declaration order of TaskAttemptState.
    builder.setTaskAttemptId(taskAttemptId.toString())
        .setState(state.ordinal())
        .setFinishTime(finishTime);
    if (diagnostics != null) {
      builder.setDiagnostics(diagnostics);
    }
    if (error != null) {
      builder.setErrorEnum(error.name());
    }
    if (tezCounters != null) {
      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
    }
    return builder.build();
  }

  /**
   * Populates this event from its protobuf form.
   *
   * <p>Optional fields that are absent from {@code proto} are reset to
   * {@code null}, so that re-populating an already used instance does not
   * leave stale diagnostics, termination cause or counters behind.
   *
   * @param proto the protobuf message to read from
   */
  public void fromProto(TaskAttemptFinishedProto proto) {
    this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
    this.finishTime = proto.getFinishTime();
    // Mirror of toProto(): the state is stored as an ordinal.
    this.state = TaskAttemptState.values()[proto.getState()];
    this.diagnostics = proto.hasDiagnostics() ? proto.getDiagnostics() : null;
    this.error = proto.hasErrorEnum()
        ? TaskAttemptTerminationCause.valueOf(proto.getErrorEnum())
        : null;
    this.tezCounters = proto.hasCounters()
        ? DagTypeConverters.convertTezCountersFromProto(proto.getCounters())
        : null;
  }

  /**
   * Writes this event to the stream as a length-delimited protobuf message.
   *
   * @param outputStream stream to write to
   * @throws IOException if writing fails
   */
  @Override
  public void toProtoStream(OutputStream outputStream) throws IOException {
    toProto().writeDelimitedTo(outputStream);
  }

  /**
   * Reads a length-delimited protobuf message from the stream and populates
   * this event from it.
   *
   * @param inputStream stream to read from
   * @throws IOException if reading fails or no message could be parsed from
   *     the stream
   */
  @Override
  public void fromProtoStream(InputStream inputStream) throws IOException {
    TaskAttemptFinishedProto proto =
        TaskAttemptFinishedProto.parseDelimitedFrom(inputStream);
    if (proto == null) {
      throw new IOException("No data found in stream");
    }
    fromProto(proto);
  }

  /**
   * Returns a single-line, human-readable description of this event.
   *
   * <p>Safe to call on an instance that has not been populated yet: a missing
   * state is rendered as {@code null} instead of failing.
   */
  @Override
  public String toString() {
    return "vertexName=" + vertexName
        + ", taskAttemptId=" + taskAttemptId
        + ", startTime=" + startTime
        + ", finishTime=" + finishTime
        + ", timeTaken=" + (finishTime - startTime)
        + ", status=" + (state != null ? state.name() : "null")
        + ", errorEnum=" + (error != null ? error.name() : "")
        + ", diagnostics=" + diagnostics
        // Flatten the counters text onto one line.
        + ", counters=" + (tezCounters == null ? "null" :
            tezCounters.toString()
                .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
  }

  /** @return id of the finished task attempt */
  public TezTaskAttemptID getTaskAttemptID() {
    return taskAttemptId;
  }

  /** @return counters of the attempt, or {@code null} if none were set */
  public TezCounters getCounters() {
    return tezCounters;
  }

  /** @return diagnostics text, or {@code null} if none was set */
  public String getDiagnostics() {
    return diagnostics;
  }

  /** @return termination cause of the attempt, or {@code null} if none was set */
  public TaskAttemptTerminationCause getTaskAttemptError() {
    return error;
  }

  /** @return finish time of the attempt */
  public long getFinishTime() {
    return finishTime;
  }

  /** @return final state of the attempt */
  public TaskAttemptState getState() {
    return state;
  }

  /**
   * @return start time of the attempt; not restored by
   *     {@link #fromProto(TaskAttemptFinishedProto)}
   */
  public long getStartTime() {
    return startTime;
  }
}