blob: 904c165684c0d0c2d26692367652e54efa51d8b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.logging.proto;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.app.web.AMWebController;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Convert history event into HistoryEventProto message.
*/
public class HistoryEventProtoConverter {
private static final Logger log =
LoggerFactory.getLogger(HistoryEventProtoConverter.class);
private final ObjectMapper mapper = new ObjectMapper();
/**
* Convert a given history event to HistoryEventProto message.
*/
public HistoryEventProto convert(HistoryEvent historyEvent) {
validateEvent(historyEvent);
switch (historyEvent.getEventType()) {
case APP_LAUNCHED:
return convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
case AM_LAUNCHED:
return convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
case AM_STARTED:
return convertAMStartedEvent((AMStartedEvent) historyEvent);
case CONTAINER_LAUNCHED:
return convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
case CONTAINER_STOPPED:
return convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
case DAG_SUBMITTED:
return convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
case DAG_INITIALIZED:
return convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
case DAG_STARTED:
return convertDAGStartedEvent((DAGStartedEvent) historyEvent);
case DAG_FINISHED:
return convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
case VERTEX_INITIALIZED:
return convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
case VERTEX_STARTED:
return convertVertexStartedEvent((VertexStartedEvent) historyEvent);
case VERTEX_FINISHED:
return convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
case TASK_STARTED:
return convertTaskStartedEvent((TaskStartedEvent) historyEvent);
case TASK_FINISHED:
return convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
case TASK_ATTEMPT_STARTED:
return convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
case TASK_ATTEMPT_FINISHED:
return convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
case VERTEX_CONFIGURE_DONE:
return convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent);
case DAG_RECOVERED:
return convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent);
case VERTEX_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_FINISHED:
case DAG_COMMIT_STARTED:
case DAG_KILL_REQUEST:
throw new UnsupportedOperationException("Invalid Event, does not support history, eventType="
+ historyEvent.getEventType());
// Do not add default, if a new event type is added, we'll get a warning for the
// switch.
}
throw new UnsupportedOperationException(
"Unhandled Event, eventType=" + historyEvent.getEventType());
}
private void validateEvent(HistoryEvent event) {
if (!event.isHistoryEvent()) {
throw new UnsupportedOperationException(
"Invalid Event, does not support history" + ", eventType=" + event.getEventType());
}
}
private HistoryEventProto.Builder makeBuilderForEvent(HistoryEvent event, long time,
TezDAGID dagId, ApplicationId appId, ApplicationAttemptId appAttemptId, TezVertexID vertexId,
TezTaskID taskId, TezTaskAttemptID taskAttemptId, String user) {
HistoryEventProto.Builder builder = HistoryEventProto.newBuilder();
builder.setEventType(event.getEventType().name());
builder.setEventTime(time);
if (taskAttemptId != null) {
builder.setTaskAttemptId(taskAttemptId.toString());
taskId = taskAttemptId.getTaskID();
}
if (taskId != null) {
builder.setTaskId(taskId.toString());
vertexId = taskId.getVertexID();
}
if (vertexId != null) {
builder.setVertexId(vertexId.toString());
dagId = vertexId.getDAGID();
}
if (dagId != null) {
builder.setDagId(dagId.toString());
if (appId == null) {
appId = dagId.getApplicationId();
}
}
if (appAttemptId != null) {
builder.setAppAttemptId(appAttemptId.toString());
if (appId == null) {
appId = appAttemptId.getApplicationId();
}
}
if (appId != null) {
builder.setAppId(appId.toString());
}
if (user != null) {
builder.setUser(user);
}
return builder;
}
private void addEventData(HistoryEventProto.Builder builder, String key, String value) {
if (value == null) {
return;
}
builder.addEventData(KVPair.newBuilder().setKey(key).setValue(value));
}
private void addEventData(HistoryEventProto.Builder builder, String key, Number value) {
builder.addEventData(KVPair.newBuilder().setKey(key).setValue(value.toString()));
}
private void addEventData(HistoryEventProto.Builder builder, String key,
Map<String, Object> value) {
try {
builder.addEventData(
KVPair.newBuilder().setKey(key).setValue(mapper.writeValueAsString(value)));
} catch (IOException e) {
log.error("Error converting value for key {} to json: ", key, e);
}
}
private HistoryEventProto convertAppLaunchedEvent(AppLaunchedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null,
event.getApplicationId(), null, null, null, null, event.getUser());
// This is ok as long as we do not modify the underlying map.
@SuppressWarnings({ "unchecked", "rawtypes" })
Map<String, Object> confMap = (Map)DAGUtils.convertConfigurationToATSMap(event.getConf());
addEventData(builder, ATSConstants.CONFIG, confMap);
if (event.getVersion() != null) {
addEventData(builder, ATSConstants.TEZ_VERSION,
DAGUtils.convertTezVersionToATSMap(event.getVersion()));
}
addEventData(builder, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
return builder.build();
}
private HistoryEventProto convertAMLaunchedEvent(AMLaunchedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null,
null, event.getApplicationAttemptId(), null, null, null, event.getUser());
addEventData(builder, ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
return builder.build();
}
private HistoryEventProto convertAMStartedEvent(AMStartedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(), null,
null, event.getApplicationAttemptId(), null, null, null, event.getUser());
return builder.build();
}
private HistoryEventProto convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null,
null, event.getApplicationAttemptId(), null, null, null, null);
addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
return builder.build();
}
private HistoryEventProto convertContainerStoppedEvent(ContainerStoppedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStoppedTime(), null,
null, event.getApplicationAttemptId(), null, null, null, null);
addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
addEventData(builder, ATSConstants.EXIT_STATUS, event.getExitStatus());
addEventData(builder, ATSConstants.FINISH_TIME, event.getStoppedTime());
return builder.build();
}
private HistoryEventProto convertDAGSubmittedEvent(DAGSubmittedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getSubmitTime(),
event.getDAGID(), null, event.getApplicationAttemptId(), null, null, null,
event.getUser());
addEventData(builder, ATSConstants.DAG_NAME, event.getDAGName());
if (event.getDAGPlan().hasCallerContext() &&
event.getDAGPlan().getCallerContext().hasCallerId()) {
CallerContextProto callerContext = event.getDagPlan().getCallerContext();
addEventData(builder, ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId());
addEventData(builder, ATSConstants.CALLER_CONTEXT_TYPE, callerContext.getCallerType());
addEventData(builder, ATSConstants.CALLER_CONTEXT, callerContext.getContext());
}
if (event.getQueueName() != null) {
addEventData(builder, ATSConstants.DAG_QUEUE_NAME, event.getQueueName());
}
addEventData(builder, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL + "_" +
event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs());
try {
addEventData(builder, ATSConstants.DAG_PLAN,
DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
} catch (IOException e) {
throw new TezUncheckedException(e);
}
return builder.build();
}
private HistoryEventProto convertDAGInitializedEvent(DAGInitializedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getInitTime(),
event.getDAGID(), null, null, null, null, null, event.getUser());
addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
if (event.getVertexNameIDMap() != null) {
Map<String, Object> nameIdStrMap = new TreeMap<>();
for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
}
addEventData(builder, ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
}
return builder.build();
}
private HistoryEventProto convertDAGStartedEvent(DAGStartedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
event.getDAGID(), null, null, null, null, null, event.getUser());
addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
addEventData(builder, ATSConstants.STATUS, event.getDagState().name());
return builder.build();
}
private HistoryEventProto convertDAGFinishedEvent(DAGFinishedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
event.getDAGID(), null, event.getApplicationAttemptId(), null, null, null,
event.getUser());
addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
if (event.getDAGPlan().hasCallerContext()) {
if (event.getDAGPlan().getCallerContext().hasCallerType()) {
addEventData(builder, ATSConstants.CALLER_CONTEXT_TYPE,
event.getDAGPlan().getCallerContext().getCallerType());
}
if (event.getDAGPlan().getCallerContext().hasCallerId()) {
addEventData(builder, ATSConstants.CALLER_CONTEXT_ID,
event.getDAGPlan().getCallerContext().getCallerId());
}
}
addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
addEventData(builder, ATSConstants.STATUS, event.getState().name());
addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
addEventData(builder, ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
event.getApplicationAttemptId().toString());
addEventData(builder, ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getTezCounters()));
Map<String, Integer> dagTaskStats = event.getDagTaskStats();
if (dagTaskStats != null) {
for (Entry<String, Integer> entry : dagTaskStats.entrySet()) {
addEventData(builder, entry.getKey(), entry.getValue());
}
}
return builder.build();
}
private HistoryEventProto convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
null, null, null, null, null, event.getTaskAttemptID(), null);
if (event.getInProgressLogsUrl() != null) {
addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
}
if (event.getCompletedLogsUrl() != null) {
addEventData(builder, ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
}
addEventData(builder, ATSConstants.NODE_ID, event.getNodeId().toString());
addEventData(builder, ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
addEventData(builder, ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
return builder.build();
}
private HistoryEventProto convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
null, null, null, null, null, event.getTaskAttemptID(), null);
addEventData(builder, ATSConstants.STATUS, event.getState().name());
if (event.getTaskFailureType() != null) {
addEventData(builder, ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name());
}
addEventData(builder, ATSConstants.CREATION_TIME, event.getCreationTime());
addEventData(builder, ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
if (event.getCreationCausalTA() != null) {
addEventData(builder, ATSConstants.CREATION_CAUSAL_ATTEMPT,
event.getCreationCausalTA().toString());
}
addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
addEventData(builder, ATSConstants.STATUS, event.getState().name());
if (event.getTaskAttemptError() != null) {
addEventData(builder, ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
event.getTaskAttemptError().name());
}
addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
addEventData(builder, ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getCounters()));
if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
addEventData(builder, ATSConstants.LAST_DATA_EVENTS,
DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
}
if (event.getNodeId() != null) {
addEventData(builder, ATSConstants.NODE_ID, event.getNodeId().toString());
}
if (event.getContainerId() != null) {
addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
}
if (event.getInProgressLogsUrl() != null) {
addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
}
if (event.getCompletedLogsUrl() != null) {
addEventData(builder, ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
}
if (event.getNodeHttpAddress() != null) {
addEventData(builder, ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
}
return builder.build();
}
private HistoryEventProto convertTaskFinishedEvent(TaskFinishedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
null, null, null, null, event.getTaskID(), null, null);
addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
addEventData(builder, ATSConstants.STATUS, event.getState().name());
addEventData(builder, ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, event.getNumFailedAttempts());
if (event.getSuccessfulAttemptID() != null) {
addEventData(builder, ATSConstants.SUCCESSFUL_ATTEMPT_ID,
event.getSuccessfulAttemptID().toString());
}
addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
addEventData(builder, ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getTezCounters()));
return builder.build();
}
private HistoryEventProto convertTaskStartedEvent(TaskStartedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
null, null, null, null, event.getTaskID(), null, null);
addEventData(builder, ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
addEventData(builder, ATSConstants.STATUS, event.getState().name());
return builder.build();
}
private HistoryEventProto convertVertexFinishedEvent(VertexFinishedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
null, null, null, event.getVertexID(), null, null, null);
addEventData(builder, ATSConstants.STATUS, event.getState().name());
addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
addEventData(builder, ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getTezCounters()));
addEventData(builder, ATSConstants.STATS,
DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
if (event.getServicePluginInfo() != null) {
addEventData(builder, ATSConstants.SERVICE_PLUGIN,
DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
}
final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
if (vertexTaskStats != null) {
for (Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
addEventData(builder, entry.getKey(), entry.getValue());
}
}
return builder.build();
}
private HistoryEventProto convertVertexInitializedEvent(VertexInitializedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getInitedTime(),
null, null, null, event.getVertexID(), null, null, null);
addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
addEventData(builder, ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
addEventData(builder, ATSConstants.INIT_TIME, event.getInitedTime());
addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
addEventData(builder, ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
if (event.getServicePluginInfo() != null) {
addEventData(builder, ATSConstants.SERVICE_PLUGIN,
DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
}
return builder.build();
}
private HistoryEventProto convertVertexStartedEvent(VertexStartedEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
null, null, null, event.getVertexID(), null, null, null);
addEventData(builder, ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
addEventData(builder, ATSConstants.STATUS, event.getVertexState().name());
return builder.build();
}
private HistoryEventProto convertVertexReconfigureDoneEvent(VertexConfigurationDoneEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getReconfigureDoneTime(),
null, null, null, event.getVertexID(), null, null, null);
if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
Map<String, Object> updatedEdgeManagers = new HashMap<>();
for (Entry<String, EdgeProperty> entry : event.getSourceEdgeProperties().entrySet()) {
updatedEdgeManagers.put(entry.getKey(), DAGUtils.convertEdgeProperty(entry.getValue()));
}
addEventData(builder, ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
}
addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
return builder.build();
}
private HistoryEventProto convertDAGRecoveredEvent(DAGRecoveredEvent event) {
HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getRecoveredTime(),
event.getDagID(), null, event.getApplicationAttemptId(), null, null, null,
event.getUser());
addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
if (event.getRecoveredDagState() != null) {
addEventData(builder, ATSConstants.DAG_STATE, event.getRecoveredDagState().name());
}
if (event.getRecoveryFailureReason() != null) {
addEventData(builder, ATSConstants.RECOVERY_FAILURE_REASON,
event.getRecoveryFailureReason());
}
addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL + "_" +
event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs());
return builder.build();
}
}