blob: ef84b2ec61d283dd9ae3555a6588b0f510dcbeef [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.logging.proto;
import java.util.Iterator;
import java.util.Optional;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
/**
* Convert HistoryEventProto into JSONObject for analyzers, which can already consume the output of
* SimpleHistoryLoggingService's JSONs. This class is based on HistoryEventJsonConversion, and all
* of the specific HistoryEvent calls were transformed info HistoryEventProto calls by taking the
* corresponding HistoryEventProtoConverter methods into consideration.
*/
public final class HistoryEventProtoJsonConversion {
private HistoryEventProtoJsonConversion() {
}
public static JSONObject convertToJson(HistoryEventProto historyEvent) throws JSONException {
JSONObject jsonObject = null;
switch (historyEvent.getEventType()) {
case "APP_LAUNCHED":
jsonObject = convertAppLaunchedEvent(historyEvent);
break;
case "AM_LAUNCHED":
jsonObject = convertAMLaunchedEvent(historyEvent);
break;
case "AM_STARTED":
jsonObject = convertAMStartedEvent(historyEvent);
break;
case "CONTAINER_LAUNCHED":
jsonObject = convertContainerLaunchedEvent(historyEvent);
break;
case "CONTAINER_STOPPED":
jsonObject = convertContainerStoppedEvent(historyEvent);
break;
case "DAG_SUBMITTED":
jsonObject = convertDAGSubmittedEvent(historyEvent);
break;
case "DAG_INITIALIZED":
jsonObject = convertDAGInitializedEvent(historyEvent);
break;
case "DAG_STARTED":
jsonObject = convertDAGStartedEvent(historyEvent);
break;
case "DAG_FINISHED":
jsonObject = convertDAGFinishedEvent(historyEvent);
break;
case "VERTEX_INITIALIZED":
jsonObject = convertVertexInitializedEvent(historyEvent);
break;
case "VERTEX_STARTED":
jsonObject = convertVertexStartedEvent(historyEvent);
break;
case "VERTEX_FINISHED":
jsonObject = convertVertexFinishedEvent(historyEvent);
break;
case "TASK_STARTED":
jsonObject = convertTaskStartedEvent(historyEvent);
break;
case "TASK_FINISHED":
jsonObject = convertTaskFinishedEvent(historyEvent);
break;
case "TASK_ATTEMPT_STARTED":
jsonObject = convertTaskAttemptStartedEvent(historyEvent);
break;
case "TASK_ATTEMPT_FINISHED":
jsonObject = convertTaskAttemptFinishedEvent(historyEvent);
break;
case "VERTEX_CONFIGURE_DONE":
jsonObject = convertVertexReconfigureDoneEvent(historyEvent);
break;
case "DAG_RECOVERED":
jsonObject = convertDAGRecoveredEvent(historyEvent);
break;
case "VERTEX_COMMIT_STARTED":
case "VERTEX_GROUP_COMMIT_STARTED":
case "VERTEX_GROUP_COMMIT_FINISHED":
case "DAG_COMMIT_STARTED":
throw new UnsupportedOperationException(
"Invalid Event, does not support history" + ", eventType=" + historyEvent.getEventType());
default:
throw new UnsupportedOperationException(
"Unhandled Event" + ", eventType=" + historyEvent.getEventType());
}
return jsonObject;
}
private static JSONObject convertDAGRecoveredEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getDagId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
// Related Entities not needed as should have been done in
// dag submission event
JSONArray events = new JSONArray();
JSONObject recoverEvent = new JSONObject();
recoverEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
recoverEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_RECOVERED.name());
JSONObject recoverEventInfo = new JSONObject();
recoverEventInfo.put(ATSConstants.APPLICATION_ATTEMPT_ID, event.getAppAttemptId().toString());
recoverEventInfo.put(ATSConstants.DAG_STATE, getDataValueByKey(event, ATSConstants.DAG_STATE));
recoverEventInfo.put(ATSConstants.RECOVERY_FAILURE_REASON,
getDataValueByKey(event, ATSConstants.RECOVERY_FAILURE_REASON));
recoverEvent.put(ATSConstants.EVENT_INFO, recoverEventInfo);
events.put(recoverEvent);
jsonObject.put(ATSConstants.EVENTS, events);
return jsonObject;
}
private static JSONObject convertAppLaunchedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, "tez_" + event.getAppId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION.name());
// Other info to tag with Tez App
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.USER, event.getUser());
otherInfo.put(ATSConstants.CONFIG, new JSONObject()); // TODO: config from proto?
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertAMLaunchedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, "tez_" + event.getAppAttemptId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
// Related Entities
JSONArray relatedEntities = new JSONArray();
JSONObject appEntity = new JSONObject();
appEntity.put(ATSConstants.ENTITY, event.getAppId().toString());
appEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY, event.getAppAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ATTEMPT_ID);
relatedEntities.put(appEntity);
relatedEntities.put(appAttemptEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject initEvent = new JSONObject();
initEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
initEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.AM_LAUNCHED.name());
events.put(initEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info to tag with Tez AM
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.APP_SUBMIT_TIME,
getDataValueByKey(event, ATSConstants.APP_SUBMIT_TIME));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertAMStartedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, "tez_" + event.getAppAttemptId().toString());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
// Related Entities
JSONArray relatedEntities = new JSONArray();
JSONObject appEntity = new JSONObject();
appEntity.put(ATSConstants.ENTITY, event.getAppId().toString());
appEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY, event.getAppAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ATTEMPT_ID);
relatedEntities.put(appEntity);
relatedEntities.put(appAttemptEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
startEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.AM_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
return jsonObject;
}
private static JSONObject convertContainerLaunchedEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
"tez_" + getDataValueByKey(event, ATSConstants.CONTAINER_ID));
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_CONTAINER_ID.name());
JSONArray relatedEntities = new JSONArray();
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY, event.getAppAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
JSONObject containerEntity = new JSONObject();
containerEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, ATSConstants.CONTAINER_ID));
containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
relatedEntities.put(appAttemptEntity);
relatedEntities.put(containerEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject launchEvent = new JSONObject();
launchEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
launchEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.CONTAINER_LAUNCHED.name());
events.put(launchEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// TODO add other container info here? or assume AHS will have this?
// TODO container logs?
return jsonObject;
}
private static JSONObject convertContainerStoppedEvent(HistoryEventProto event)
throws JSONException {
// structure is identical to ContainerLaunchedEvent
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
"tez_" + getDataValueByKey(event, ATSConstants.CONTAINER_ID));
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_CONTAINER_ID.name());
JSONArray relatedEntities = new JSONArray();
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY, event.getAppAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
JSONObject containerEntity = new JSONObject();
containerEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, ATSConstants.CONTAINER_ID));
containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
relatedEntities.put(appAttemptEntity);
relatedEntities.put(containerEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject stopEvent = new JSONObject();
stopEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
stopEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.CONTAINER_STOPPED.name());
events.put(stopEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// TODO add other container info here? or assume AHS will have this?
// TODO container logs?
// Other info
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.EXIT_STATUS, getDataValueByKey(event, ATSConstants.EXIT_STATUS));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertDAGFinishedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getDagId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
// Related Entities not needed as should have been done in
// dag submission event
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
finishEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
otherInfo.put(ATSConstants.START_TIME, startTime);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
otherInfo.put(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
event.getAppAttemptId().toString());
// added all info to otherInfo in order to cover
// all key/value pairs added from event.getDagTaskStats()
Iterator<KVPair> it = event.getEventDataList().iterator();
while (it.hasNext()) {
KVPair pair = it.next();
otherInfo.put(pair.getKey(), pair.getValue());
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertDAGInitializedEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getDagId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
// Related Entities not needed as should have been done in
// dag submission event
JSONArray events = new JSONArray();
JSONObject initEvent = new JSONObject();
initEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
initEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_INITIALIZED.name());
events.put(initEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.VERTEX_NAME_ID_MAPPING,
getJSONDataValueByKey(event, ATSConstants.VERTEX_NAME_ID_MAPPING));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertDAGStartedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getDagId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
// Related Entities not needed as should have been done in
// dag submission event
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
startEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
return jsonObject;
}
private static JSONObject convertDAGSubmittedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getDagId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
// Related Entities
JSONArray relatedEntities = new JSONArray();
JSONObject tezAppEntity = new JSONObject();
tezAppEntity.put(ATSConstants.ENTITY, "tez_" + event.getAppId().toString());
tezAppEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION.name());
JSONObject tezAppAttemptEntity = new JSONObject();
tezAppAttemptEntity.put(ATSConstants.ENTITY, "tez_" + event.getAppAttemptId().toString());
tezAppAttemptEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
JSONObject appEntity = new JSONObject();
appEntity.put(ATSConstants.ENTITY, event.getAppId().toString());
appEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
JSONObject appAttemptEntity = new JSONObject();
appAttemptEntity.put(ATSConstants.ENTITY, event.getAppAttemptId().toString());
appAttemptEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ATTEMPT_ID);
JSONObject userEntity = new JSONObject();
userEntity.put(ATSConstants.ENTITY, event.getUser());
userEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.USER);
relatedEntities.put(tezAppEntity);
relatedEntities.put(tezAppAttemptEntity);
relatedEntities.put(appEntity);
relatedEntities.put(appAttemptEntity);
relatedEntities.put(userEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// filters
JSONObject primaryFilters = new JSONObject();
primaryFilters.put(ATSConstants.DAG_NAME, getDataValueByKey(event, ATSConstants.DAG_NAME));
primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID,
getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_ID));
primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE,
getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_TYPE));
primaryFilters.put(ATSConstants.DAG_QUEUE_NAME,
getDataValueByKey(event, ATSConstants.DAG_QUEUE_NAME));
jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
// TODO decide whether this goes into different events,
// event info or other info.
JSONArray events = new JSONArray();
JSONObject submitEvent = new JSONObject();
submitEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
submitEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_SUBMITTED.name());
events.put(submitEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info such as dag plan
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.DAG_PLAN, getJSONDataValueByKey(event, ATSConstants.DAG_PLAN));
otherInfo.put(ATSConstants.CALLER_CONTEXT_ID,
getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_ID));
otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_TYPE));
otherInfo.put(ATSConstants.DAG_QUEUE_NAME,
getDataValueByKey(event, ATSConstants.DAG_QUEUE_NAME));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskAttemptFinishedEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
finishEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_ATTEMPT_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
JSONObject otherInfo = new JSONObject();
long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
otherInfo.put(ATSConstants.CREATION_TIME, getDataValueByKey(event, ATSConstants.CREATION_TIME));
otherInfo.put(ATSConstants.ALLOCATION_TIME,
getDataValueByKey(event, ATSConstants.ALLOCATION_TIME));
otherInfo.put(ATSConstants.START_TIME, startTime);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT,
getDataValueByKey(event, ATSConstants.CREATION_CAUSAL_ATTEMPT));
otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
getDataValueByKey(event, ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
otherInfo.put(ATSConstants.TASK_FAILURE_TYPE,
getDataValueByKey(event, ATSConstants.TASK_FAILURE_TYPE));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
otherInfo.put(ATSConstants.LAST_DATA_EVENTS,
getJSONDataValueByKey(event, ATSConstants.LAST_DATA_EVENTS));
otherInfo.put(ATSConstants.NODE_ID, getDataValueByKey(event, ATSConstants.NODE_ID));
otherInfo.put(ATSConstants.CONTAINER_ID, getDataValueByKey(event, ATSConstants.CONTAINER_ID));
otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL,
getDataValueByKey(event, ATSConstants.IN_PROGRESS_LOGS_URL));
otherInfo.put(ATSConstants.COMPLETED_LOGS_URL,
getDataValueByKey(event, ATSConstants.COMPLETED_LOGS_URL));
otherInfo.put(ATSConstants.NODE_HTTP_ADDRESS,
getDataValueByKey(event, ATSConstants.NODE_HTTP_ADDRESS));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskAttemptStartedEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject nodeEntity = new JSONObject();
nodeEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, ATSConstants.NODE_ID));
nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
JSONObject containerEntity = new JSONObject();
containerEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, ATSConstants.CONTAINER_ID));
containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
JSONObject taskEntity = new JSONObject();
taskEntity.put(ATSConstants.ENTITY, event.getTaskAttemptId());
taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
relatedEntities.put(nodeEntity);
relatedEntities.put(containerEntity);
relatedEntities.put(taskEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
startEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_ATTEMPT_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL,
getDataValueByKey(event, ATSConstants.IN_PROGRESS_LOGS_URL));
otherInfo.put(ATSConstants.COMPLETED_LOGS_URL,
getDataValueByKey(event, ATSConstants.COMPLETED_LOGS_URL));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskFinishedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
finishEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);
otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
otherInfo.put(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
getDataValueByKey(event, ATSConstants.SUCCESSFUL_ATTEMPT_ID));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertTaskStartedEvent(HistoryEventProto event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getTaskId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject vertexEntity = new JSONObject();
vertexEntity.put(ATSConstants.ENTITY, event.getVertexId());
vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
relatedEntities.put(vertexEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
startEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
// TODO fix schedule/launch time to be events
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getEventTime());
otherInfo.put(ATSConstants.SCHEDULED_TIME,
getDataValueByKey(event, ATSConstants.SCHEDULED_TIME));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertVertexFinishedEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject finishEvent = new JSONObject();
finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
finishEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.VERTEX_FINISHED.name());
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);
long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);
otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
otherInfo.put(ATSConstants.STATS, getJSONDataValueByKey(event, ATSConstants.STATS));
// added all info to otherInfo in order to cover
// all key/value pairs added from event.getVertexTaskStats()
Iterator<KVPair> it = event.getEventDataList().iterator();
while (it.hasNext()) {
KVPair pair = it.next();
otherInfo.put(pair.getKey(), pair.getValue());
}
otherInfo.put(ATSConstants.SERVICE_PLUGIN,
getJSONDataValueByKey(event, ATSConstants.SERVICE_PLUGIN));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertVertexReconfigureDoneEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Events
JSONArray events = new JSONArray();
JSONObject updateEvent = new JSONObject();
updateEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
updateEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.VERTEX_CONFIGURE_DONE.name());
JSONObject eventInfo = new JSONObject();
eventInfo.put(ATSConstants.NUM_TASKS, getDataValueByKey(event, ATSConstants.NUM_TASKS));
eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS,
getJSONDataValueByKey(event, ATSConstants.UPDATED_EDGE_MANAGERS));
updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
events.put(updateEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
JSONObject otherInfo = new JSONObject();
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
// TODO add more on all other updated information
return jsonObject;
}
private static JSONObject convertVertexInitializedEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject vertexEntity = new JSONObject();
vertexEntity.put(ATSConstants.ENTITY, event.getDagId());
vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
relatedEntities.put(vertexEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject initEvent = new JSONObject();
initEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
initEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.VERTEX_INITIALIZED.name());
events.put(initEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
// TODO fix requested times to be events
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.VERTEX_NAME, getDataValueByKey(event, ATSConstants.VERTEX_NAME));
otherInfo.put(ATSConstants.INIT_REQUESTED_TIME,
getDataValueByKey(event, ATSConstants.INIT_REQUESTED_TIME));
otherInfo.put(ATSConstants.INIT_TIME, getDataValueByKey(event, ATSConstants.INIT_TIME));
otherInfo.put(ATSConstants.NUM_TASKS, getDataValueByKey(event, ATSConstants.NUM_TASKS));
otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME,
getDataValueByKey(event, ATSConstants.PROCESSOR_CLASS_NAME));
otherInfo.put(ATSConstants.SERVICE_PLUGIN,
getJSONDataValueByKey(event, ATSConstants.SERVICE_PLUGIN));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static JSONObject convertVertexStartedEvent(HistoryEventProto event)
throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
// Related entities
JSONArray relatedEntities = new JSONArray();
JSONObject vertexEntity = new JSONObject();
vertexEntity.put(ATSConstants.ENTITY, event.getDagId());
vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
relatedEntities.put(vertexEntity);
jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
// Events
JSONArray events = new JSONArray();
JSONObject startEvent = new JSONObject();
startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
startEvent.put(ATSConstants.EVENT_TYPE, HistoryEventType.VERTEX_STARTED.name());
events.put(startEvent);
jsonObject.put(ATSConstants.EVENTS, events);
// Other info
// TODO fix requested times to be events
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_REQUESTED_TIME,
getDataValueByKey(event, ATSConstants.START_REQUESTED_TIME));
otherInfo.put(ATSConstants.START_TIME, event.getEventTime());
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
}
private static String getDataValueByKey(HistoryEventProto event, String key) {
Optional<KVPair> pair =
event.getEventDataList().stream().filter(p -> p.getKey().equals(key)).findAny();
return pair.isPresent() ? pair.get().getValue() : null;
}
private static long getLongDataValueByKey(HistoryEventProto event, String key) {
String value = getDataValueByKey(event, key);
return (value == null || value.isEmpty()) ? 0 : Long.parseLong(value);
}
private static JSONObject getJSONDataValueByKey(HistoryEventProto event, String key)
throws JSONException {
String value = getDataValueByKey(event, key);
return (value == null || value.isEmpty()) ? null : new JSONObject(value);
}
}