blob: 79a0c344eed38b9b61b390ada5688222d61490bc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.logging.impl;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezVertexID;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
public class HistoryEventJsonConversion {
public static JSONObject convertToJson(HistoryEvent historyEvent) throws JSONException {
if (!historyEvent.isHistoryEvent()) {
throw new UnsupportedOperationException("Invalid Event, does not support history"
+ ", eventType=" + historyEvent.getEventType());
}
JSONObject jsonObject = null;
switch (historyEvent.getEventType()) {
case APP_LAUNCHED:
jsonObject = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
break;
case AM_LAUNCHED:
jsonObject = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
break;
case AM_STARTED:
jsonObject = convertAMStartedEvent((AMStartedEvent) historyEvent);
break;
case CONTAINER_LAUNCHED:
jsonObject = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
break;
case CONTAINER_STOPPED:
jsonObject = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
break;
case DAG_SUBMITTED:
jsonObject = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
break;
case DAG_INITIALIZED:
jsonObject = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
break;
case DAG_STARTED:
jsonObject = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
break;
case DAG_FINISHED:
jsonObject = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
break;
case VERTEX_INITIALIZED:
jsonObject = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
break;
case VERTEX_STARTED:
jsonObject = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
break;
case VERTEX_FINISHED:
jsonObject = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
break;
case TASK_STARTED:
jsonObject = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
break;
case TASK_FINISHED:
jsonObject = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
break;
case TASK_ATTEMPT_STARTED:
jsonObject = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
break;
case TASK_ATTEMPT_FINISHED:
jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
break;
case VERTEX_PARALLELISM_UPDATED:
jsonObject = convertVertexParallelismUpdatedEvent((VertexParallelismUpdatedEvent) historyEvent);
break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
case VERTEX_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_FINISHED:
case DAG_COMMIT_STARTED:
throw new UnsupportedOperationException("Invalid Event, does not support history"
+ ", eventType=" + historyEvent.getEventType());
default:
throw new UnsupportedOperationException("Unhandled Event"
+ ", eventType=" + historyEvent.getEventType());
}
return jsonObject;
}
private static JSONObject convertAppLaunchedEvent(AppLaunchedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
"tez_" + event.getApplicationId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_APPLICATION.name());
// Other info to tag with Tez App
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.USER, event.getUser());
otherInfo.put(ATSConstants.CONFIG, new JSONObject(
DAGUtils.convertConfigurationToATSMap(event.getConf())));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertAMLaunchedEvent(AMLaunchedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
"tez_" + event.getApplicationAttemptId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
// Related Entities
JSONArray relatedEntities = new JSONArray();
JSONObject appEntity = new JSONObject();
appEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().getApplicationId().toString());
appEntity.put(ATSConstants.ENTITY_TYPE,
ATSConstants.APPLICATION_ID);
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
ATSConstants.APPLICATION_ATTEMPT_ID);
relatedEntities.put(appEntity);
relatedEntities.put(appAttemptEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject initEvent = new JSONObject();
initEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
initEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.AM_LAUNCHED.name());
events.put(initEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info to tag with Tez AM
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertAMStartedEvent(AMStartedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
"tez_" + event.getApplicationAttemptId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
// Related Entities
JSONArray relatedEntities = new JSONArray();
JSONObject appEntity = new JSONObject();
appEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().getApplicationId().toString());
appEntity.put(ATSConstants.ENTITY_TYPE,
ATSConstants.APPLICATION_ID);
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
ATSConstants.APPLICATION_ATTEMPT_ID);
relatedEntities.put(appEntity);
relatedEntities.put(appAttemptEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
startEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.AM_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
return jsonObject; }
private static JSONObject convertContainerLaunchedEvent(ContainerLaunchedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
"tez_" + event.getContainerId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_CONTAINER_ID.name());
JSONArray relatedEntities = new JSONArray();
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
JSONObject containerEntity = new JSONObject();
containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
relatedEntities.put(appAttemptEntity);
relatedEntities.put(containerEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject launchEvent = new JSONObject();
launchEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
launchEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.CONTAINER_LAUNCHED.name());
events.put(launchEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// TODO add other container info here? or assume AHS will have this?
// TODO container logs?
return jsonObject;
}
private static JSONObject convertContainerStoppedEvent(ContainerStoppedEvent event) throws JSONException {
// structure is identical to ContainerLaunchedEvent
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
"tez_" + event.getContainerId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_CONTAINER_ID.name());
JSONArray relatedEntities = new JSONArray();
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
JSONObject containerEntity = new JSONObject();
containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
relatedEntities.put(appAttemptEntity);
relatedEntities.put(containerEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject stopEvent = new JSONObject();
stopEvent.put(ATSConstants.TIMESTAMP, event.getStoppedTime());
stopEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.CONTAINER_STOPPED.name());
events.put(stopEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// TODO add other container info here? or assume AHS will have this?
// TODO container logs?
// Other info
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.EXIT_STATUS, event.getExitStatus());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject; }
private static JSONObject convertDAGFinishedEvent(DAGFinishedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
event.getDagID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_DAG_ID.name());
// Related Entities not needed as should have been done in
// dag submission event
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
finishEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.DAG_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
otherInfo.put(ATSConstants.STATUS, event.getState().name());
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getTezCounters()));
final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
if (dagTaskStats != null) {
for(Entry<String, Integer> entry : dagTaskStats.entrySet()) {
otherInfo.put(entry.getKey(), entry.getValue().intValue());
}
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertDAGInitializedEvent(DAGInitializedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
event.getDagID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_DAG_ID.name());
// Related Entities not needed as should have been done in
// dag submission event
JSONArray events = new JSONArray();
JSONObject initEvent = new JSONObject();
initEvent.put(ATSConstants.TIMESTAMP, event.getInitTime());
initEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.DAG_INITIALIZED.name());
events.put(initEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
if (event.getVertexNameIDMap() != null) {
Map<String, String> nameIdStrMap = new TreeMap<String, String>();
for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
}
otherInfo.put(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertDAGStartedEvent(DAGStartedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
event.getDagID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_DAG_ID.name());
// Related Entities not needed as should have been done in
// dag submission event
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
startEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.DAG_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
return jsonObject;
}
private static JSONObject convertDAGSubmittedEvent(DAGSubmittedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
event.getDagID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_DAG_ID.name());
// Related Entities
JSONArray relatedEntities = new JSONArray();
JSONObject tezAppEntity = new JSONObject();
tezAppEntity.put(ATSConstants.ENTITY,
"tez_" + event.getApplicationAttemptId().getApplicationId().toString());
tezAppEntity.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_APPLICATION.name());
JSONObject tezAppAttemptEntity = new JSONObject();
tezAppAttemptEntity.put(ATSConstants.ENTITY,
"tez_" + event.getApplicationAttemptId().toString());
tezAppAttemptEntity.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
JSONObject appEntity = new JSONObject();
appEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().getApplicationId().toString());
appEntity.put(ATSConstants.ENTITY_TYPE,
ATSConstants.APPLICATION_ID);
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY,
event.getApplicationAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
ATSConstants.APPLICATION_ATTEMPT_ID);
JSONObject userEntity = new JSONObject();
userEntity.put(ATSConstants.ENTITY,
event.getUser());
userEntity.put(ATSConstants.ENTITY_TYPE,
ATSConstants.USER);
relatedEntities.put(tezAppEntity);
relatedEntities.put(tezAppAttemptEntity);
relatedEntities.put(appEntity);
relatedEntities.put(appAttemptEntity);
relatedEntities.put(userEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// filters
JSONObject primaryFilters = new JSONObject();
primaryFilters.put(ATSConstants.DAG_NAME,
event.getDAGName());
jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject submitEvent = new JSONObject();
submitEvent.put(ATSConstants.TIMESTAMP, event.getSubmitTime());
submitEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.DAG_SUBMITTED.name());
events.put(submitEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info such as dag plan
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.DAG_PLAN,
DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
finishEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.TASK_ATTEMPT_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
otherInfo.put(ATSConstants.STATUS, event.getState().name());
if (event.getTaskAttemptError() != null) {
otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
}
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getCounters()));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE,
EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject nodeEntity = new JSONObject();
nodeEntity.put(ATSConstants.ENTITY, event.getNodeId().toString());
nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
JSONObject containerEntity = new JSONObject();
containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
JSONObject taskEntity = new JSONObject();
taskEntity.put(ATSConstants.ENTITY, event.getTaskAttemptID().getTaskID().toString());
taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
relatedEntities.put(nodeEntity);
relatedEntities.put(containerEntity);
relatedEntities.put(taskEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
startEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.TASK_ATTEMPT_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskFinishedEvent(TaskFinishedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
finishEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.TASK_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
otherInfo.put(ATSConstants.STATUS, event.getState().name());
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getTezCounters()));
if (event.getSuccessfulAttemptID() != null) {
otherInfo.put(ATSConstants.SUCCESSFUL_ATTEMPT_ID, event.getSuccessfulAttemptID().toString());
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskStartedEvent(TaskStartedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject vertexEntity = new JSONObject();
vertexEntity.put(ATSConstants.ENTITY, event.getTaskID().getVertexID().toString());
vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
relatedEntities.put(vertexEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
startEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.TASK_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
// TODO fix schedule/launch time to be events
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertVertexFinishedEvent(VertexFinishedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
finishEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.VERTEX_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
otherInfo.put(ATSConstants.STATUS, event.getState().name());
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getTezCounters()));
otherInfo.put(ATSConstants.STATS,
DAGUtils.convertVertexStatsToJSON(event.getVertexStats()));
final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
if (vertexTaskStats != null) {
for(Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
otherInfo.put(entry.getKey(), entry.getValue().intValue());
}
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject vertexEntity = new JSONObject();
vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
relatedEntities.put(vertexEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject initEvent = new JSONObject();
initEvent.put(ATSConstants.TIMESTAMP, event.getInitedTime());
initEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.VERTEX_INITIALIZED.name());
events.put(initEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
// TODO fix requested times to be events
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.VERTEX_NAME, event.getVertexName());
otherInfo.put(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
otherInfo.put(ATSConstants.INIT_TIME, event.getInitedTime());
otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertVertexStartedEvent(VertexStartedEvent event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject vertexEntity = new JSONObject();
vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
relatedEntities.put(vertexEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
startEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.VERTEX_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
// TODO fix requested times to be events
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertVertexParallelismUpdatedEvent(
VertexParallelismUpdatedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject updateEvent = new JSONObject();
updateEvent.put(ATSConstants.TIMESTAMP, event.getUpdateTime());
updateEvent.put(ATSConstants.EVENT_TYPE,
HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
JSONObject eventInfo = new JSONObject();
eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
JSONObject updatedEdgeManagers = new JSONObject();
for (Entry<String, EdgeManagerPluginDescriptor> entry :
event.getSourceEdgeManagers().entrySet()) {
updatedEdgeManagers.put(entry.getKey(),
new JSONObject(DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())));
}
eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
}
updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
events.put(updateEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
// TODO add more on all other updated information
return jsonObject;
}
}