MAPREDUCE-4738. Fix and re-enable disabled unit tests in the mr-app2 module. (sseth)
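
The bulk of this change replaces the direct, uber-aware construction of the
container requestor and AM scheduler in MRAppMaster with two thin routers
(ContainerRequestorRouter and AMSchedulerRouter) that are registered during
init() but only choose their concrete delegate in start(), once job-init has
run and job.isUber() is known. Along the way the ContainerRequestor interface
grows getAvailableResources()/addContainerReq()/decContainerReq(), raw
TA_FAIL_REQUEST events are replaced with TaskAttemptEventFailRequest, and
several hooks (TaskAttemptImpl.resolveHosts, AMContainerHelpers
.createContainerLaunchContext, MRApp test fields) are opened up so the
re-enabled tests can stub them.

The sketch below is illustrative only and is not part of the patch:
DeferredDelegateService and chooseDelegate() are invented names that
abbreviate the pattern the two routers follow.

    import org.apache.hadoop.yarn.service.AbstractService;
    import org.apache.hadoop.yarn.service.Service;

    // A service that defers choosing its concrete delegate until start(),
    // when the information needed to choose (here: uber vs. non-uber) exists.
    abstract class DeferredDelegateService<T extends Service> extends AbstractService {

      private T delegate;

      DeferredDelegateService(String name) {
        super(name);
      }

      /** Subclasses pick the real implementation; invoked from start(). */
      protected abstract T chooseDelegate();

      @Override
      public synchronized void start() {
        delegate = chooseDelegate();
        // The delegate never went through the composite init phase, so its
        // life-cycle is driven explicitly with this router's configuration.
        delegate.init(getConfig());
        delegate.start();
        super.start();
      }

      @Override
      public synchronized void stop() {
        if (delegate != null) {
          delegate.stop();
        }
        super.stop();
      }
    }
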
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-3902@1400232 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-mapreduce-project/CHANGES.txt.MR-3902 b/hadoop-mapreduce-project/CHANGES.txt.MR-3902
index 8dd1657..ff8e2d8 100644
--- a/hadoop-mapreduce-project/CHANGES.txt.MR-3902
+++ b/hadoop-mapreduce-project/CHANGES.txt.MR-3902
@@ -30,3 +30,5 @@
MAPREDUCE-4727. Handle successful NM stop requests. (sseth)
MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. (sseth)
+
+  MAPREDUCE-4738. Fix and re-enable disabled unit tests in the mr-app2 module. (sseth)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
index 5bac505..4e0d55b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
@@ -74,14 +74,17 @@
import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app2.recover.RecoveryService;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
@@ -111,6 +114,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -178,7 +182,8 @@
private boolean newApiCommitter;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
- private JobHistoryEventHandler2 jobHistoryEventHandler;
+ private EventHandler<JobHistoryEvent> jobHistoryEventHandler;
+ private AbstractService stagingDirCleanerService;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private ContainerRequestor containerRequestor;
@@ -292,10 +297,9 @@
addIfService(clientService);
//service to log job history events
- EventHandler<JobHistoryEvent> historyService =
- createJobHistoryHandler(context);
+ jobHistoryEventHandler = createJobHistoryHandler(context);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
- historyService);
+ jobHistoryEventHandler);
this.jobEventDispatcher = new JobEventDispatcher();
@@ -324,10 +328,21 @@
addIfService(containerLauncher);
dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ containerRequestor = createContainerRequestor(clientService, context);
+ addIfService(containerRequestor);
+ dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
+
+ amScheduler = createAMScheduler(containerRequestor, context);
+ addIfService(amScheduler);
+ dispatcher.register(AMSchedulerEventType.class, amScheduler);
+
// Add the staging directory cleaner before the history server but after
// the container allocator so the staging directory is cleaned after
// the history has been flushed but before unregistering with the RM.
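+    // (Composite services are stopped in the reverse order in which they are
+    // added, which is what makes this ordering work.)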
- addService(createStagingDirCleaningService());
+ this.stagingDirCleanerService = createStagingDirCleaningService();
+ addService(stagingDirCleanerService);
+
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
@@ -335,7 +350,7 @@
// Note: Even though JobHistoryEventHandler is started last, if any
// component creates a JobHistoryEvent in the meanwhile, it will be just be
// queued inside the JobHistoryEventHandler
- addIfService(historyService);
+ addIfService(this.jobHistoryEventHandler);
super.init(conf);
} // end of init()
@@ -580,44 +595,33 @@
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext, getCommitter());
}
-
+
/**
* Create the RMContainerRequestor.
- * @param clientService the MR Client Service.
- * @param appContext the application context.
+ *
+ * @param clientService
+ * the MR Client Service.
+ * @param appContext
+ * the application context.
* @return an instance of the RMContainerRequestor.
*/
protected ContainerRequestor createContainerRequestor(
ClientService clientService, AppContext appContext) {
- ContainerRequestor containerRequestor;
- if (job.isUber()) {
- containerRequestor = new LocalContainerRequestor(clientService,
- appContext);
- } else {
- containerRequestor = new RMContainerRequestor(clientService, appContext);
- }
- return containerRequestor;
+ return new ContainerRequestorRouter(clientService, appContext);
}
/**
* Create the AM Scheduler.
*
- * @param requestor The Container Requestor.
- * @param appContext the application context.
+ * @param requestor
+ * The Container Requestor.
+ * @param appContext
+ * the application context.
* @return an instance of the AMScheduler.
*/
protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
AppContext appContext) {
- if (job.isUber()) {
- return new LocalContainerAllocator(appContext, jobId, nmHost, nmPort,
- nmHttpPort, containerID, (TaskUmbilicalProtocol) taskAttemptListener,
- taskAttemptListener, (RMCommunicator)containerRequestor);
- } else {
- // TODO XXX: This is terrible. Assuming RMContainerRequestor is sent in
- // when non-uberized. Fix RMContainerRequestor to be a proper interface, etc.
- return new RMContainerAllocator((RMContainerRequestor) requestor,
- appContext);
- }
+ return new AMSchedulerRouter(requestor, appContext);
}
/** Create and initialize (but don't start) a single job. */
@@ -681,9 +685,7 @@
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
- this.jobHistoryEventHandler = new JobHistoryEventHandler2(context,
- getStartCount());
- return this.jobHistoryEventHandler;
+ return new JobHistoryEventHandler2(context, getStartCount());
}
protected AbstractService createStagingDirCleaningService() {
@@ -810,7 +812,117 @@
public TaskAttemptListener getTaskAttemptListener() {
return taskAttemptListener;
}
-
+
+  /**
+   * By the time the life-cycle of this router starts, job-init will already
+   * have happened, so job.isUber() can be used to pick the real requestor.
+   */
+ private final class ContainerRequestorRouter extends AbstractService
+ implements ContainerRequestor {
+ private final ClientService clientService;
+ private final AppContext context;
+ private ContainerRequestor real;
+
+ public ContainerRequestorRouter(ClientService clientService,
+ AppContext appContext) {
+ super(ContainerRequestorRouter.class.getName());
+ this.clientService = clientService;
+ this.context = appContext;
+ }
+
+ @Override
+ public void start() {
+ if (job.isUber()) {
+ real = new LocalContainerRequestor(clientService,
+ context);
+ } else {
+ real = new RMContainerRequestor(clientService, context);
+ }
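+      // The delegate is created outside the normal composite life-cycle, so
+      // it has to be initialized and started explicitly with this router's
+      // configuration.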
+ ((Service)this.real).init(getConfig());
+ ((Service)this.real).start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if (real != null) {
+ ((Service) real).stop();
+ }
+ super.stop();
+ }
+
+ @Override
+ public void handle(RMCommunicatorEvent event) {
+ real.handle(event);
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+ return real.getAvailableResources();
+ }
+
+ @Override
+ public void addContainerReq(ContainerRequest req) {
+ real.addContainerReq(req);
+ }
+
+ @Override
+ public void decContainerReq(ContainerRequest req) {
+ real.decContainerReq(req);
+ }
+
+ public void setSignalled(boolean isSignalled) {
+ ((RMCommunicator) real).setSignalled(isSignalled);
+ }
+ }
+
+  /**
+   * By the time the life-cycle of this router starts, job-init will already
+   * have happened, so job.isUber() can be used to pick the real allocator.
+   */
+ private final class AMSchedulerRouter extends AbstractService
+ implements ContainerAllocator {
+ private final ContainerRequestor requestor;
+ private final AppContext context;
+ private ContainerAllocator containerAllocator;
+
+ AMSchedulerRouter(ContainerRequestor requestor,
+ AppContext context) {
+ super(AMSchedulerRouter.class.getName());
+ this.requestor = requestor;
+ this.context = context;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (job.isUber()) {
+ this.containerAllocator = new LocalContainerAllocator(this.context,
+ jobId, nmHost, nmPort, nmHttpPort, containerID,
+ (TaskUmbilicalProtocol) taskAttemptListener, taskAttemptListener,
+ (RMCommunicator) this.requestor);
+ } else {
+ this.containerAllocator = new RMContainerAllocator(this.requestor,
+ this.context);
+ }
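+      // As with the requestor router, the allocator is created here rather
+      // than via addService(), so drive its init/start explicitly.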
+ ((Service)this.containerAllocator).init(getConfig());
+ ((Service)this.containerAllocator).start();
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (containerAllocator != null) {
+        ((Service) this.containerAllocator).stop();
+      }
+      super.stop();
+ }
+
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ this.containerAllocator.handle(event);
+ }
+ }
+
public TaskHeartbeatHandler getTaskHeartbeatHandler() {
return taskHeartbeatHandler;
}
@@ -974,16 +1086,6 @@
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
- // service to allocate containers from RM (if non-uber) or to fake it (uber)
- containerRequestor = createContainerRequestor(clientService, context);
- addIfService(containerRequestor);
- ((Service)containerRequestor).init(getConfig());
- dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
-
- amScheduler = createAMScheduler(containerRequestor, context);
- addIfService(amScheduler);
- ((Service)amScheduler).init(getConfig());
- dispatcher.register(AMSchedulerEventType.class, amScheduler);
//start all the components
super.start();
@@ -1155,10 +1257,12 @@
// that they don't take too long in shutting down
// Signal the RMCommunicator.
- ((RMCommunicator)appMaster.containerRequestor).setSignalled(true);
+ ((ContainerRequestorRouter) appMaster.containerRequestor)
+ .setSignalled(true);
if(appMaster.jobHistoryEventHandler != null) {
- appMaster.jobHistoryEventHandler.setSignalled(true);
+ ((JobHistoryEventHandler2) appMaster.jobHistoryEventHandler)
+ .setSignalled(true);
}
appMaster.stop();
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java
index 10c2fbb..8800170 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java
@@ -20,7 +20,6 @@
import java.net.InetSocketAddress;
-import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java
index e7d21a9..2641a2d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
@@ -346,8 +347,7 @@
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
- new TaskAttemptEvent(taskAttemptId,
- TaskAttemptEventType.TA_FAIL_REQUEST));
+ new TaskAttemptEventFailRequest(taskAttemptId, message));
FailTaskAttemptResponse response = recordFactory.
newRecordInstance(FailTaskAttemptResponse.class);
return response;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java
index ca77bc9..2767ea8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java
@@ -29,8 +29,8 @@
this.message = message;
}
+ // TODO: This is not used at the moment.
public String getMessage() {
return this.message;
}
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventKillRequest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventKillRequest.java
index 42b94e2..fdd6a38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventKillRequest.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventKillRequest.java
@@ -11,6 +11,7 @@
this.message = message;
}
+ // TODO: This is not used at the moment.
public String getMessage() {
return this.message;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptScheduleEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptScheduleEvent.java
index 3c3dcdf..911446c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptScheduleEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptScheduleEvent.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package org.apache.hadoop.mapreduce.v2.app2.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -5,14 +22,14 @@
public class TaskAttemptScheduleEvent extends TaskAttemptEvent {
private final boolean rescheduled;
-
- public TaskAttemptScheduleEvent(TaskAttemptId id, TaskAttemptEventType type, boolean rescheduled) {
- super(id, type);
+
+ public TaskAttemptScheduleEvent(TaskAttemptId id, boolean rescheduled) {
+ super(id, TaskAttemptEventType.TA_SCHEDULE);
this.rescheduled = rescheduled;
}
public boolean isRescheduled() {
return this.rescheduled;
}
-
+
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
index f05885f..e5e08cf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
@@ -77,8 +77,8 @@
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -219,10 +219,10 @@
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
// TODO XXX: FailRequest / KillRequest at SUCCEEDED need to consider Map / Reduce task.
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -801,6 +801,10 @@
sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
}
+ protected String[] resolveHosts(String[] src) {
+ return TaskAttemptImplHelpers.resolveHosts(src);
+ }
+
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createScheduleTransition() {
return new ScheduleTaskattempt();
@@ -836,7 +840,7 @@
for (String host : ta.dataLocalHosts) {
racks.add(RackResolver.resolve(host).getNetworkLocation());
}
- hostArray = TaskAttemptImplHelpers.resolveHosts(ta.dataLocalHosts);
+ hostArray = ta.resolveHosts(ta.dataLocalHosts);
rackArray = racks.toArray(new String[racks.size()]);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
index 2117279..4205be4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
@@ -67,7 +67,6 @@
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptScheduleEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
@@ -615,7 +614,7 @@
++numberUncompletedAttempts;
//schedule the nextAttemptNumber
eventHandler.handle(new TaskAttemptScheduleEvent(attempt.getID(),
- TaskAttemptEventType.TA_SCHEDULE, failedAttempts > 0));
+ failedAttempts > 0));
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
index 1b0d2e6..3a20de6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
@@ -31,11 +31,13 @@
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -125,4 +127,19 @@
break;
}
}
+
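+  // The uber/local path never requests containers from the RM, so the
+  // ContainerRequestor methods below are not expected to be invoked.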
+ @Override
+ public Resource getAvailableResources() {
+ throw new YarnException("Unexpected call to getAvailableResource");
+ }
+
+ @Override
+ public void addContainerReq(ContainerRequest req) {
+ throw new YarnException("Unexpected call to addContainerReq");
+ }
+
+ @Override
+ public void decContainerReq(ContainerRequest req) {
+ throw new YarnException("Unexpected call to decContainerReq");
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
index 1574a69..be22762 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
@@ -1,27 +1,31 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.mapreduce.v2.app2.rm;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
public interface ContainerRequestor extends EventHandler<RMCommunicatorEvent> {
-
-
+ public Resource getAvailableResources();
+ public void addContainerReq(ContainerRequest req);
+
+ public void decContainerReq(ContainerRequest req);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
index d99ea59..016090f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
@@ -109,7 +109,7 @@
protected final Clock clock;
protected Job job;
protected final JobId jobId;
- private final RMContainerRequestor requestor;
+ private final ContainerRequestor requestor;
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
private final AMContainerMap containerMap;
@@ -180,7 +180,7 @@
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
- public RMContainerAllocator(RMContainerRequestor requestor,
+ public RMContainerAllocator(ContainerRequestor requestor,
AppContext appContext) {
super("RMContainerAllocator");
this.requestor = requestor;
@@ -188,12 +188,13 @@
this.clock = appContext.getClock();
this.eventHandler = appContext.getEventHandler();
ApplicationId appId = appContext.getApplicationID();
- // JobId should not be required here.
- // Currently used for error notification, clc construction, etc. Should not be
+ // JobId should not be required here.
+ // Currently used for error notification, clc construction, etc. Should not
+ // be
JobID id = TypeConverter.fromYarn(appId);
JobId jobId = TypeConverter.toYarn(id);
this.jobId = jobId;
-
+
this.containerMap = appContext.getAllContainers();
}
@@ -531,10 +532,11 @@
AMSchedulerTALaunchRequestEvent event, TaskType taskType,
int prevComputedSize) {
if (prevComputedSize == 0) {
- int supportedMaxContainerCapability = requestor
+ int supportedMaxContainerCapability = appContext.getClusterInfo()
.getMaxContainerCapability().getMemory();
prevComputedSize = event.getCapability().getMemory();
- int minSlotMemSize = requestor.getMinContainerCapability().getMemory();
+ int minSlotMemSize = appContext.getClusterInfo()
+ .getMinContainerCapability().getMemory();
prevComputedSize = (int) Math.ceil((float) prevComputedSize
/ minSlotMemSize)
* minSlotMemSize;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
index 4d546e3..64ebc92 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
@@ -143,7 +143,8 @@
super.stop();
}
- protected Resource getAvailableResources() {
+ @Override
+ public Resource getAvailableResources() {
return availableResources;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
index 1f864ff..19acb72 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
@@ -60,6 +60,8 @@
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+
public class AMContainerHelpers {
private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
@@ -221,7 +223,8 @@
return container;
}
- static ContainerLaunchContext createContainerLaunchContext(
+ @VisibleForTesting
+ public static ContainerLaunchContext createContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
ContainerId containerID, JobConf jobConf, TaskType taskType,
Token<JobTokenIdentifier> jobToken,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
index 35c4af0..bd00474 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
@@ -1 +1 @@
-org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo
+org.apache.hadoop.mapreduce.v2.app2.MRClientSecurityInfo
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
index c8d6803..55f245c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
@@ -65,6 +65,7 @@
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
@@ -516,8 +517,8 @@
}
// appAcls and attemptToContainerIdMap shared between various mocks.
- private Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
- private Map<TaskAttemptId, ContainerId> attemptToContainerIdMap = new HashMap<TaskAttemptId, ContainerId>();
+ protected Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+ protected Map<TaskAttemptId, ContainerId> attemptToContainerIdMap = new HashMap<TaskAttemptId, ContainerId>();
protected class MockContainerLauncher implements ContainerLauncher {
@@ -620,7 +621,8 @@
return new MRAppAMScheduler();
}
- protected class MRAppAMScheduler extends AbstractService implements ContainerAllocator{
+ protected class MRAppAMScheduler extends AbstractService implements
+ ContainerAllocator {
private int containerCount;
MRAppAMScheduler() {
@@ -845,8 +847,7 @@
"Kill requested"));
} else if (finalState == TaskAttemptState.FAILED) {
getContext().getEventHandler().handle(
- new TaskAttemptEvent(taskAttemptId,
- TaskAttemptEventType.TA_FAIL_REQUEST));
+ new TaskAttemptEventFailRequest(taskAttemptId, null));
}
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
index e4d96a2..779e1f0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
@@ -24,14 +24,26 @@
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -46,8 +58,6 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -58,8 +68,6 @@
public class MRAppBenchmark {
- private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
/**
* Runs memory and time benchmark with Mock MRApp.
*/
@@ -108,8 +116,8 @@
}
@Override
- protected ContainerAllocator createContainerAllocator(
- ClientService clientService, AppContext context) {
+ protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+ AppContext appContext) {
return new ThrottledContainerAllocator();
}
@@ -117,13 +125,13 @@
implements ContainerAllocator {
private int containerCount;
private Thread thread;
- private BlockingQueue<ContainerAllocatorEvent> eventQueue =
- new LinkedBlockingQueue<ContainerAllocatorEvent>();
+ private BlockingQueue<AMSchedulerEvent> eventQueue =
+ new LinkedBlockingQueue<AMSchedulerEvent>();
public ThrottledContainerAllocator() {
super("ThrottledContainerAllocator");
}
@Override
- public void handle(ContainerAllocatorEvent event) {
+ public void handle(AMSchedulerEvent event) {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
@@ -133,34 +141,72 @@
@Override
public void start() {
thread = new Thread(new Runnable() {
+ @SuppressWarnings("unchecked")
@Override
public void run() {
- ContainerAllocatorEvent event = null;
+ AMSchedulerEvent event = null;
while (!Thread.currentThread().isInterrupted()) {
try {
if (concurrentRunningTasks < maxConcurrentRunningTasks) {
event = eventQueue.take();
- ContainerId cId =
- recordFactory.newRecordInstance(ContainerId.class);
- cId.setApplicationAttemptId(
- getContext().getApplicationAttemptId());
- cId.setId(containerCount++);
- //System.out.println("Allocating " + containerCount);
-
- Container container =
- recordFactory.newRecordInstance(Container.class);
- container.setId(cId);
- NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
- nodeId.setHost("dummy");
- nodeId.setPort(1234);
- container.setNodeId(nodeId);
- container.setContainerToken(null);
- container.setNodeHttpAddress("localhost:8042");
- getContext().getEventHandler()
- .handle(
- new TaskAttemptContainerAssignedEvent(event
- .getAttemptID(), container, null));
- concurrentRunningTasks++;
+ switch(event.getType()) {
+ case S_TA_LAUNCH_REQUEST:
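+                    // Fabricate a container on a dummy node, register it with
+                    // the app context, then send the launch and assign events
+                    // that the real scheduler would have generated.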
+ AMSchedulerTALaunchRequestEvent lEvent = (AMSchedulerTALaunchRequestEvent)event;
+ ContainerId cId = Records.newRecord(ContainerId.class);
+ cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
+ cId.setId(containerCount++);
+ NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
+ Container container = BuilderUtils.newContainer(cId, nodeId,
+ NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+
+ getContext().getAllContainers().addContainerIfNew(container);
+ getContext().getAllNodes().nodeSeen(nodeId);
+
+ JobID id = TypeConverter.fromYarn(getContext().getApplicationID());
+ JobId jobId = TypeConverter.toYarn(id);
+
+ attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
+ if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+
+ AMContainerLaunchRequestEvent lrEvent = new AMContainerLaunchRequestEvent(
+ cId, jobId, lEvent.getAttemptID().getTaskId().getTaskType(),
+ lEvent.getJobToken(), lEvent.getCredentials(), false,
+ new JobConf(getContext().getJob(jobId).getConf()));
+ getContext().getEventHandler().handle(lrEvent);
+ }
+
+ getContext().getEventHandler().handle(
+ new AMContainerAssignTAEvent(cId, lEvent.getAttemptID(), lEvent
+ .getRemoteTask()));
+ concurrentRunningTasks++;
+ break;
+
+ case S_TA_ENDED:
+ // Send out a Container_stop_request.
+ AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) event;
+ switch (sEvent.getState()) {
+ case FAILED:
+ case KILLED:
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.remove(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ case SUCCEEDED:
+ // No re-use in MRApp. Stop the container.
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.remove(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ default:
+ throw new YarnException("Unexpected state: " + sEvent.getState());
+                    }
+                    break;
+ case S_CONTAINERS_ALLOCATED:
+ break;
+ case S_CONTAINER_COMPLETED:
+ break;
+ default:
+ break;
+ }
} else {
Thread.sleep(1000);
}
@@ -192,9 +238,16 @@
run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
@Override
- protected ContainerAllocator createContainerAllocator(
- ClientService clientService, AppContext context) {
- return new RMContainerAllocator(clientService, context) {
+ protected ContainerAllocator createAMScheduler(
+ ContainerRequestor requestor, AppContext appContext) {
+ return new RMContainerAllocator((RMContainerRequestor) requestor,
+ appContext);
+ }
+
+ @Override
+ protected ContainerRequestor createContainerRequestor(
+ ClientService clientService, AppContext appContext) {
+ return new RMContainerRequestor(clientService, appContext) {
@Override
protected AMRMProtocol createSchedulerProxy() {
return new AMRMProtocol() {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
index 3afa023..73f3df7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.mapreduce.v2.app2.job.Task;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
@@ -123,10 +124,8 @@
/////////// Play some games with the TaskAttempts of the first task //////
//send the fail signal to the 1st map task attempt
- app.getContext().getEventHandler().handle(
- new TaskAttemptEvent(
- task1Attempt1.getID(),
- TaskAttemptEventType.TA_FAIL_REQUEST));
+ app.getContext().getEventHandler()
+ .handle(new TaskAttemptEventFailRequest(task1Attempt1.getID(), null));
app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
index ff4cefe..f1bf372 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -28,7 +29,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -36,23 +36,18 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapTaskAttemptImpl;
-import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl2;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -66,33 +61,34 @@
import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app2.MRApp;
import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
import org.apache.hadoop.mapreduce.v2.app2.job.Task;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptScheduleEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -101,82 +97,6 @@
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestTaskAttempt{
- @Test
- public void testAttemptContainerRequest() throws Exception {
- //WARNING: This test must run first. This is because there is an
- // optimization where the credentials passed in are cached statically so
- // they do not need to be recomputed when creating a new
- // ContainerLaunchContext. if other tests run first this code will cache
- // their credentials and this test will fail trying to look for the
- // credentials it inserted in.
- final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
- final byte[] SECRET_KEY = ("secretkey").getBytes();
- Map<ApplicationAccessType, String> acls =
- new HashMap<ApplicationAccessType, String>(1);
- acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
- ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
- JobId jobId = MRBuilderUtils.newJobId(appId, 1);
- TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
- Path jobFile = mock(Path.class);
-
- EventHandler eventHandler = mock(EventHandler.class);
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-
- JobConf jobConf = new JobConf();
- jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
- jobConf.setBoolean("fs.file.impl.disable.cache", true);
- jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-
- // setup UGI for security so tokens and keys are preserved
- jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- UserGroupInformation.setConfiguration(jobConf);
-
- Credentials credentials = new Credentials();
- credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
- Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
- ("tokenid").getBytes(), ("tokenpw").getBytes(),
- new Text("tokenkind"), new Text("tokenservice"));
-
- TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
- mock(TaskSplitMetaInfo.class), jobConf, taListener,
- mock(OutputCommitter.class), jobToken, credentials,
- new SystemClock(), null);
-
- jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
- ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
-
- ContainerLaunchContext launchCtx =
- TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
- jobConf, jobToken, taImpl.createRemoteTask(),
- TypeConverter.fromYarn(jobId), mock(Resource.class),
- mock(WrappedJvmID.class), taListener,
- credentials);
-
- Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
- Credentials launchCredentials = new Credentials();
-
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(launchCtx.getContainerTokens());
- launchCredentials.readTokenStorageStream(dibb);
-
- // verify all tokens specified for the task attempt are in the launch context
- for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
- Token<? extends TokenIdentifier> launchToken =
- launchCredentials.getToken(token.getService());
- Assert.assertNotNull("Token " + token.getService() + " is missing",
- launchToken);
- Assert.assertEquals("Token " + token.getService() + " mismatch",
- token, launchToken);
- }
-
- // verify the secret key is in the launch context
- Assert.assertNotNull("Secret key missing",
- launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
- Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
- launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
- }
static public class StubbedFS extends RawLocalFileSystem {
@Override
@@ -199,67 +119,66 @@
@Test
public void testSingleRackRequest() throws Exception {
- TaskAttemptImpl.RequestContainerTransition rct =
- new TaskAttemptImpl.RequestContainerTransition(false);
+ TaskAttemptImpl.ScheduleTaskattempt sta = new TaskAttemptImpl.ScheduleTaskattempt();
EventHandler eventHandler = mock(EventHandler.class);
String[] hosts = new String[3];
hosts[0] = "host1";
hosts[1] = "host2";
hosts[2] = "host3";
- TaskSplitMetaInfo splitInfo =
- new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+ TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0,
+ 128 * 1024 * 1024l);
- TaskAttemptImpl mockTaskAttempt =
- createMapTaskAttemptImplForTest(eventHandler, splitInfo);
- TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+ TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImpl2ForTest(
+ eventHandler, splitInfo);
+ TaskAttemptScheduleEvent mockTAEvent = mock(TaskAttemptScheduleEvent.class);
+ doReturn(false).when(mockTAEvent).isRescheduled();
- rct.transition(mockTaskAttempt, mockTAEvent);
+ sta.transition(mockTaskAttempt, mockTAEvent);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(2)).handle(arg.capture());
- if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+ if (!(arg.getAllValues().get(1) instanceof AMSchedulerTALaunchRequestEvent)) {
Assert.fail("Second Event not of type ContainerRequestEvent");
}
- ContainerRequestEvent cre =
- (ContainerRequestEvent) arg.getAllValues().get(1);
- String[] requestedRacks = cre.getRacks();
- //Only a single occurrence of /DefaultRack
+ AMSchedulerTALaunchRequestEvent tlrE =
+ (AMSchedulerTALaunchRequestEvent) arg.getAllValues().get(1);
+ String[] requestedRacks = tlrE.getRacks();
+ // Only a single occurrence of /DefaultRack
assertEquals(1, requestedRacks.length);
}
-
+
@Test
public void testHostResolveAttempt() throws Exception {
- TaskAttemptImpl.RequestContainerTransition rct =
- new TaskAttemptImpl.RequestContainerTransition(false);
+ TaskAttemptImpl.ScheduleTaskattempt sta = new TaskAttemptImpl.ScheduleTaskattempt();
EventHandler eventHandler = mock(EventHandler.class);
- String[] hosts = new String[3];
- hosts[0] = "192.168.1.1";
- hosts[1] = "host2";
- hosts[2] = "host3";
+ String[] hosts = new String[] {"192.168.1.1", "host2", "host3"};
+ String[] resolved = new String[] {"host1", "host2", "host3"};
TaskSplitMetaInfo splitInfo =
new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
TaskAttemptImpl mockTaskAttempt =
- createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+ createMapTaskAttemptImpl2ForTest(eventHandler, splitInfo);
TaskAttemptImpl spyTa = spy(mockTaskAttempt);
- when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+ when(spyTa.resolveHosts(hosts)).thenReturn(resolved);
- TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
- rct.transition(spyTa, mockTAEvent);
- verify(spyTa).resolveHost(hosts[0]);
+ TaskAttemptScheduleEvent mockTAEvent = mock(TaskAttemptScheduleEvent.class);
+ doReturn(false).when(mockTAEvent).isRescheduled();
+
+ sta.transition(spyTa, mockTAEvent);
+ verify(spyTa).resolveHosts(hosts);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(2)).handle(arg.capture());
- if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+ if (!(arg.getAllValues().get(1) instanceof AMSchedulerTALaunchRequestEvent)) {
Assert.fail("Second Event not of type ContainerRequestEvent");
}
Map<String, Boolean> expected = new HashMap<String, Boolean>();
expected.put("host1", true);
expected.put("host2", true);
expected.put("host3", true);
- ContainerRequestEvent cre =
- (ContainerRequestEvent) arg.getAllValues().get(1);
+ AMSchedulerTALaunchRequestEvent cre =
+ (AMSchedulerTALaunchRequestEvent) arg.getAllValues().get(1);
String[] requestedHosts = cre.getHosts();
for (String h : requestedHosts) {
expected.remove(h);
@@ -326,13 +245,13 @@
.getValue());
}
- private TaskAttemptImpl createMapTaskAttemptImplForTest(
+ private TaskAttemptImpl createMapTaskAttemptImpl2ForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = new SystemClock();
- return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
+ return createMapTaskAttemptImpl2ForTest(eventHandler, taskSplitMetaInfo, clock);
}
- private TaskAttemptImpl createMapTaskAttemptImplForTest(
+ private TaskAttemptImpl createMapTaskAttemptImpl2ForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
@@ -342,9 +261,9 @@
JobConf jobConf = new JobConf();
OutputCommitter outputCommitter = mock(OutputCommitter.class);
TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ new MapTaskAttemptImpl2(taskId, 1, eventHandler, jobFile, 1,
taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
- null, clock, null);
+ null, clock, mock(TaskHeartbeatHandler.class), null);
return taImpl;
}
@@ -384,9 +303,10 @@
new TaskAttemptDiagnosticsUpdateEvent(attemptID,
"Test Diagnostic Event"));
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptEventFailRequest(attemptID, "Test Diagnostic Event"));
}
+ // TODO XXX: This will execute in a separate thread. The assert is not very useful.
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new EventHandler<JobHistoryEvent>() {
@@ -402,7 +322,7 @@
};
}
}
-
+
@Test
public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
@@ -426,11 +346,14 @@
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+ AppContext mockAppContext = mock(AppContext.class);
+ doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
+
TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ new MapTaskAttemptImpl2(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
- new SystemClock(), null);
+ new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
@@ -438,24 +361,20 @@
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_SCHEDULE));
- taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
- container, mock(Map.class)));
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_KILL));
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+ taImpl.handle(new TaskAttemptScheduleEvent(attemptId, false));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptEventKillRequest(attemptId, null));
+ // At some KILLING state.
+ taImpl.handle(new TaskAttemptEventContainerTerminating(attemptId, null));
assertFalse(eventHandler.internalError);
}
-
+
+ // TODO Add a similar test for TERMINATING.
@Test
- public void testContainerCleanedWhileRunning() throws Exception {
+ public void testContainerTerminatedWhileRunning() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
- ApplicationAttemptId appAttemptId =
- BuilderUtils.newApplicationAttemptId(appId, 0);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@@ -463,7 +382,8 @@
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -472,20 +392,7 @@
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
- when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-
- AppContext appCtx = mock(AppContext.class);
- ClusterInfo clusterInfo = mock(ClusterInfo.class);
- Resource resource = mock(Resource.class);
- when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
- when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
- when(resource.getMemory()).thenReturn(1024);
-
- TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
- splits, jobConf, taListener,
- mock(OutputCommitter.class), mock(Token.class), new Credentials(),
- new SystemClock(), appCtx);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
@@ -494,83 +401,96 @@
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_SCHEDULE));
- taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
- container, mock(Map.class)));
- taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ AppContext appCtx = mock(AppContext.class);
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ appCtx);
+ containers.addContainerIfNew(container);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl2(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+ mock(Token.class), new Credentials(), new SystemClock(),
+ mock(TaskHeartbeatHandler.class), appCtx);
+
+ taImpl.handle(new TaskAttemptScheduleEvent(attemptId, false));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptRemoteStartEvent(attemptId, contId, null, -1));
+ assertEquals("Task attempt is not in running state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+ taImpl.handle(new TaskAttemptEventContainerTerminated(attemptId, null));
+ assertFalse(
+ "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
+ eventHandler.internalError);
+ }
+
+ @Test
+ public void testContainerTerminatedWhileCommitting() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AppContext appCtx = mock(AppContext.class);
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ appCtx);
+ containers.addContainerIfNew(container);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl2(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+ mock(Token.class), new Credentials(), new SystemClock(),
+ mock(TaskHeartbeatHandler.class), appCtx);
+
+ taImpl.handle(new TaskAttemptScheduleEvent(attemptId, false));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptRemoteStartEvent(attemptId, contId, null, -1));
assertEquals("Task attempt is not in running state", taImpl.getState(),
TaskAttemptState.RUNNING);
taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
- assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
- eventHandler.internalError);
- }
-
- @Test
- public void testContainerCleanedWhileCommitting() throws Exception {
- ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
- ApplicationAttemptId appAttemptId =
- BuilderUtils.newApplicationAttemptId(appId, 0);
- JobId jobId = MRBuilderUtils.newJobId(appId, 1);
- TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
- TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
- Path jobFile = mock(Path.class);
-
- MockEventHandler eventHandler = new MockEventHandler();
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-
- JobConf jobConf = new JobConf();
- jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
- jobConf.setBoolean("fs.file.impl.disable.cache", true);
- jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
- jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-
- TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
- when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-
- AppContext appCtx = mock(AppContext.class);
- ClusterInfo clusterInfo = mock(ClusterInfo.class);
- Resource resource = mock(Resource.class);
- when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
- when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
- when(resource.getMemory()).thenReturn(1024);
-
- TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
- splits, jobConf, taListener,
- mock(OutputCommitter.class), mock(Token.class), new Credentials(),
- new SystemClock(), appCtx);
-
- NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
- ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
- Container container = mock(Container.class);
- when(container.getId()).thenReturn(contId);
- when(container.getNodeId()).thenReturn(nid);
- when(container.getNodeHttpAddress()).thenReturn("localhost:0");
-
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_SCHEDULE));
- taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
- container, mock(Map.class)));
- taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
- taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_COMMIT_PENDING));
-
- assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
- TaskAttemptState.COMMIT_PENDING);
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
- assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+ assertEquals("Task attempt is not in commit pending state",
+ taImpl.getState(), TaskAttemptState.COMMIT_PENDING);
+ taImpl.handle(new TaskAttemptEventContainerTerminated(attemptId, null));
+ assertFalse(
+ "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
eventHandler.internalError);
}
-
+
@Test
- public void testDoubleTooManyFetchFailure() throws Exception {
+ public void testMultipleTooManyFetchFailures() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
- ApplicationAttemptId appAttemptId =
- BuilderUtils.newApplicationAttemptId(appId, 0);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@@ -578,7 +498,8 @@
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -587,20 +508,7 @@
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
- when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-
- AppContext appCtx = mock(AppContext.class);
- ClusterInfo clusterInfo = mock(ClusterInfo.class);
- Resource resource = mock(Resource.class);
- when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
- when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
- when(resource.getMemory()).thenReturn(1024);
-
- TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
- splits, jobConf, taListener,
- mock(OutputCommitter.class), mock(Token.class), new Credentials(),
- new SystemClock(), appCtx);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
@@ -609,27 +517,36 @@
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_SCHEDULE));
- taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
- container, mock(Map.class)));
- taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_DONE));
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
-
+ AppContext appCtx = mock(AppContext.class);
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ appCtx);
+ containers.addContainerIfNew(container);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl2(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+ mock(Token.class), new Credentials(), new SystemClock(),
+ mock(TaskHeartbeatHandler.class), appCtx);
+
+ taImpl.handle(new TaskAttemptScheduleEvent(attemptId, false));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptRemoteStartEvent(attemptId, contId, null, -1));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_DONE));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
- assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
- TaskAttemptState.FAILED);
- assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+ assertEquals("Task attempt is not in FAILED state, still",
+ taImpl.getState(), TaskAttemptState.FAILED);
+ assertFalse("InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES",
eventHandler.internalError);
}
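
For reference, the reworked TestTaskAttempt cases above all drive the rewritten state machine through the same event sequence. A minimal sketch of that flow, using the MapTaskAttemptImpl2 constructor and event classes exactly as they appear in the hunks above (the local variables are the ones each test sets up):

    TaskAttemptImpl taImpl = new MapTaskAttemptImpl2(taskId, 1, eventHandler,
        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
        mock(Token.class), new Credentials(), new SystemClock(),
        mock(TaskHeartbeatHandler.class), appCtx);

    // Schedule the attempt (replaces the old TA_SCHEDULE event): NEW -> STARTING.
    taImpl.handle(new TaskAttemptScheduleEvent(attemptId, false));
    // Container started remotely (replaces TaskAttemptContainerLaunchedEvent): STARTING -> RUNNING.
    taImpl.handle(new TaskAttemptRemoteStartEvent(attemptId, contId, null, -1));
    // Container went away (replaces TA_CONTAINER_CLEANED): RUNNING -> terminal.
    taImpl.handle(new TaskAttemptEventContainerTerminated(attemptId, null));

Each test then asserts the expected TaskAttemptState and that MockEventHandler.internalError was never set.
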
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
index 0e452d4..ec8010a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
@@ -71,6 +71,7 @@
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -801,7 +802,7 @@
static class RMContainerAllocatorForTest extends RMContainerAllocator {
- public RMContainerAllocatorForTest(RMContainerRequestor requestor,
+ public RMContainerAllocatorForTest(ContainerRequestor requestor,
AppContext appContext) {
super(requestor, appContext);
}
@@ -840,7 +841,7 @@
boolean recalculatedReduceSchedule = false;
- public RecalculateContainerAllocator(RMContainerRequestor requestor,
+ public RecalculateContainerAllocator(ContainerRequestor requestor,
AppContext appContext) {
super(requestor, appContext);
}
@@ -899,7 +900,7 @@
}
@Override
- protected Resource getAvailableResources() {
+ public Resource getAvailableResources() {
return BuilderUtils.newResource(0);
}
@@ -995,7 +996,13 @@
when(appContext.getJob(jobId)).thenReturn(mockJob);
when(appContext.getClock()).thenReturn(clock);
when(appContext.getAllNodes()).thenReturn(amNodeMap);
+ when(appContext.getClusterInfo()).thenReturn(
+ new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
+ .newResource(10240)));
return appContext;
}
+
+ // TODO Add a unit test to verify a correct launchContainer invocation with
+ // security.
}
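
The allocator test's mocked AppContext now returns a populated ClusterInfo; without it, getClusterInfo() would be null when the allocator looks up the cluster's container capability. A small sketch of the stub, assuming the ClusterInfo(minResource, maxResource) constructor used in the hunk above:

    // Stub cluster capability on the mocked AppContext (values as in the test).
    AppContext appContext = mock(AppContext.class);
    when(appContext.getClusterInfo()).thenReturn(
        new ClusterInfo(BuilderUtils.newResource(1024),
            BuilderUtils.newResource(10240)));
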
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java
new file mode 100644
index 0000000..af0b8d1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestAMContainerHelpers {
+
+ // WARNING: This test must be the only test in this file. This is because
+ // there is an optimization where the credentials passed in are cached
+ // statically so they do not need to be recomputed when creating a new
+ // ContainerLaunchContext. If other tests run first, this code will cache
+ // their credentials and this test will fail when it looks for the
+ // credentials it inserted.
+
+ @Test
+ public void testCLCConstruction() throws Exception {
+ final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
+ final byte[] SECRET_KEY = ("secretkey").getBytes();
+ Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>(
+ 1);
+ acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+
+ // setup UGI for security so tokens and keys are preserved
+ jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(jobConf);
+
+ Credentials credentials = new Credentials();
+ credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
+ Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+ ("tokenid").getBytes(), ("tokenpw").getBytes(), new Text("tokenkind"),
+ new Text("tokenservice"));
+
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, BuilderUtils
+ .newApplicationAttemptId(appId, 1).toString());
+ ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+
+ ContainerLaunchContext launchCtx = AMContainerHelpers
+ .createContainerLaunchContext(acls, containerId, jobConf, TaskType.MAP,
+ jobToken, TypeConverter.fromYarn(jobId), mock(Resource.class),
+ mock(WrappedJvmID.class), taListener, credentials, false);
+
+ Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
+ Credentials launchCredentials = new Credentials();
+
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(launchCtx.getContainerTokens());
+ launchCredentials.readTokenStorageStream(dibb);
+
+ // verify all tokens specified for the task attempt are in the launch
+ // context
+ for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
+ Token<? extends TokenIdentifier> launchToken = launchCredentials
+ .getToken(token.getService());
+ Assert.assertNotNull("Token " + token.getService() + " is missing",
+ launchToken);
+ Assert.assertEquals("Token " + token.getService() + " mismatch", token,
+ launchToken);
+ }
+
+ // verify the secret key is in the launch context
+ Assert.assertNotNull("Secret key missing",
+ launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
+ Assert.assertTrue(
+ "Secret key mismatch",
+ Arrays.equals(SECRET_KEY,
+ launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
+ }
+
+ static public class StubbedFS extends RawLocalFileSystem {
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return new FileStatus(1, false, 1, 1, 1, f);
+ }
+ }
+}
\ No newline at end of file
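
The verification half of the new test is the general pattern for reading credentials back out of a launch context; a condensed sketch using only the calls the test above already exercises:

    // Deserialize the credentials packed into the ContainerLaunchContext.
    Credentials launchCredentials = new Credentials();
    DataInputByteBuffer dibb = new DataInputByteBuffer();
    dibb.reset(launchCtx.getContainerTokens());
    launchCredentials.readTokenStorageStream(dibb);

    // Tokens and secret keys can then be looked up by service name and alias.
    Token<? extends TokenIdentifier> launchToken =
        launchCredentials.getToken(new Text("tokenservice"));
    byte[] key = launchCredentials.getSecretKey(new Text("secretkeyalias"));
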
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebApp.java
index 7e177fe..25eee55 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebApp.java
@@ -32,9 +32,12 @@
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
import org.apache.hadoop.mapreduce.v2.app2.job.Task;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -113,6 +116,21 @@
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return null;
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return null;
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return null;
+ }
}
@Test public void testAppControllerIndex() {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServices.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServices.java
index d60340a..954595e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServices.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServices.java
@@ -34,8 +34,11 @@
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.MockJobs;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -141,6 +144,21 @@
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return null;
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return null;
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesAttempts.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesAttempts.java
index 8492497..77ca7eb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesAttempts.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesAttempts.java
@@ -40,9 +40,12 @@
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
import org.apache.hadoop.mapreduce.v2.app2.job.Task;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -151,6 +154,21 @@
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return null;
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return null;
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobConf.java
index 108ff56..3bfb2b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobConf.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobConf.java
@@ -42,9 +42,12 @@
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.MockJobs;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -152,6 +155,21 @@
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return null;
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return null;
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobs.java
index 31ed4db..01c065b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobs.java
@@ -40,10 +40,13 @@
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.MockJobs;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -154,6 +157,21 @@
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return null;
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return null;
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesTasks.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesTasks.java
index 1637581..2ba9a58 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesTasks.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesTasks.java
@@ -38,9 +38,12 @@
import org.apache.hadoop.mapreduce.v2.app2.MockJobs;
import org.apache.hadoop.mapreduce.v2.app2.job.Job;
import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -148,6 +151,21 @@
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return null;
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return null;
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
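
Each webapp TestAppContext above grows the same three overrides to satisfy the expanded AppContext interface; a consolidated sketch of the additions, assuming the imports the patch adds to each test:

    @Override
    public Map<ApplicationAccessType, String> getApplicationACLs() {
      return null; // returning null is sufficient for these tests
    }

    @Override
    public AMContainerMap getAllContainers() {
      return null; // container state is not consulted by these tests
    }

    @Override
    public AMNodeMap getAllNodes() {
      return null; // node state is not consulted by these tests
    }
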