blob: 1663cb0227a0b7265f38dcca3ff5b0c99ab9e0d0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.logging.ats;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.app.web.AMWebController;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGKillRequestEvent;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.codehaus.jettison.json.JSONException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
public class TestHistoryEventTimelineConversion {
private ApplicationAttemptId applicationAttemptId;
private ApplicationId applicationId;
private String user = "user";
private Random random = new Random();
private TezDAGID tezDAGID;
private TezVertexID tezVertexID;
private TezTaskID tezTaskID;
private TezTaskAttemptID tezTaskAttemptID;
private DAGPlan dagPlan;
private ContainerId containerId;
private NodeId nodeId;
private String containerLogs = "containerLogs";
@SuppressWarnings("deprecation")
@Before
public void setup() {
applicationId = ApplicationId.newInstance(9999l, 1);
applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
CallerContextProto.Builder callerContextProto = CallerContextProto.newBuilder();
callerContextProto.setContext("ctxt");
callerContextProto.setCallerId("Caller_ID");
callerContextProto.setCallerType("Caller_Type");
callerContextProto.setBlob("Desc_1");
dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock")
.setCallerContext(callerContextProto).build();
containerId = ContainerId.newInstance(applicationAttemptId, 111);
nodeId = NodeId.newInstance("node", 13435);
}
@Test(timeout = 5000)
public void testHandlerExists() throws JSONException {
for (HistoryEventType eventType : HistoryEventType.values()) {
HistoryEvent event = null;
switch (eventType) {
case APP_LAUNCHED:
event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
user, new Configuration(false), null);
break;
case AM_LAUNCHED:
event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
user);
break;
case AM_STARTED:
event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
null, user, null, containerLogs, null);
break;
case DAG_INITIALIZED:
event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
break;
case DAG_STARTED:
event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
break;
case DAG_FINISHED:
event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
null, null, user, dagPlan.getName(), null, applicationAttemptId, dagPlan);
break;
case VERTEX_INITIALIZED:
event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), "proc", null, null, null);
break;
case VERTEX_STARTED:
event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
break;
case VERTEX_CONFIGURE_DONE:
event = new VertexConfigurationDoneEvent(tezVertexID, 0L, 1, null, null, null, true);
break;
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
null, null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
break;
case TASK_FINISHED:
event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
tezTaskAttemptID, TaskState.FAILED, null, null, 0);
break;
case TASK_ATTEMPT_STARTED:
event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
nodeId, null, null, "nodeHttpAddress");
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
random.nextInt(), TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.OUTPUT_LOST,
null, null, null, null, 0, null, 0,
containerId, nodeId, null, null, "nodeHttpAddress");
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
applicationAttemptId);
break;
case CONTAINER_STOPPED:
event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
break;
case DAG_COMMIT_STARTED:
event = new DAGCommitStartedEvent();
break;
case VERTEX_COMMIT_STARTED:
event = new VertexCommitStartedEvent();
break;
case VERTEX_GROUP_COMMIT_STARTED:
event = new VertexGroupCommitStartedEvent();
break;
case VERTEX_GROUP_COMMIT_FINISHED:
event = new VertexGroupCommitFinishedEvent();
break;
case DAG_RECOVERED:
event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(),
user, random.nextLong(), containerLogs);
break;
case DAG_KILL_REQUEST:
event = new DAGKillRequestEvent();
break;
default:
Assert.fail("Unhandled event type " + eventType);
}
if (event == null || !event.isHistoryEvent()) {
continue;
}
HistoryEventTimelineConversion.convertToTimelineEntities(event);
}
}
static class MockVersionInfo extends VersionInfo {
MockVersionInfo() {
super("component", "1.1.0", "rev1", "20120101", "git.apache.org");
}
}
@Test(timeout = 5000)
public void testConvertAppLaunchedEventConcurrentModificationException()
throws InterruptedException {
long launchTime = random.nextLong();
long submitTime = random.nextLong();
final Configuration conf = new Configuration(false);
conf.set("foo", "bar");
conf.set("applicationId", "1234");
final AtomicBoolean shutdown = new AtomicBoolean(false);
Thread confChanger = new Thread() {
@Override
public void run() {
int i = 1;
while (!shutdown.get()) {
// trigger an actual change to test concurrency with Configuration#iterator
conf.set("test" + i++, "test");
}
}
};
confChanger.start();
try {
MockVersionInfo mockVersionInfo = new MockVersionInfo();
AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
submitTime, user, conf, mockVersionInfo);
HistoryEventTimelineConversion.convertToTimelineEntities(event);
} finally {
shutdown.set(true);
confChanger.join();
}
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertAppLaunchedEvent() {
long launchTime = random.nextLong();
long submitTime = random.nextLong();
Configuration conf = new Configuration(false);
conf.set("foo", "bar");
conf.set("applicationId", "1234");
MockVersionInfo mockVersionInfo = new MockVersionInfo();
AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
submitTime, user, conf, mockVersionInfo);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), timelineEntity.getEntityType());
Assert.assertEquals("tez_" + applicationId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
Assert.assertEquals(5, timelineEntity.getOtherInfo().size());
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG));
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION));
Assert.assertEquals(user, timelineEntity.getOtherInfo().get(ATSConstants.USER));
Assert.assertEquals(applicationId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
Assert.assertEquals(AMWebController.VERSION,
timelineEntity.getOtherInfo().get(ATSConstants.DAG_AM_WEB_SERVICE_VERSION));
Map<String, String> config =
(Map<String, String>) timelineEntity.getOtherInfo().get(ATSConstants.CONFIG);
Assert.assertEquals(conf.get("foo"), config.get("foo"));
Assert.assertEquals(conf.get("applicationId"), config.get("applicationId"));
Map<String, String> versionInfo =
(Map<String, String>) timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION);
Assert.assertEquals(mockVersionInfo.getVersion(),
versionInfo.get(ATSConstants.VERSION));
Assert.assertEquals(mockVersionInfo.getRevision(),
versionInfo.get(ATSConstants.REVISION));
Assert.assertEquals(mockVersionInfo.getBuildTime(),
versionInfo.get(ATSConstants.BUILD_TIME));
}
@Test(timeout = 5000)
public void testConvertAMLaunchedEvent() {
long launchTime = random.nextLong();
long submitTime = random.nextLong();
AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType());
final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities();
Assert.assertEquals(0, relatedEntities.size());
final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
Assert.assertEquals(2, primaryFilters.size());
Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user));
Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent evt = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.AM_LAUNCHED.name(), evt.getEventType());
Assert.assertEquals(launchTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
Assert.assertEquals(4, otherInfo.size());
Assert.assertEquals(submitTime, otherInfo.get(ATSConstants.APP_SUBMIT_TIME));
Assert.assertEquals(applicationId.toString(), otherInfo.get(ATSConstants.APPLICATION_ID));
Assert.assertEquals(applicationAttemptId.toString(), otherInfo.get(ATSConstants.APPLICATION_ATTEMPT_ID));
Assert.assertEquals(user, otherInfo.get(ATSConstants.USER));
}
@Test(timeout = 5000)
public void testConvertAMStartedEvent() {
long startTime = random.nextLong();
AMStartedEvent event = new AMStartedEvent(applicationAttemptId, startTime, user);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType());
final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities();
Assert.assertEquals(0, relatedEntities.size());
final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
Assert.assertEquals(2, primaryFilters.size());
Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user));
Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent evt = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.AM_STARTED.name(), evt.getEventType());
Assert.assertEquals(startTime, evt.getTimestamp());
}
@Test(timeout = 5000)
public void testConvertContainerLaunchedEvent() {
long launchTime = random.nextLong();
ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
applicationAttemptId);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
Assert.assertTrue(
timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains(
"tez_" + applicationAttemptId.toString()));
Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationAttemptId.getApplicationId().toString()));
Assert.assertEquals(containerId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID));
Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(1, timelineEntity.getEvents().size());
Assert.assertEquals(HistoryEventType.CONTAINER_LAUNCHED.name(),
timelineEntity.getEvents().get(0).getEventType());
Assert.assertEquals(launchTime,
timelineEntity.getEvents().get(0).getTimestamp());
}
@Test(timeout = 5000)
public void testConvertContainerStoppedEvent() {
long stopTime = random.nextLong();
int exitStatus = random.nextInt();
ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus,
applicationAttemptId);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities();
Assert.assertEquals(1, relatedEntities.size());
Assert.assertTrue(relatedEntities.get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name())
.contains("tez_" + applicationAttemptId.toString()));
final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
Assert.assertEquals(2, primaryFilters.size());
Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertTrue(primaryFilters.get(ATSConstants.EXIT_STATUS).contains(exitStatus));
Assert.assertEquals(1, timelineEntity.getEvents().size());
final TimelineEvent evt = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.CONTAINER_STOPPED.name(), evt.getEventType());
Assert.assertEquals(stopTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
Assert.assertEquals(2, otherInfo.size());
Assert.assertEquals(exitStatus, otherInfo.get(ATSConstants.EXIT_STATUS));
Assert.assertEquals(stopTime, otherInfo.get(ATSConstants.FINISH_TIME));
}
@Test(timeout = 5000)
public void testConvertDAGStartedEvent() {
long startTime = random.nextLong();
String dagName = "testDagName";
DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent evt = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_STARTED.name(), evt.getEventType());
Assert.assertEquals(startTime, evt.getTimestamp());
final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
Assert.assertEquals(3, primaryFilters.size());
Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user));
Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertTrue(primaryFilters.get(ATSConstants.DAG_NAME).contains(dagName));
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
Assert.assertEquals(2, otherInfo.size());
Assert.assertEquals(startTime, otherInfo.get(ATSConstants.START_TIME));
Assert.assertEquals(DAGState.RUNNING.name(), otherInfo.get(ATSConstants.STATUS));
}
@Test(timeout = 5000)
public void testConvertDAGSubmittedEvent() {
long submitTime = random.nextLong();
final String queueName = "TEST_DAG_SUBMITTED";
DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
applicationAttemptId, null, user, null, containerLogs, queueName);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(2, entities.size());
if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) {
assertDagSubmittedEntity(submitTime, event, entities.get(0));
assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(1));
} else {
assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(0));
assertDagSubmittedEntity(submitTime, event, entities.get(1));
}
}
private void assertDagSubmittedEntity(long submitTime, DAGSubmittedEvent event,
TimelineEntity timelineEntity) {
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(2, timelineEntity.getRelatedEntities().size());
Assert.assertTrue(
timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION.name()).contains(
"tez_" + applicationId.toString()));
Assert.assertTrue(
timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains(
"tez_" + applicationAttemptId.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType());
Assert.assertEquals(submitTime, timelineEvent.getTimestamp());
Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(
dagPlan.getName()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_ID).contains(
dagPlan.getCallerContext().getCallerId()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationAttemptId.getApplicationId().toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_QUEUE_NAME)
.contains(event.getQueueName()));
Assert.assertEquals(9, timelineEntity.getOtherInfo().size());
Assert.assertEquals(applicationId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
Assert.assertEquals(applicationAttemptId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID));
Assert.assertEquals(applicationAttemptId.getApplicationId().toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
Assert.assertEquals(AMWebController.VERSION,
timelineEntity.getOtherInfo().get(ATSConstants.DAG_AM_WEB_SERVICE_VERSION));
Assert.assertEquals(user,
timelineEntity.getOtherInfo().get(ATSConstants.USER));
Assert.assertEquals(containerLogs,
timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ applicationAttemptId.getAttemptId()));
Assert.assertEquals(
timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_ID),
dagPlan.getCallerContext().getCallerId());
Assert.assertEquals(
timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE),
dagPlan.getCallerContext().getCallerType());
Assert.assertEquals(dagPlan.getCallerContext().getContext(),
timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT));
Assert.assertEquals(
event.getQueueName(), timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME));
}
private void assertDagSubmittedExtraInfoEntity(long submitTime, DAGSubmittedEvent event,
TimelineEntity timelineEntity) {
Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
Assert.assertTrue(timelineEntity.getRelatedEntities()
.get(EntityTypes.TEZ_DAG_ID.name()).contains(tezDAGID.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType());
Assert.assertEquals(submitTime, timelineEvent.getTimestamp());
Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(0, timelineEntity.getPrimaryFilters().size());
Assert.assertEquals(1, timelineEntity.getOtherInfo().size());
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertTaskAttemptFinishedEvent() {
String vertexName = "testVertex";
long creationTime = random.nextLong();
long startTime = creationTime + 1000;
long allocationTime = creationTime + 1001;
long finishTime = startTime + 1002;
TaskAttemptState state = TaskAttemptState
.values()[random.nextInt(TaskAttemptState.values().length)];
TaskAttemptTerminationCause error = TaskAttemptTerminationCause
.values()[random.nextInt(TaskAttemptTerminationCause.values().length)];
String diagnostics = "random diagnostics message";
TezCounters counters = new TezCounters();
long lastDataEventTime = finishTime - 1;
List<DataEventDependencyInfo> events = Lists.newArrayList();
events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
startTime, finishTime, state, TaskFailureType.FATAL, error, diagnostics, counters, events, null, creationTime,
tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
Assert.assertEquals(5, primaryFilters.size());
Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name())
.contains(tezDAGID.toString()));
Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name())
.contains(tezVertexID.toString()));
Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_TASK_ID.name())
.contains(tezTaskID.toString()));
Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent evt = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_FINISHED.name(), evt.getEventType());
Assert.assertEquals(finishTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
Assert.assertEquals(17, otherInfo.size());
Assert.assertEquals(tezTaskAttemptID.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT));
Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME));
Assert.assertEquals(allocationTime, timelineEntity.getOtherInfo().get(ATSConstants.ALLOCATION_TIME));
Assert.assertEquals(startTime, timelineEntity.getOtherInfo().get(ATSConstants.START_TIME));
Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
Assert.assertEquals(TaskFailureType.FATAL.name(), otherInfo.get(ATSConstants.TASK_FAILURE_TYPE));
Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
Map<String, Object> obj1 = (Map<String, Object>)otherInfo.get(ATSConstants.LAST_DATA_EVENTS);
List<Object> obj2 = (List<Object>) obj1.get(ATSConstants.LAST_DATA_EVENTS);
Assert.assertEquals(2, obj2.size());
Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0);
Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP));
Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
Assert.assertEquals("inProgressURL", otherInfo.get(ATSConstants.IN_PROGRESS_LOGS_URL));
Assert.assertEquals("logsURL", otherInfo.get(ATSConstants.COMPLETED_LOGS_URL));
Assert.assertEquals(nodeId.toString(), otherInfo.get(ATSConstants.NODE_ID));
Assert.assertEquals(containerId.toString(), otherInfo.get(ATSConstants.CONTAINER_ID));
Assert.assertEquals("nodeHttpAddress", otherInfo.get(ATSConstants.NODE_HTTP_ADDRESS));
TaskAttemptFinishedEvent eventWithNullFailureType =
new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
startTime, finishTime, state, null, error, diagnostics, counters, events, null,
creationTime,
tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL",
"nodeHttpAddress");
List<TimelineEntity> evtEntities = HistoryEventTimelineConversion.convertToTimelineEntities(
eventWithNullFailureType);
Assert.assertEquals(1, evtEntities.size());
TimelineEntity timelineEntityWithNullFailureType = evtEntities.get(0);
Assert.assertNull(
timelineEntityWithNullFailureType.getOtherInfo().get(ATSConstants.TASK_FAILURE_TYPE));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertDAGInitializedEvent() {
long initTime = random.nextLong();
Map<String, TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
nameIdMap.put("foo", tezVertexID);
DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
nameIdMap);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType());
Assert.assertEquals(initTime, timelineEvent.getTimestamp());
Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName"));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(
ATSConstants.VERTEX_NAME_ID_MAPPING));
Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get(
ATSConstants.VERTEX_NAME_ID_MAPPING);
Assert.assertEquals(1, vIdMap.size());
Assert.assertNotNull(vIdMap.containsKey("foo"));
Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo"));
}
@Test(timeout = 5000)
public void testConvertDAGFinishedEvent() {
long finishTime = random.nextLong();
long startTime = random.nextLong();
Map<String, Integer> taskStats = new HashMap<String, Integer>();
taskStats.put("FOO", 100);
taskStats.put("BAR", 200);
DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
"diagnostics", null, user, dagPlan.getName(), taskStats, applicationAttemptId, dagPlan);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(2, entities.size());
if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) {
assertDagFinishedEntity(finishTime, startTime, event, entities.get(0));
assertDagFinishedExtraInfoEntity(finishTime, entities.get(1));
} else {
assertDagFinishedExtraInfoEntity(finishTime, entities.get(0));
assertDagFinishedEntity(finishTime, startTime, event, entities.get(1));
}
}
private void assertDagFinishedEntity(long finishTime, long startTime, DAGFinishedEvent event,
TimelineEntity timelineEntity) {
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
DAGState.ERROR.name()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_ID).contains(
dagPlan.getCallerContext().getCallerId()));
Assert.assertEquals(startTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
Assert.assertEquals(finishTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
Assert.assertEquals(finishTime - startTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
Assert.assertEquals(DAGState.ERROR.name(),
timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
Assert.assertEquals("diagnostics",
timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
Assert.assertEquals(applicationAttemptId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID));
Assert.assertEquals(100,
((Integer) timelineEntity.getOtherInfo().get("FOO")).intValue());
Assert.assertEquals(200,
((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
}
private void assertDagFinishedExtraInfoEntity(long finishTime, TimelineEntity timelineEntity) {
Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
Assert.assertTrue(
timelineEntity.getRelatedEntities().get(ATSConstants.TEZ_DAG_ID).contains(
tezDAGID.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertVertexInitializedEvent() {
long initRequestedTime = random.nextLong();
long initedTime = random.nextLong();
int numTasks = random.nextInt();
VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime,
initedTime, numTasks, "proc", null, null,
new ServicePluginInfo().setContainerLauncherName("abc")
.setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
.setContainerLauncherClassName("abc1")
.setTaskSchedulerClassName("def1")
.setTaskCommunicatorClassName("ghi1"));
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(initedTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
Assert.assertTrue(
timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_DAG_ID.name()).contains(
tezDAGID.toString()));
Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
tezDAGID.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.VERTEX_INITIALIZED.name(), timelineEvent.getEventType());
Assert.assertEquals(initedTime, timelineEvent.getTimestamp());
Assert.assertEquals("v1", timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME));
Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME));
Assert.assertEquals(initedTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
Assert.assertEquals(initRequestedTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue());
Assert.assertEquals(initedTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
Assert.assertEquals(numTasks,
((Integer) timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN));
Assert.assertEquals("abc",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_NAME));
Assert.assertEquals("def",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_NAME));
Assert.assertEquals("ghi",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_NAME));
Assert.assertEquals("abc1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
Assert.assertEquals("def1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_CLASS_NAME));
Assert.assertEquals("ghi1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
}
@Test(timeout = 5000)
public void testConvertVertexStartedEvent() {
long startRequestedTime = random.nextLong();
long startTime = random.nextLong();
VertexStartedEvent event = new VertexStartedEvent(tezVertexID, startRequestedTime, startTime);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
tezDAGID.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.VERTEX_STARTED.name(), timelineEvent.getEventType());
Assert.assertEquals(startTime, timelineEvent.getTimestamp());
Assert.assertEquals(3, timelineEntity.getOtherInfo().size());
Assert.assertEquals(startRequestedTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_REQUESTED_TIME)).longValue());
Assert.assertEquals(startTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
Assert.assertEquals(VertexState.RUNNING.name(),
timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertVertexFinishedEvent() {
String vertexName = "v1";
long initRequestedTime = random.nextLong();
long initedTime = random.nextLong();
long startRequestedTime = random.nextLong();
long startTime = random.nextLong();
long finishTime = random.nextLong();
Map<String, Integer> taskStats = new HashMap<String, Integer>();
taskStats.put("FOO", 100);
taskStats.put("BAR", 200);
VertexStats vertexStats = new VertexStats();
VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, vertexName, 1, initRequestedTime,
initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
"diagnostics", null, vertexStats, taskStats,
new ServicePluginInfo().setContainerLauncherName("abc")
.setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
.setContainerLauncherClassName("abc1")
.setTaskSchedulerClassName("def1")
.setTaskCommunicatorClassName("ghi1"));
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
tezDAGID.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
VertexState.ERROR.name()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType());
Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
Assert.assertEquals(vertexName,
timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME));
Assert.assertEquals(finishTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
Assert.assertEquals(finishTime - startTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
Assert.assertEquals(VertexState.ERROR.name(),
timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
Assert.assertEquals("diagnostics",
timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN));
Assert.assertEquals("abc",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_NAME));
Assert.assertEquals("def",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_NAME));
Assert.assertEquals("ghi",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_NAME));
Assert.assertEquals("abc1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
Assert.assertEquals("def1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_CLASS_NAME));
Assert.assertEquals("ghi1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
Assert.assertEquals(100,
((Integer) timelineEntity.getOtherInfo().get("FOO")).intValue());
Assert.assertEquals(200,
((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
}
@Test(timeout = 5000)
public void testConvertTaskStartedEvent() {
long scheduleTime = random.nextLong();
long startTime = random.nextLong();
TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
Assert.assertTrue(
timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
tezVertexID.toString()));
Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
tezDAGID.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
tezVertexID.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.TASK_STARTED.name(), timelineEvent.getEventType());
Assert.assertEquals(startTime, timelineEvent.getTimestamp());
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.SCHEDULED_TIME));
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
Assert.assertEquals(scheduleTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue());
Assert.assertEquals(startTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
Assert.assertTrue(TaskState.SCHEDULED.name()
.equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
}
@Test(timeout = 5000)
public void testConvertTaskAttemptStartedEvent() {
long startTime = random.nextLong();
TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
Assert.assertTrue(
timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_TASK_ID.name()).contains(
tezTaskID.toString()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_STARTED.name(), timelineEvent.getEventType());
Assert.assertEquals(startTime, timelineEvent.getTimestamp());
Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
tezDAGID.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
tezVertexID.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_TASK_ID.name()).contains(
tezTaskID.toString()));
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
Assert.assertEquals("inProgressURL",
timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL));
Assert.assertEquals("logsURL",
timelineEntity.getOtherInfo().get(ATSConstants.COMPLETED_LOGS_URL));
Assert.assertEquals(nodeId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.NODE_ID));
Assert.assertEquals(containerId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID));
Assert.assertEquals("nodeHttpAddress",
timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS));
Assert.assertTrue(TaskAttemptState.RUNNING.name()
.equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
}
@Test(timeout = 5000)
public void testConvertTaskFinishedEvent() {
String vertexName = "testVertexName";
long startTime = random.nextLong();
long finishTime = random.nextLong();
TaskState state = TaskState.values()[random.nextInt(TaskState.values().length)];
String diagnostics = "diagnostics message";
TezCounters counters = new TezCounters();
TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime,
tezTaskAttemptID, state, diagnostics, counters, 3);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
Assert.assertEquals(4, primaryFilters.size());
Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name())
.contains(tezDAGID.toString()));
Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name())
.contains(tezVertexID.toString()));
Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.name()));
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent evt = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.TASK_FINISHED.name(), evt.getEventType());
Assert.assertEquals(finishTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
Assert.assertEquals(7, otherInfo.size());
Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
Assert.assertEquals(tezTaskAttemptID.toString(),
otherInfo.get(ATSConstants.SUCCESSFUL_ATTEMPT_ID));
Assert.assertEquals(3, otherInfo.get(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS));
Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertVertexReconfigreDoneEvent() {
TezVertexID vId = tezVertexID;
Map<String, EdgeProperty> edgeMgrs =
new HashMap<String, EdgeProperty>();
edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class")
.setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In")));
VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null,
edgeMgrs, null, true);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
Assert.assertEquals(vId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(1, timelineEntity.getEvents().size());
final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters();
Assert.assertEquals(2, primaryFilters.size());
Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID)
.contains(applicationId.toString()));
Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name())
.contains(tezDAGID.toString()));
TimelineEvent evt = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.VERTEX_CONFIGURE_DONE.name(), evt.getEventType());
Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS));
Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS));
Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS);
Assert.assertEquals(1, updatedEdgeMgrs.size());
Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
Assert.assertEquals(DataMovementType.CUSTOM.name(),
updatedEdgeMgr.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
Assert.assertEquals("In", updatedEdgeMgr.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS));
}
@Test(timeout = 5000)
public void testConvertDAGRecoveredEvent() {
long recoverTime = random.nextLong();
DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
dagPlan.getName(), user, recoverTime, containerLogs);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_RECOVERED.name(), timelineEvent.getEventType());
Assert.assertEquals(recoverTime, timelineEvent.getTimestamp());
Assert.assertTrue(timelineEvent.getEventInfo().containsKey(ATSConstants.APPLICATION_ATTEMPT_ID));
Assert.assertEquals(applicationAttemptId.toString(),
timelineEvent.getEventInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID));
Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("DAGPlanMock"));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
Assert.assertEquals(containerLogs,
timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ applicationAttemptId.getAttemptId()));
}
@Test(timeout = 5000)
public void testConvertDAGRecoveredEvent2() {
long recoverTime = random.nextLong();
DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason", containerLogs);
List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
Assert.assertEquals(1, entities.size());
TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
Assert.assertEquals(1, timelineEntity.getEvents().size());
TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
Assert.assertEquals(HistoryEventType.DAG_RECOVERED.name(), timelineEvent.getEventType());
Assert.assertEquals(recoverTime, timelineEvent.getTimestamp());
Assert.assertTrue(timelineEvent.getEventInfo().containsKey(ATSConstants.APPLICATION_ATTEMPT_ID));
Assert.assertEquals(applicationAttemptId.toString(),
timelineEvent.getEventInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID));
Assert.assertEquals(DAGState.ERROR.name(),
timelineEvent.getEventInfo().get(ATSConstants.DAG_STATE));
Assert.assertEquals("mock reason",
timelineEvent.getEventInfo().get(ATSConstants.RECOVERY_FAILURE_REASON));
Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationId.toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("DAGPlanMock"));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
Assert.assertEquals(containerLogs,
timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ applicationAttemptId.getAttemptId()));
}
}