blob: 64f66bc12ba9d1afe2f227992e8665e187393e05 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.logging.proto;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.app.web.AMWebController;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGKillRequestEvent;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
public class TestHistoryEventProtoConverter {
private ApplicationAttemptId applicationAttemptId;
private ApplicationId applicationId;
private String user = "user";
private Random random = new Random();
private TezDAGID tezDAGID;
private TezVertexID tezVertexID;
private TezTaskID tezTaskID;
private TezTaskAttemptID tezTaskAttemptID;
private DAGPlan dagPlan;
private ContainerId containerId;
private NodeId nodeId;
private String containerLogs = "containerLogs";
private HistoryEventProtoConverter converter = new HistoryEventProtoConverter();
@Before
public void setup() {
applicationId = ApplicationId.newInstance(9999l, 1);
applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
CallerContextProto.Builder callerContextProto = CallerContextProto.newBuilder();
callerContextProto.setContext("ctxt");
callerContextProto.setCallerId("Caller_ID");
callerContextProto.setCallerType("Caller_Type");
callerContextProto.setBlob("Desc_1");
dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock")
.setCallerContext(callerContextProto).build();
containerId = ContainerId.newContainerId(applicationAttemptId, 111);
nodeId = NodeId.newInstance("node", 13435);
}
@Test(timeout = 5000)
public void testHandlerExists() {
for (HistoryEventType eventType : HistoryEventType.values()) {
HistoryEvent event = null;
switch (eventType) {
case APP_LAUNCHED:
event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
user, new Configuration(false), null);
break;
case AM_LAUNCHED:
event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
user);
break;
case AM_STARTED:
event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
null, user, null, containerLogs, null);
break;
case DAG_INITIALIZED:
event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
break;
case DAG_STARTED:
event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
break;
case DAG_FINISHED:
event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
null, null, user, dagPlan.getName(), null, applicationAttemptId, dagPlan);
break;
case VERTEX_INITIALIZED:
event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), "proc", null, null, null);
break;
case VERTEX_STARTED:
event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
break;
case VERTEX_CONFIGURE_DONE:
event = new VertexConfigurationDoneEvent(tezVertexID, 0L, 1, null, null, null, true);
break;
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
null, null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
break;
case TASK_FINISHED:
event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
tezTaskAttemptID, TaskState.FAILED, null, null, 0);
break;
case TASK_ATTEMPT_STARTED:
event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
nodeId, null, null, "nodeHttpAddress");
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
random.nextInt(), TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.OUTPUT_LOST,
null, null, null, null, 0, null, 0,
containerId, nodeId, null, null, "nodeHttpAddress");
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
applicationAttemptId);
break;
case CONTAINER_STOPPED:
event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
break;
case DAG_COMMIT_STARTED:
event = new DAGCommitStartedEvent();
break;
case VERTEX_COMMIT_STARTED:
event = new VertexCommitStartedEvent();
break;
case VERTEX_GROUP_COMMIT_STARTED:
event = new VertexGroupCommitStartedEvent();
break;
case VERTEX_GROUP_COMMIT_FINISHED:
event = new VertexGroupCommitFinishedEvent();
break;
case DAG_RECOVERED:
event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(),
user, random.nextLong(), containerLogs);
break;
case DAG_KILL_REQUEST:
event = new DAGKillRequestEvent();
break;
default:
Assert.fail("Unhandled event type " + eventType);
}
if (event == null || !event.isHistoryEvent()) {
continue;
}
converter.convert(event);
}
}
static class MockVersionInfo extends VersionInfo {
MockVersionInfo() {
super("component", "1.1.0", "rev1", "20120101", "git.apache.org");
}
}
private String findEventData(HistoryEventProto proto, String key) {
for (KVPair data : proto.getEventDataList()) {
if (data.getKey().equals(key)) {
return data.getValue();
}
}
return null;
}
private void assertEventData(HistoryEventProto proto, String key, String value) {
String evtVal = findEventData(proto, key);
if (evtVal == null) {
Assert.fail("Cannot find kv pair: " + key);
}
if (value != null) {
Assert.assertEquals(value, evtVal);
}
}
private void assertNoEventData(HistoryEventProto proto, String key) {
for (KVPair data : proto.getEventDataList()) {
if (data.getKey().equals(key)) {
Assert.fail("Found find kv pair: " + key);
}
}
}
private String safeToString(Object obj) {
return obj == null ? "" : obj.toString();
}
private void assertCommon(HistoryEventProto proto, HistoryEventType type, long eventTime,
EntityTypes entityType, ApplicationAttemptId appAttemptId, String user, int numData) {
Assert.assertEquals(type.name(), proto.getEventType());
Assert.assertEquals(eventTime, proto.getEventTime());
// Assert.assertEquals(safeToString(appId), proto.getAppId());
Assert.assertEquals(safeToString(appAttemptId), proto.getAppAttemptId());
Assert.assertEquals(safeToString(user), proto.getUser());
if (entityType != null) {
switch (entityType) { // Intentional fallthrough.
case TEZ_TASK_ATTEMPT_ID:
Assert.assertEquals(tezTaskAttemptID.toString(), proto.getTaskAttemptId());
case TEZ_TASK_ID:
Assert.assertEquals(tezTaskID.toString(), proto.getTaskId());
case TEZ_VERTEX_ID:
Assert.assertEquals(tezVertexID.toString(), proto.getVertexId());
case TEZ_DAG_ID:
Assert.assertEquals(tezDAGID.toString(), proto.getDagId());
case TEZ_APPLICATION:
Assert.assertEquals(applicationId.toString(), proto.getAppId());
break;
default:
Assert.fail("Invalid type: " + entityType.name());
}
}
Assert.assertEquals(numData, proto.getEventDataCount());
}
@Test(timeout = 5000)
public void testConvertAppLaunchedEvent() {
long launchTime = random.nextLong();
long submitTime = random.nextLong();
Configuration conf = new Configuration(false);
conf.set("foo", "bar");
conf.set("applicationId", "1234");
MockVersionInfo mockVersionInfo = new MockVersionInfo();
AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime, submitTime, user,
conf, mockVersionInfo);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.APP_LAUNCHED, launchTime, EntityTypes.TEZ_APPLICATION,
null, user, 3);
assertEventData(proto, ATSConstants.CONFIG, null);
assertEventData(proto, ATSConstants.TEZ_VERSION, null);
assertEventData(proto, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
}
@Test(timeout = 5000)
public void testConvertAMLaunchedEvent() {
long launchTime = random.nextLong();
long submitTime = random.nextLong();
AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime,
user);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.AM_LAUNCHED, launchTime, EntityTypes.TEZ_APPLICATION,
applicationAttemptId, user, 1);
assertEventData(proto, ATSConstants.APP_SUBMIT_TIME, String.valueOf(submitTime));
}
@Test(timeout = 5000)
public void testConvertAMStartedEvent() {
long startTime = random.nextLong();
AMStartedEvent event = new AMStartedEvent(applicationAttemptId, startTime, user);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.AM_STARTED, startTime, EntityTypes.TEZ_APPLICATION,
applicationAttemptId, user, 0);
}
@Test(timeout = 5000)
public void testConvertContainerLaunchedEvent() {
long launchTime = random.nextLong();
ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
applicationAttemptId);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.CONTAINER_LAUNCHED, launchTime, EntityTypes.TEZ_APPLICATION,
applicationAttemptId, null, 1);
assertEventData(proto, ATSConstants.CONTAINER_ID, containerId.toString());
}
@Test(timeout = 5000)
public void testConvertContainerStoppedEvent() {
long stopTime = random.nextLong();
int exitStatus = random.nextInt();
ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus,
applicationAttemptId);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.CONTAINER_STOPPED, stopTime, EntityTypes.TEZ_APPLICATION,
applicationAttemptId, null, 3);
assertEventData(proto, ATSConstants.CONTAINER_ID, containerId.toString());
assertEventData(proto, ATSConstants.EXIT_STATUS, String.valueOf(exitStatus));
assertEventData(proto, ATSConstants.FINISH_TIME, String.valueOf(stopTime));
}
@Test(timeout = 5000)
public void testConvertDAGStartedEvent() {
long startTime = random.nextLong();
String dagName = "testDagName";
DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.DAG_STARTED, startTime, EntityTypes.TEZ_DAG_ID, null,
user, 2);
assertEventData(proto, ATSConstants.DAG_NAME, dagName);
assertEventData(proto, ATSConstants.STATUS, DAGState.RUNNING.name());
}
@Test(timeout = 5000)
public void testConvertDAGSubmittedEvent() {
long submitTime = random.nextLong();
final String queueName = "TEST_DAG_SUBMITTED";
DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
applicationAttemptId, null, user, null, containerLogs, queueName);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.DAG_SUBMITTED, submitTime, EntityTypes.TEZ_DAG_ID,
applicationAttemptId, user, 8);
assertEventData(proto, ATSConstants.DAG_NAME, dagPlan.getName());
assertEventData(proto, ATSConstants.DAG_QUEUE_NAME, event.getQueueName());
assertEventData(proto, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
assertEventData(proto, ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ applicationAttemptId.getAttemptId(), containerLogs);
assertEventData(proto, ATSConstants.CALLER_CONTEXT_ID,
dagPlan.getCallerContext().getCallerId());
assertEventData(proto, ATSConstants.CALLER_CONTEXT_TYPE,
dagPlan.getCallerContext().getCallerType());
assertEventData(proto, ATSConstants.CALLER_CONTEXT, dagPlan.getCallerContext().getContext());
assertEventData(proto, ATSConstants.DAG_PLAN, null);
}
@Test(timeout = 5000)
public void testConvertTaskAttemptFinishedEvent() {
String vertexName = "testVertex";
long creationTime = random.nextLong();
long startTime = creationTime + 1000;
long allocationTime = creationTime + 1001;
long finishTime = startTime + 1002;
TaskAttemptState state = TaskAttemptState
.values()[random.nextInt(TaskAttemptState.values().length)];
TaskAttemptTerminationCause error = TaskAttemptTerminationCause
.values()[random.nextInt(TaskAttemptTerminationCause.values().length)];
String diagnostics = "random diagnostics message";
TezCounters counters = new TezCounters();
long lastDataEventTime = finishTime - 1;
List<DataEventDependencyInfo> events = Lists.newArrayList();
events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
startTime, finishTime, state, TaskFailureType.FATAL, error, diagnostics, counters, events,
null, creationTime, tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL",
"logsURL", "nodeHttpAddress");
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.TASK_ATTEMPT_FINISHED, finishTime,
EntityTypes.TEZ_DAG_ID, null, null, 17);
assertEventData(proto, ATSConstants.STATUS, state.name());
assertEventData(proto, ATSConstants.CREATION_CAUSAL_ATTEMPT, tezTaskAttemptID.toString());
assertEventData(proto, ATSConstants.CREATION_TIME, String.valueOf(creationTime));
assertEventData(proto, ATSConstants.ALLOCATION_TIME, String.valueOf(allocationTime));
assertEventData(proto, ATSConstants.START_TIME, String.valueOf(startTime));
assertEventData(proto, ATSConstants.TIME_TAKEN, String.valueOf(finishTime - startTime));
assertEventData(proto, ATSConstants.TASK_FAILURE_TYPE, TaskFailureType.FATAL.name());
assertEventData(proto, ATSConstants.TASK_ATTEMPT_ERROR_ENUM, error.name());
assertEventData(proto, ATSConstants.DIAGNOSTICS, diagnostics);
assertEventData(proto, ATSConstants.LAST_DATA_EVENTS, null);
assertEventData(proto, ATSConstants.COUNTERS, null);
assertEventData(proto, ATSConstants.IN_PROGRESS_LOGS_URL, "inProgressURL");
assertEventData(proto, ATSConstants.COMPLETED_LOGS_URL, "logsURL");
assertEventData(proto, ATSConstants.NODE_ID, nodeId.toString());
assertEventData(proto, ATSConstants.CONTAINER_ID, containerId.toString());
assertEventData(proto, ATSConstants.NODE_HTTP_ADDRESS, "nodeHttpAddress");
TaskAttemptFinishedEvent eventWithNullFailureType =
new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
startTime, finishTime, state, null, error, diagnostics, counters, events, null,
creationTime,
tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL",
"nodeHttpAddress");
proto = converter.convert(eventWithNullFailureType);
assertNoEventData(proto, ATSConstants.TASK_FAILURE_TYPE);
}
@Test(timeout = 5000)
public void testConvertDAGInitializedEvent() {
long initTime = random.nextLong();
Map<String, TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
nameIdMap.put("foo", tezVertexID);
DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
nameIdMap);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.DAG_INITIALIZED, initTime,
EntityTypes.TEZ_DAG_ID, null, user, 2);
assertEventData(proto, ATSConstants.DAG_NAME, "dagName");
assertEventData(proto, ATSConstants.VERTEX_NAME_ID_MAPPING, null);
}
@Test(timeout = 5000)
public void testConvertDAGFinishedEvent() {
long finishTime = random.nextLong();
long startTime = random.nextLong();
Map<String, Integer> taskStats = new HashMap<String, Integer>();
taskStats.put("FOO", 100);
taskStats.put("BAR", 200);
DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
"diagnostics", null, user, dagPlan.getName(), taskStats, applicationAttemptId, dagPlan);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.DAG_FINISHED, finishTime,
EntityTypes.TEZ_DAG_ID, applicationAttemptId, user, 11);
assertEventData(proto, ATSConstants.DAG_NAME, dagPlan.getName());
assertEventData(proto, ATSConstants.STATUS, DAGState.ERROR.name());
assertEventData(proto, ATSConstants.CALLER_CONTEXT_ID,
dagPlan.getCallerContext().getCallerId());
assertEventData(proto, ATSConstants.CALLER_CONTEXT_TYPE,
dagPlan.getCallerContext().getCallerType());
assertEventData(proto, ATSConstants.START_TIME, String.valueOf(startTime));
assertEventData(proto, ATSConstants.TIME_TAKEN, String.valueOf(finishTime - startTime));
assertEventData(proto, ATSConstants.DIAGNOSTICS, "diagnostics");
assertEventData(proto, ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
applicationAttemptId.toString());
assertEventData(proto, "FOO", String.valueOf(100));
assertEventData(proto, "BAR", String.valueOf(200));
assertEventData(proto, ATSConstants.COUNTERS, null);
}
@Test(timeout = 5000)
public void testConvertVertexInitializedEvent() {
long initRequestedTime = random.nextLong();
long initedTime = random.nextLong();
int numTasks = random.nextInt();
VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime,
initedTime, numTasks, "proc", null, null,
new ServicePluginInfo().setContainerLauncherName("abc")
.setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
.setContainerLauncherClassName("abc1")
.setTaskSchedulerClassName("def1")
.setTaskCommunicatorClassName("ghi1"));
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.VERTEX_INITIALIZED, initedTime,
EntityTypes.TEZ_VERTEX_ID, null, null, 6);
assertEventData(proto, ATSConstants.VERTEX_NAME, "v1");
assertEventData(proto, ATSConstants.PROCESSOR_CLASS_NAME, "proc");
assertEventData(proto, ATSConstants.INIT_REQUESTED_TIME, String.valueOf(initRequestedTime));
assertEventData(proto, ATSConstants.NUM_TASKS, String.valueOf(numTasks));
assertEventData(proto, ATSConstants.SERVICE_PLUGIN, null);
/*
Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN));
Assert.assertEquals("abc",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_NAME));
Assert.assertEquals("def",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_NAME));
Assert.assertEquals("ghi",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_NAME));
Assert.assertEquals("abc1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
Assert.assertEquals("def1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_CLASS_NAME));
Assert.assertEquals("ghi1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
*/
}
@Test(timeout = 5000)
public void testConvertVertexStartedEvent() {
long startRequestedTime = random.nextLong();
long startTime = random.nextLong();
VertexStartedEvent event = new VertexStartedEvent(tezVertexID, startRequestedTime, startTime);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.VERTEX_STARTED, startTime,
EntityTypes.TEZ_VERTEX_ID, null, null, 2);
assertEventData(proto, ATSConstants.START_REQUESTED_TIME, String.valueOf(startRequestedTime));
assertEventData(proto, ATSConstants.STATUS, VertexState.RUNNING.name());
}
@Test(timeout = 5000)
public void testConvertVertexFinishedEvent() {
String vertexName = "v1";
long initRequestedTime = random.nextLong();
long initedTime = random.nextLong();
long startRequestedTime = random.nextLong();
long startTime = random.nextLong();
long finishTime = random.nextLong();
Map<String, Integer> taskStats = new HashMap<String, Integer>();
taskStats.put("FOO", 100);
taskStats.put("BAR", 200);
VertexStats vertexStats = new VertexStats();
VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, vertexName, 1,
initRequestedTime, initedTime, startRequestedTime, startTime, finishTime,
VertexState.ERROR, "diagnostics", null, vertexStats, taskStats,
new ServicePluginInfo().setContainerLauncherName("abc")
.setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
.setContainerLauncherClassName("abc1")
.setTaskSchedulerClassName("def1")
.setTaskCommunicatorClassName("ghi1"));
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.VERTEX_FINISHED, finishTime,
EntityTypes.TEZ_VERTEX_ID, null, null, 9);
assertEventData(proto, ATSConstants.VERTEX_NAME, vertexName);
assertEventData(proto, ATSConstants.STATUS, VertexState.ERROR.name());
assertEventData(proto, ATSConstants.TIME_TAKEN, String.valueOf(finishTime - startTime));
assertEventData(proto, ATSConstants.DIAGNOSTICS, "diagnostics");
assertEventData(proto, ATSConstants.COUNTERS, null);
assertEventData(proto, ATSConstants.STATS, null);
assertEventData(proto, "FOO", "100");
assertEventData(proto, "BAR", "200");
assertEventData(proto, ATSConstants.SERVICE_PLUGIN, null);
/*
Assert.assertEquals("abc",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_NAME));
Assert.assertEquals("def",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_NAME));
Assert.assertEquals("ghi",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_NAME));
Assert.assertEquals("abc1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
Assert.assertEquals("def1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_SCHEDULER_CLASS_NAME));
Assert.assertEquals("ghi1",
((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
*/
}
@Test(timeout = 5000)
public void testConvertTaskStartedEvent() {
long scheduleTime = random.nextLong();
long startTime = random.nextLong();
TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.TASK_STARTED, startTime,
EntityTypes.TEZ_TASK_ID, null, null, 2);
assertEventData(proto, ATSConstants.SCHEDULED_TIME, String.valueOf(scheduleTime));
assertEventData(proto, ATSConstants.STATUS, TaskState.SCHEDULED.name());
}
@Test(timeout = 5000)
public void testConvertTaskAttemptStartedEvent() {
long startTime = random.nextLong();
TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.TASK_ATTEMPT_STARTED, startTime,
EntityTypes.TEZ_TASK_ATTEMPT_ID, null, null, 6);
assertEventData(proto, ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
assertEventData(proto, ATSConstants.IN_PROGRESS_LOGS_URL, "inProgressURL");
assertEventData(proto, ATSConstants.COMPLETED_LOGS_URL, "logsURL");
assertEventData(proto, ATSConstants.NODE_ID, nodeId.toString());
assertEventData(proto, ATSConstants.CONTAINER_ID, containerId.toString());
assertEventData(proto, ATSConstants.NODE_HTTP_ADDRESS, "nodeHttpAddress");
}
@Test(timeout = 5000)
public void testConvertTaskFinishedEvent() {
String vertexName = "testVertexName";
long startTime = random.nextLong();
long finishTime = random.nextLong();
TaskState state = TaskState.values()[random.nextInt(TaskState.values().length)];
String diagnostics = "diagnostics message";
TezCounters counters = new TezCounters();
TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime,
tezTaskAttemptID, state, diagnostics, counters, 3);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.TASK_FINISHED, finishTime,
EntityTypes.TEZ_TASK_ID, null, null, 6);
assertEventData(proto, ATSConstants.STATUS, state.name());
assertEventData(proto, ATSConstants.TIME_TAKEN, String.valueOf(finishTime - startTime));
assertEventData(proto, ATSConstants.SUCCESSFUL_ATTEMPT_ID, tezTaskAttemptID.toString());
assertEventData(proto, ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, "3");
assertEventData(proto, ATSConstants.DIAGNOSTICS, diagnostics);
assertEventData(proto, ATSConstants.COUNTERS, null);
}
@Test(timeout = 5000)
public void testConvertVertexReconfigreDoneEvent() {
TezVertexID vId = tezVertexID;
Map<String, EdgeProperty> edgeMgrs =
new HashMap<String, EdgeProperty>();
edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class")
.setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In")));
VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null,
edgeMgrs, null, true);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.VERTEX_CONFIGURE_DONE, 0L,
EntityTypes.TEZ_VERTEX_ID, null, null, 2);
assertEventData(proto, ATSConstants.NUM_TASKS, "1");
assertEventData(proto, ATSConstants.UPDATED_EDGE_MANAGERS, null);
/*
Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS);
Assert.assertEquals(1, updatedEdgeMgrs.size());
Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
Assert.assertEquals(DataMovementType.CUSTOM.name(),
updatedEdgeMgr.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
Assert.assertEquals("In", updatedEdgeMgr.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
*/
}
@Test(timeout = 5000)
public void testConvertDAGRecoveredEvent() {
long recoverTime = random.nextLong();
DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
dagPlan.getName(), user, recoverTime, containerLogs);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.DAG_RECOVERED, recoverTime,
EntityTypes.TEZ_DAG_ID, applicationAttemptId, user, 2);
assertEventData(proto, ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ applicationAttemptId.getAttemptId(), containerLogs);
assertEventData(proto, ATSConstants.DAG_NAME, dagPlan.getName());
}
@Test(timeout = 5000)
public void testConvertDAGRecoveredEvent2() {
long recoverTime = random.nextLong();
DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason", containerLogs);
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.DAG_RECOVERED, recoverTime,
EntityTypes.TEZ_DAG_ID, applicationAttemptId, user, 4);
assertEventData(proto, ATSConstants.DAG_STATE, DAGState.ERROR.name());
assertEventData(proto, ATSConstants.RECOVERY_FAILURE_REASON, "mock reason");
assertEventData(proto, ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ applicationAttemptId.getAttemptId(), containerLogs);
assertEventData(proto, ATSConstants.DAG_NAME, dagPlan.getName());
}
}