YARN-4091. Add REST API to retrieve scheduler activity. (Chen Ge and Sunil G via wangda)
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 7c19c5e..a5c0f71 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -519,6 +519,7 @@
<Or>
<Field name="rmContext" />
<Field name="applications" />
+ <Field name="activitiesManager" />
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
@@ -552,4 +553,15 @@
<Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivityNodeInfo"/>
+ <Or>
+ <Field name="allocationState" />
+ <Field name="diagnostic" />
+ <Field name="name" />
+ <Field name="requestPriority" />
+ <Field name="appPriority" />
+ </Or>
+ <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
+ </Match>
</FindBugsFilter>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 64eb777..755defd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@@ -97,6 +98,8 @@
private volatile Priority maxClusterLevelAppPriority;
+ protected ActivitiesManager activitiesManager;
+
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
@@ -789,4 +792,9 @@
}
return schedulerChangeRequests;
}
+
+ public ActivitiesManager getActivitiesManager() {
+ return this.activitiesManager;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
new file mode 100644
index 0000000..8fa1bb5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Utility for logging scheduler activities
+ */
+public class ActivitiesLogger {
+ private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
+
+ /**
+ * Methods for recording activities from an app
+ */
+ public static class APP {
+
+ /*
+ * Record skipped application activity when no container allocated /
+ * reserved / re-reserved. Scheduler will look at following applications
+ * within the same leaf queue.
+ */
+ public static void recordSkippedAppActivityWithoutAllocation(
+ ActivitiesManager activitiesManager, SchedulerNode node,
+ SchedulerApplicationAttempt application, Priority priority,
+ String diagnostic) {
+ recordAppActivityWithoutAllocation(activitiesManager, node, application,
+ priority, diagnostic, ActivityState.SKIPPED);
+ }
+
+    /*
+     * Record a REJECTED application activity (queue maximum capacity or user
+     * limit) for the node, then finish the app's own allocation recording.
+     */
+    public static void recordRejectedAppActivityFromLeafQueue(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Priority priority,
+        String diagnostic) {
+      ApplicationId appId = application.getApplicationId();
+      recordActivity(activitiesManager, node, application.getQueueName(),
+          appId.toString(), priority, ActivityState.REJECTED, diagnostic,
+          "app");
+      finishSkippedAppAllocationRecording(activitiesManager, appId,
+          ActivityState.REJECTED, diagnostic);
+    }
+
+ /*
+ * Record application activity when no container is allocated / reserved /
+ * re-reserved. Node-level entries are always recorded as SKIPPED; appState
+ * only reaches the per-application record. NOTE(review): confirm intended.
+ */
+ public static void recordAppActivityWithoutAllocation(
+ ActivitiesManager activitiesManager, SchedulerNode node,
+ SchedulerApplicationAttempt application, Priority priority,
+ String diagnostic, ActivityState appState) {
+ if (activitiesManager == null) {
+ return;
+ }
+ if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+ String type = "container";
+ // Container entry under the app; the child name is null (no container).
+ activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ application.getApplicationId().toString(), null,
+ priority.toString(), ActivityState.SKIPPED, diagnostic, type);
+ type = "app";
+ // App entry under its queue, recorded without a diagnostic.
+ activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ application.getQueueName(),
+ application.getApplicationId().toString(),
+ application.getPriority().toString(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.EMPTY, type);
+ }
+ // Add the container activity to the application's own allocation
+ // record. No container was allocated to this application under this
+ // condition, so containerId is null.
+ if (activitiesManager.shouldRecordThisApp(
+ application.getApplicationId())) {
+ String type = "container";
+ activitiesManager.addSchedulingActivityForApp(
+ application.getApplicationId(), null, priority.toString(), appState,
+ diagnostic, type);
+ }
+ }
+
+ /*
+ * Record application activity when a container is allocated / reserved /
+ * re-reserved. Does nothing when activitiesManager is null.
+ */
+ public static void recordAppActivityWithAllocation(
+ ActivitiesManager activitiesManager, SchedulerNode node,
+ SchedulerApplicationAttempt application, Container updatedContainer,
+ ActivityState activityState) {
+ if (activitiesManager == null) {
+ return;
+ }
+ if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+ String type = "container";
+ // Container entry under the app, with the caller-supplied activityState.
+ activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ application.getApplicationId().toString(),
+ updatedContainer.getId().toString(),
+ updatedContainer.getPriority().toString(), activityState,
+ ActivityDiagnosticConstant.EMPTY, type);
+ type = "app";
+ // App entry under its queue; this entry is always recorded as ACCEPTED.
+ activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ application.getQueueName(),
+ application.getApplicationId().toString(),
+ application.getPriority().toString(), ActivityState.ACCEPTED,
+ ActivityDiagnosticConstant.EMPTY, type);
+ }
+ // Same container activity, added to the application's own allocation.
+ if (activitiesManager.shouldRecordThisApp(
+ application.getApplicationId())) {
+ String type = "container";
+ activitiesManager.addSchedulingActivityForApp(
+ application.getApplicationId(), updatedContainer.getId().toString(),
+ updatedContainer.getPriority().toString(), activityState,
+ ActivityDiagnosticConstant.EMPTY, type);
+ }
+ }
+
+ /*
+ * Invoked when scheduler starts to look at this application within one node
+ * update.
+ */
+ public static void startAppAllocationRecording(
+ ActivitiesManager activitiesManager, NodeId nodeId, long currentTime,
+ SchedulerApplicationAttempt application) {
+ if (activitiesManager == null) {
+ return;
+ }
+ activitiesManager.startAppAllocationRecording(nodeId, currentTime,
+ application);
+ }
+
+    /*
+     * Invoked when scheduler finishes looking at this application within one
+     * node update, and the app has any container allocated/reserved during
+     * this allocation. Also reached with a null containerId from
+     * finishSkippedAppAllocationRecording. Does nothing when there is no
+     * activities manager or the application is not being recorded.
+     */
+    public static void finishAllocatedAppAllocationRecording(
+        ActivitiesManager activitiesManager, ApplicationId applicationId,
+        ContainerId containerId, ActivityState containerState,
+        String diagnostic) {
+      if (activitiesManager == null
+          || !activitiesManager.shouldRecordThisApp(applicationId)) {
+        return;
+      }
+      activitiesManager.finishAppAllocationRecording(applicationId,
+          containerId, containerState, diagnostic);
+    }
+
+ /*
+ * Invoked when scheduler finishes looking at this application within one
+ * node update, and the app DOESN'T have any container allocated/reserved
+ * during this allocation.
+ */
+ public static void finishSkippedAppAllocationRecording(
+ ActivitiesManager activitiesManager, ApplicationId applicationId,
+ ActivityState containerState, String diagnostic) {
+ finishAllocatedAppAllocationRecording(activitiesManager, applicationId,
+ null, containerState, diagnostic);
+ }
+ }
+
+ /**
+ * Methods for recording activities from a queue
+ */
+ public static class QUEUE {
+ /*
+ * Record activities of a queue
+ */
+ public static void recordQueueActivity(ActivitiesManager activitiesManager,
+ SchedulerNode node, String parentQueueName, String queueName,
+ ActivityState state, String diagnostic) {
+ recordActivity(activitiesManager, node, parentQueueName, queueName, null,
+ state, diagnostic, null);
+ }
+ }
+
+ /**
+ * Methods for recording overall activities from one node update
+ */
+ public static class NODE {
+
+ /*
+ * Invoked when node allocation finishes, and there's NO container
+ * allocated or reserved during the allocation
+ */
+ public static void finishSkippedNodeAllocation(
+ ActivitiesManager activitiesManager, SchedulerNode node) {
+ finishAllocatedNodeAllocation(activitiesManager, node, null,
+ AllocationState.SKIPPED);
+ }
+
+    /*
+     * Invoked when node allocation finishes, and there's any container
+     * allocated or reserved during the allocation. Does nothing when there
+     * is no activities manager or the node is not being recorded.
+     */
+    public static void finishAllocatedNodeAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        ContainerId containerId, AllocationState containerState) {
+      if (activitiesManager == null
+          || !activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+        return;
+      }
+      activitiesManager.updateAllocationFinalState(node.getNodeID(),
+          containerId, containerState);
+    }
+
+ /*
+ * Invoked when node heartbeat finishes
+ */
+ public static void finishNodeUpdateRecording(
+ ActivitiesManager activitiesManager, NodeId nodeID) {
+ if (activitiesManager == null) {
+ return;
+ }
+ activitiesManager.finishNodeUpdateRecording(nodeID);
+ }
+
+ /*
+ * Invoked when node heartbeat starts
+ */
+ public static void startNodeUpdateRecording(
+ ActivitiesManager activitiesManager, NodeId nodeID) {
+ if (activitiesManager == null) {
+ return;
+ }
+ activitiesManager.startNodeUpdateRecording(nodeID);
+ }
+ }
+
+  // Adds a queue, application or container activity to the node's allocation
+  // when a manager is present and the node is being recorded.
+  private static void recordActivity(ActivitiesManager activitiesManager,
+      SchedulerNode node, String parentName, String childName,
+      Priority priority, ActivityState state, String diagnostic, String type) {
+    if (activitiesManager == null
+        || !activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+      return;
+    }
+    String priorityText = (priority == null) ? null : priority.toString();
+    activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+        parentName, childName, priorityText, state, diagnostic, type);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
new file mode 100644
index 0000000..4fa5feb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+import java.util.Set;
+import java.util.*;
+import java.util.ArrayList;
+
+/**
+ * A class to store node or application allocations.
+ * It mainly contains operations for allocation start, add, update and finish.
+ */
+public class ActivitiesManager extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(ActivitiesManager.class);
+ private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
+ private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
+ private Set<NodeId> activeRecordedNodes;
+ private ConcurrentMap<ApplicationId, Long>
+ recordingAppActivitiesUntilSpecifiedTime;
+ private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
+ private ConcurrentMap<ApplicationId, List<AppAllocation>>
+ completedAppAllocations;
+ private boolean recordNextAvailableNode = false;
+ private List<NodeAllocation> lastAvailableNodeActivities = null;
+ private Thread cleanUpThread;
+ private int timeThreshold = 600 * 1000;
+ private final RMContext rmContext;
+
+ public ActivitiesManager(RMContext rmContext) {
+ super(ActivitiesManager.class.getName());
+ recordingNodesAllocation = new ConcurrentHashMap<>();
+ completedNodeAllocations = new ConcurrentHashMap<>();
+ appsAllocation = new ConcurrentHashMap<>();
+ completedAppAllocations = new ConcurrentHashMap<>();
+ activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
+ this.rmContext = rmContext;
+ }
+
+  // Unknown (null RMApp) or finished apps get the error-message response.
+  public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
+    org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp app =
+        rmContext.getRMApps().get(applicationId);
+    if (app != null && app.getFinalApplicationStatus()
+        == FinalApplicationStatus.UNDEFINED) {
+      return new AppActivitiesInfo(
+          completedAppAllocations.get(applicationId), applicationId);
+    }
+    return new AppActivitiesInfo(
+        "fail to get application activities after finished",
+        applicationId.toString());
+  }
+
+  // Returns the recorded activities of one node. A null nodeId selects the
+  // activities of the last node update that recorded anything. The list may
+  // be null when nothing has been recorded; it is passed on as-is.
+  public ActivitiesInfo getActivitiesInfo(String nodeId) {
+    List<NodeAllocation> allocations = (nodeId == null)
+        ? lastAvailableNodeActivities
+        : completedNodeAllocations.get(NodeId.fromString(nodeId));
+    return new ActivitiesInfo(allocations, nodeId);
+  }
+
+  public void recordNextNodeUpdateActivities(String nodeId) {
+    if (nodeId != null) {
+      activeRecordedNodes.add(NodeId.fromString(nodeId));
+      return;
+    }
+    recordNextAvailableNode = true; // null: record whichever node is next
+  }
+
+  public void turnOnAppActivitiesRecording(ApplicationId applicationId,
+      double maxTime) {
+    // Deadline = current clock reading + maxTime scaled by 1000.
+    recordingAppActivitiesUntilSpecifiedTime.put(applicationId,
+        SystemClock.getInstance().getTime() + (long) (maxTime * 1000));
+  }
+
+  // Starts the thread that prunes completed records, sleeping 5 s per pass.
+  @Override
+  protected void serviceStart() throws Exception {
+    cleanUpThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
+              completedNodeAllocations.entrySet().iterator();
+          while (ite.hasNext()) {
+            List<NodeAllocation> allocations = ite.next().getValue();
+            long currTS = SystemClock.getInstance().getTime();
+            // Age is now minus the record time; drop records past threshold.
+            if (allocations.size() > 0 && currTS
+                - allocations.get(0).getTimeStamp() > timeThreshold) {
+              ite.remove();
+            }
+          }
+
+          Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
+              completedAppAllocations.entrySet().iterator();
+          while (iteApp.hasNext()) {
+            // Drop finished apps and apps the RM no longer tracks (null).
+            org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp app =
+                rmContext.getRMApps().get(iteApp.next().getKey());
+            if (app == null || app.getFinalApplicationStatus()
+                != FinalApplicationStatus.UNDEFINED) {
+              iteApp.remove();
+            }
+          }
+
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            return; // serviceStop() interrupted us; end the cleanup thread
+          }
+        }
+      }
+    });
+    cleanUpThread.start();
+    super.serviceStart();
+  }
+  @Override
+  protected void serviceStop() throws Exception {
+    if (cleanUpThread != null) { // null when stopped before serviceStart()
+      cleanUpThread.interrupt();
+    }
+    super.serviceStop();
+  }
+  // Starts an empty allocation list if this node is marked for recording.
+  void startNodeUpdateRecording(NodeId nodeID) {
+    if (recordNextAvailableNode) {
+      recordNextNodeUpdateActivities(nodeID.toString());
+    }
+    if (activeRecordedNodes.contains(nodeID)) {
+      recordingNodesAllocation.put(nodeID, new ArrayList<NodeAllocation>());
+    }
+  }
+
+  // Starts an app's allocation record, or stops recording once expired.
+  void startAppAllocationRecording(NodeId nodeID, long currTS,
+      SchedulerApplicationAttempt application) {
+    ApplicationId applicationId = application.getApplicationId();
+    // Read the deadline once: separate containsKey()/get() calls can observe
+    // different states and unbox a null if the entry vanishes in between.
+    Long recordUntil =
+        recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
+    if (recordUntil == null) {
+      return;
+    }
+    if (recordUntil > currTS) {
+      appsAllocation.put(applicationId, new AppAllocation(
+          application.getPriority(), nodeID, application.getQueueName()));
+    } else {
+      turnOffActivityMonitoringForApp(applicationId);
+    }
+  }
+
+  // Adds the activity to the node's current allocation if it is recorded.
+  void addSchedulingActivityForNode(NodeId nodeID, String parentName,
+      String childName, String priority, ActivityState state, String diagnostic,
+      String type) {
+    if (!shouldRecordThisNode(nodeID)) {
+      return;
+    }
+    getCurrentNodeAllocation(nodeID).addAllocationActivity(parentName,
+        childName, priority, state, diagnostic, type);
+  }
+
+  // Appends a container activity to the application's in-progress allocation;
+  // does nothing unless the application is being recorded.
+  void addSchedulingActivityForApp(ApplicationId applicationId,
+      String containerId, String priority, ActivityState state,
+      String diagnostic, String type) {
+    if (!shouldRecordThisApp(applicationId)) {
+      return;
+    }
+    appsAllocation.get(applicationId).addAppAllocationActivity(containerId,
+        priority, state, diagnostic, type);
+  }
+
+  // Sets the overall container state of the node's current allocation. This is
+  // the allocation-level status, not the state of an individual activity.
+  void updateAllocationFinalState(NodeId nodeID, ContainerId containerId,
+      AllocationState containerState) {
+    if (!shouldRecordThisNode(nodeID)) {
+      return;
+    }
+    getCurrentNodeAllocation(nodeID)
+        .updateContainerState(containerId, containerState);
+  }
+
+  // Closes the app's in-progress allocation, stamps it with the final state
+  // and moves it to the completed history (at most 1000 entries per app).
+  void finishAppAllocationRecording(ApplicationId applicationId,
+      ContainerId containerId, ActivityState appState, String diagnostic) {
+    if (!shouldRecordThisApp(applicationId)) {
+      return;
+    }
+    long now = SystemClock.getInstance().getTime();
+    AppAllocation finished = appsAllocation.remove(applicationId);
+    finished.updateAppContainerStateAndTime(containerId, appState, now,
+        diagnostic);
+
+    List<AppAllocation> history = completedAppAllocations.get(applicationId);
+    if (history == null) {
+      history = new ArrayList<>();
+      completedAppAllocations.put(applicationId, history);
+    }
+    if (history.size() == 1000) {
+      history.remove(0); // evict the oldest entry to keep the cap
+    }
+    history.add(finished);
+
+    if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId) <= now) {
+      turnOffActivityMonitoringForApp(applicationId);
+    }
+  }
+
+  // Finalizes the node's recording and stores its allocations as completed.
+  void finishNodeUpdateRecording(NodeId nodeID) {
+    List<NodeAllocation> recorded = recordingNodesAllocation.get(nodeID);
+    long now = SystemClock.getInstance().getTime();
+    if (recorded == null) {
+      return;
+    }
+    if (!recorded.isEmpty()) {
+      lastAvailableNodeActivities = recorded;
+      for (NodeAllocation allocation : recorded) {
+        allocation.transformToTree();
+        allocation.setTimeStamp(now);
+      }
+      if (recordNextAvailableNode) {
+        recordNextAvailableNode = false;
+      }
+    }
+    if (shouldRecordThisNode(nodeID)) {
+      recordingNodesAllocation.remove(nodeID);
+      completedNodeAllocations.put(nodeID, recorded);
+      stopRecordNodeUpdateActivities(nodeID);
+    }
+  }
+
+ boolean shouldRecordThisApp(ApplicationId applicationId) {
+ return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
+ && appsAllocation.containsKey(applicationId);
+ }
+
+ boolean shouldRecordThisNode(NodeId nodeID) {
+ return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
+ .containsKey(nodeID);
+ }
+
+  /**
+   * Returns the allocation that new activities of this node belong to.
+   * Callers check shouldRecordThisNode() first; the node's list is read from
+   * recordingNodesAllocation and used without a null check.
+   *
+   * @param nodeID node whose recording is in progress
+   * @return the unfinished latest allocation, or a newly appended one
+   */
+  private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
+    List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
+
+    if (!nodeAllocations.isEmpty()) {
+      NodeAllocation latest = nodeAllocations.get(nodeAllocations.size() - 1);
+      // A final state of DEFAULT means the latest allocation has not
+      // finished yet, so further activities still belong to it.
+      if (latest.getFinalAllocationState() == AllocationState.DEFAULT) {
+        return latest;
+      }
+    }
+
+    // Either nothing has been stored for this node yet, or the latest
+    // allocation has already finished: create a new allocation, append it
+    // to the node's list and return it.
+    NodeAllocation created = new NodeAllocation(nodeID);
+    nodeAllocations.add(created);
+    return created;
+  }
+
+ private void stopRecordNodeUpdateActivities(NodeId nodeId) {
+ activeRecordedNodes.remove(nodeId);
+ }
+
+ private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
+ recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
new file mode 100644
index 0000000..fc4738e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/*
+ * Collection of diagnostics.
+ */
+public class ActivityDiagnosticConstant {
+ // EMPTY means it does not have any diagnostic to display.
+ // In order not to show "diagnostic" line in frontend,
+ // we set the value to null.
+ public final static String EMPTY = null;
+ public final static String NOT_ABLE_TO_ACCESS_PARTITION =
+ "Not able to access partition";
+ public final static String QUEUE_DO_NOT_NEED_MORE_RESOURCE =
+ "Queue does not need more resource";
+ public final static String QUEUE_MAX_CAPACITY_LIMIT =
+ "Hit queue max-capacity limit";
+ public final static String USER_CAPACITY_MAXIMUM_LIMIT =
+ "Hit user capacity maximum limit";
+ public final static String SKIP_BLACK_LISTED_NODE = "Skip black listed node";
+ public final static String PRIORITY_SKIPPED = "Priority skipped";
+ public final static String PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST =
+ "Priority skipped because off-switch request is null";
+ public final static String
+ PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST =
+ "Priority skipped because partition of node doesn't match request";
+ public final static String SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY =
+ "Priority skipped because of relax locality is not allowed";
+ public final static String SKIP_IN_IGNORE_EXCLUSIVITY_MODE =
+ "Skipping assigning to Node in Ignore Exclusivity mode";
+ public final static String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS =
+ "Doesn't need containers based on reservation algo!";
+ public final static String QUEUE_SKIPPED_HEADROOM =
+ "Queue skipped because of headroom";
+ public final static String NON_PARTITIONED_PARTITION_FIRST =
+ "Non-partitioned resource request should be scheduled to "
+ + "non-partitioned partition first";
+ public final static String SKIP_NODE_LOCAL_REQUEST =
+ "Skip node-local request";
+ public final static String SKIP_RACK_LOCAL_REQUEST =
+ "Skip rack-local request";
+ public final static String SKIP_OFF_SWITCH_REQUEST =
+ "Skip offswitch request";
+ public final static String REQUEST_CAN_NOT_ACCESS_NODE_LABEL =
+ "Resource request can not access the label";
+ public final static String NOT_SUFFICIENT_RESOURCE =
+ "Node does not have sufficient resource for request";
+ public final static String LOCALITY_SKIPPED = "Locality skipped";
+ public final static String FAIL_TO_ALLOCATE = "Fail to allocate";
+ public final static String COULD_NOT_GET_CONTAINER =
+ "Couldn't get container for allocation";
+ public final static String APPLICATION_DO_NOT_NEED_RESOURCE =
+ "Application does not need more resource";
+ public final static String APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE =
+ "Application priority does not need more resource";
+ public final static String SKIPPED_ALL_PRIORITIES =
+ "All priorities are skipped of the app";
+ public final static String RESPECT_FIFO = "To respect FIFO of applications, "
+ + "skipped following applications in the queue";
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
new file mode 100644
index 0000000..a03814c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A tree node in the "NodeAllocation" tree structure.
+ * Each node may represent a queue, an application or a container in an
+ * allocation activity, and keeps the child nodes attached through addChild.
+ */
+public class ActivityNode {
+  private String activityNodeName;
+  private String parentName;
+  private String appPriority;
+  private String requestPriority;
+  private ActivityState state;
+  private String diagnostic;
+
+  private List<ActivityNode> childNode;
+
+  // priority is kept as the app priority when type is "app", as the request
+  // priority when type is "container", and discarded for any other type.
+  public ActivityNode(String activityNodeName, String parentName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    this.activityNodeName = activityNodeName;
+    this.parentName = parentName;
+    if ("app".equals(type)) {
+      this.appPriority = priority;
+    } else if ("container".equals(type)) {
+      this.requestPriority = priority;
+    }
+    this.state = state;
+    this.diagnostic = diagnostic;
+    this.childNode = new LinkedList<>();
+  }
+
+  public String getName() {
+    return this.activityNodeName;
+  }
+
+  public String getParentName() {
+    return this.parentName;
+  }
+
+  // Children are prepended, so the most recently added child comes first.
+  public void addChild(ActivityNode node) {
+    childNode.add(0, node);
+  }
+
+  public List<ActivityNode> getChildren() {
+    return this.childNode;
+  }
+
+  public ActivityState getState() {
+    return this.state;
+  }
+
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+
+  public String getAppPriority() {
+    return appPriority;
+  }
+
+  public String getRequestPriority() {
+    return requestPriority;
+  }
+
+  // Returns true when an app priority was recorded for this node.
+  public boolean getType() {
+    return appPriority != null;
+  }
+
+  // Renders this node and then its children; a null or empty diagnostic is
+  // skipped rather than dereferenced.
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.activityNodeName + " ");
+    sb.append(this.appPriority + " ");
+    sb.append(this.state + " ");
+    if (this.diagnostic != null && !this.diagnostic.equals("")) {
+      sb.append(this.diagnostic + "\n");
+    }
+    sb.append("\n");
+    for (ActivityNode child : childNode) {
+      sb.append(child.toString() + "\n");
+    }
+    return sb.toString();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
new file mode 100644
index 0000000..bce1fc9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/**
+ * Collection of activity operation states.
+ */
+public enum ActivityState {
+ // default state when adding a new activity in node allocation
+ DEFAULT,
+ // container is allocated to sub-queues/applications or this queue/application
+ ACCEPTED,
+ // queue or application voluntarily gives up using the resource OR
+ // nothing is allocated
+ SKIPPED,
+ // container could not be allocated to sub-queues or this application
+ REJECTED,
+ ALLOCATED, // successfully allocated a new non-reserved container
+ RESERVED, // successfully reserved a new container
+ RE_RESERVED // re-reserved an existing reservation? (TODO confirm)
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
new file mode 100644
index 0000000..da768e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Records one activity operation of an allocation (queue, application or
+ * container activity) together with its state, diagnostic and priority.
+ */
+public class AllocationActivity {
+  private static final Log LOG = LogFactory.getLog(AllocationActivity.class);
+
+  private String childName;
+  private String parentName;
+  private String appPriority;
+  private String requestPriority;
+  private ActivityState state;
+  private String diagnostic;
+
+  // priority is kept only for type "app" or "container", in its own field.
+  public AllocationActivity(String parentName, String queueName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    this.parentName = parentName;
+    this.childName = queueName;
+    this.state = state;
+    this.diagnostic = diagnostic;
+    if ("app".equals(type)) {
+      this.appPriority = priority;
+    } else if ("container".equals(type)) {
+      this.requestPriority = priority;
+    }
+  }
+
+  // Builds the tree node, typed by whichever priority field is set (if any).
+  public ActivityNode createTreeNode() {
+    String nodeType = null;
+    String nodePriority = null;
+    if (appPriority != null) {
+      nodeType = "app";
+      nodePriority = appPriority;
+    } else if (requestPriority != null) {
+      nodeType = "container";
+      nodePriority = requestPriority;
+    }
+    return new ActivityNode(childName, parentName, nodePriority, state,
+        diagnostic, nodeType);
+  }
+
+  public String getName() {
+    return childName;
+  }
+
+  public String getState() {
+    return state.toString();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
new file mode 100644
index 0000000..e38cefc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/**
+ * Collection of allocation final states.
+ */
+public enum AllocationState {
+ DEFAULT, // initial container state of a NodeAllocation before any update
+ // queue or application voluntarily gives up using the resource
+ // OR nothing is allocated
+ SKIPPED,
+ // successfully allocated a new non-reserved container
+ ALLOCATED,
+ // successfully allocated a new container from an existing reserved container
+ ALLOCATED_FROM_RESERVED,
+ // successfully reserved a new container
+ RESERVED
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
new file mode 100644
index 0000000..15850c0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Allocation information for one application within a period of time.
+ * Each application allocation may have several allocation attempts, which
+ * are kept as ActivityNode entries in the order they were added.
+ */
+public class AppAllocation {
+  private Priority priority;
+  private NodeId nodeId;
+  private ContainerId containerId;
+  private ActivityState appState;
+  private String diagnostic;
+  private String queueName;
+  private List<ActivityNode> allocationAttempts;
+  private long timestamp;
+
+  public AppAllocation(Priority priority, NodeId nodeId, String queueName) {
+    this.priority = priority;
+    this.nodeId = nodeId;
+    this.queueName = queueName;
+    this.allocationAttempts = new ArrayList<>();
+  }
+
+  // Overwrites the container id, app state, timestamp and diagnostic together.
+  public void updateAppContainerStateAndTime(ContainerId containerId,
+      ActivityState appState, long ts, String diagnostic) {
+    this.containerId = containerId;
+    this.appState = appState;
+    this.timestamp = ts;
+    this.diagnostic = diagnostic;
+  }
+
+  /**
+   * Appends one allocation attempt and refreshes the app state from it.
+   * A REJECTED attempt is recorded on the application as SKIPPED; any other
+   * state is taken over unchanged.
+   */
+  public void addAppAllocationActivity(String containerId, String priority,
+      ActivityState state, String diagnostic, String type) {
+    ActivityNode attempt =
+        new ActivityNode(containerId, null, priority, state, diagnostic, type);
+    this.allocationAttempts.add(attempt);
+    this.appState =
+        (state == ActivityState.REJECTED) ? ActivityState.SKIPPED : state;
+  }
+
+  public String getNodeId() {
+    return nodeId.toString();
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
+  public ActivityState getAppState() {
+    return appState;
+  }
+
+  // Returns null when no priority was supplied.
+  public String getPriority() {
+    return (priority == null) ? null : priority.toString();
+  }
+
+  // Returns null when no container id is set.
+  public String getContainerId() {
+    return (containerId == null) ? null : containerId.toString();
+  }
+
+  public String getDiagnostic() {
+    return diagnostic;
+  }
+
+  public long getTime() {
+    return this.timestamp;
+  }
+
+  // Hands out the internal attempt list itself, not a copy.
+  public List<ActivityNode> getAllocationAttempts() {
+    return allocationAttempts;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
new file mode 100644
index 0000000..6911299
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Allocation information for one allocation in a node heartbeat.
+ * Detailed allocation activities are first stored as "AllocationActivity"
+ * operations, then transformed to a tree structure.
+ * The tree starts from the root queue and ends in a leaf queue,
+ * application or container allocation.
+ */
+public class NodeAllocation {
+  private NodeId nodeId;
+  private long timeStamp;
+  private ContainerId containerId = null;
+  private AllocationState containerState = AllocationState.DEFAULT;
+  private List<AllocationActivity> allocationOperations;
+
+  private ActivityNode root = null;
+
+  private static final Log LOG = LogFactory.getLog(NodeAllocation.class);
+
+  public NodeAllocation(NodeId nodeId) {
+    this.nodeId = nodeId;
+    this.allocationOperations = new ArrayList<>();
+  }
+
+  public void addAllocationActivity(String parentName, String childName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    AllocationActivity allocate = new AllocationActivity(parentName, childName,
+        priority, state, diagnostic, type);
+    this.allocationOperations.add(allocate);
+  }
+
+  public void updateContainerState(ContainerId containerId,
+      AllocationState containerState) {
+    this.containerId = containerId;
+    this.containerState = containerState;
+  }
+
+  // In node allocation, transform each activity to a tree-like structure
+  // for frontend activity display.
+  // eg:      root
+  //          /  \
+  //         a    b
+  //        / \
+  //     app1 app2
+  //     /  \
+  //   CA1  CA2
+  // CA means Container Attempt
+  // The tree is built once; later calls keep the existing root. When no
+  // activity has been recorded the root simply stays null.
+  public void transformToTree() {
+    if (root != null || allocationOperations.isEmpty()) {
+      return;
+    }
+
+    // Walk backwards so that only the latest activity of each name survives.
+    Set<String> names = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    ListIterator<AllocationActivity> ite = allocationOperations.listIterator(
+        allocationOperations.size());
+    while (ite.hasPrevious()) {
+      String name = ite.previous().getName();
+      if (name != null && !names.add(name)) {
+        ite.remove();
+      }
+    }
+
+    List<ActivityNode> allocationTree = new ArrayList<>();
+    for (AllocationActivity allocationOperation : allocationOperations) {
+      ActivityNode node = allocationOperation.createTreeNode();
+      String name = node.getName();
+      // Adopt the trailing pending nodes that name this node as parent;
+      // a pending node without a parent name never matches.
+      for (int i = allocationTree.size() - 1; i > -1; i--) {
+        String pendingParent = allocationTree.get(i).getParentName();
+        if (pendingParent == null || !pendingParent.equals(name)) {
+          break;
+        }
+        node.addChild(allocationTree.remove(i));
+      }
+      allocationTree.add(node);
+    }
+    root = allocationTree.get(0);
+  }
+
+  public void setTimeStamp(long timeStamp) {
+    this.timeStamp = timeStamp;
+  }
+
+  public long getTimeStamp() {
+    return this.timeStamp;
+  }
+
+  public AllocationState getFinalAllocationState() {
+    return containerState;
+  }
+
+  // Returns null when no container id has been set.
+  public String getContainerId() {
+    return (containerId == null) ? null : containerId.toString();
+  }
+
+  public ActivityNode getRoot() {
+    return root;
+  }
+
+  public String getNodeId() {
+    return nodeId.toString();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9c88154..1d8f929 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -91,12 +92,15 @@
protected CapacitySchedulerContext csContext;
protected YarnAuthorizationProvider authorizer = null;
- public AbstractCSQueue(CapacitySchedulerContext cs,
+ protected ActivitiesManager activitiesManager;
+
+ public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queueName = queueName;
this.resourceCalculator = cs.getResourceCalculator();
+ this.activitiesManager = cs.getActivitiesManager();
// must be called after parent and queueName is set
this.metrics =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bedf455..f03a0df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -92,22 +92,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
@@ -307,6 +297,8 @@
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
+ this.activitiesManager = new ActivitiesManager(rmContext);
+ activitiesManager.init(conf);
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
@@ -344,6 +336,7 @@
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
+ activitiesManager.start();
super.serviceStart();
}
@@ -523,7 +516,7 @@
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
- newQueues, queues, noop);
+ newQueues, queues, noop);
// Ensure all existing queues are still present
validateExistingQueues(queues, newQueues);
@@ -650,7 +643,7 @@
throw new IllegalStateException(
"Only Leaf Queues can be reservable for " + queueName);
}
- ParentQueue parentQueue =
+ ParentQueue parentQueue =
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
@@ -802,7 +795,7 @@
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
- application.getPriority(), isAttemptRecovering);
+ application.getPriority(), isAttemptRecovering, activitiesManager);
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(
application.getCurrentAppAttempt());
@@ -1233,6 +1226,7 @@
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
+
FiCaSchedulerApp reservedApplication =
getCurrentAttemptForContainer(reservedContainer.getContainerId());
@@ -1262,6 +1256,19 @@
tmp.getAssignmentInformation().incrAllocations();
updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ queue.getParent().getQueueName(), queue.getQueueName(),
+ ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+ ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+ node, reservedContainer.getContainerId(),
+ AllocationState.ALLOCATED_FROM_RESERVED);
+ } else {
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ queue.getParent().getQueueName(), queue.getQueueName(),
+ ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+ ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+ node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
}
}
@@ -1371,7 +1378,11 @@
setLastNodeUpdateTime(Time.now());
nodeUpdate(node);
if (!scheduleAsynchronously) {
+ ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+ node.getNodeID());
allocateContainersToNode(getNode(node.getNodeID()));
+ ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+ node.getNodeID());
}
}
break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index b39b289..c41a7bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -80,4 +81,6 @@
* cluster.
*/
ResourceUsage getClusterResourceUsage();
+
+ ActivitiesManager getActivitiesManager();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a243e93..6bbe85e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -19,15 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -65,9 +57,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -75,6 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -135,7 +130,7 @@
super(cs, queueName, parent, old);
this.scheduler = cs;
- this.activeUsersManager = new ActiveUsersManager(metrics);
+ this.activeUsersManager = new ActiveUsersManager(metrics);
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
@@ -144,7 +139,7 @@
LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath());
}
-
+
setupQueueConfigs(cs.getClusterResource());
}
@@ -862,7 +857,7 @@
float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
}
-
+
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
@@ -881,6 +876,10 @@
if (reservedContainer != null) {
FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
+
+ ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+ node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
synchronized (application) {
CSAssignment assignment =
application.assignContainers(clusterResource, node,
@@ -895,6 +894,10 @@
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
+ ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+ .getPartition());
return CSAssignment.NULL_ASSIGNMENT;
}
@@ -907,6 +910,9 @@
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
}
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
return CSAssignment.NULL_ASSIGNMENT;
}
@@ -914,13 +920,23 @@
orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
FiCaSchedulerApp application = assignmentIterator.next();
+ ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+ node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
// Check queue max-capacity limit
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
currentResourceLimits, application.getCurrentReservation(),
schedulingMode)) {
+ ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+ activitiesManager, node,
+ application, application.getPriority(),
+ ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
-
+
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
node.getPartition(), schedulingMode);
@@ -930,6 +946,10 @@
application, node.getPartition(), currentResourceLimits)) {
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
"User capacity has reached its maximum limit.");
+ ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+ activitiesManager, node,
+ application, application.getPriority(),
+ ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
continue;
}
@@ -971,10 +991,17 @@
incReservedResource(node.getPartition(), reservedRes);
}
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED,
+ ActivityDiagnosticConstant.EMPTY);
+
// Done
return assignment;
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.OTHER) {
+ ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+ activitiesManager, application.getApplicationId(),
+ ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
application.updateNodeInfoForAMDiagnostics(node);
} else if(assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
@@ -982,9 +1009,18 @@
} else {
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.RESPECT_FIFO);
+ ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+ activitiesManager, application.getApplicationId(),
+ ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
}
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 9ae35ee..a245e3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -42,12 +42,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -81,7 +80,7 @@
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- public ParentQueue(CapacitySchedulerContext cs,
+ public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
@@ -98,14 +97,14 @@
"capacity of " + rawCapacity + " for queue " + queueName +
". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
}
-
+
this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
-
+
setupQueueConfigs(cs.getClusterResource());
- LOG.info("Initialized parent-queue " + queueName +
- " name=" + queueName +
- ", fullname=" + getQueuePath());
+ LOG.info("Initialized parent-queue " + queueName +
+ " name=" + queueName +
+ ", fullname=" + getQueuePath());
}
synchronized void setupQueueConfigs(Resource clusterResource)
@@ -380,6 +379,10 @@
" #applications: " + getNumApplications());
}
+ /**
+  * Resolves the parent queue name that is passed to the queue activity
+  * recording calls in this class.
+  *
+  * @return the queue name of {@code getParent()}, or an empty string when
+  *         {@code getParent()} returns {@code null}
+  */
+ private String getParentName() {
+   if (getParent() == null) {
+     return "";
+   }
+   return getParent().getQueueName();
+ }
+
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits resourceLimits,
@@ -392,6 +395,16 @@
+ ", because it is not able to access partition=" + node
.getPartition());
}
+
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParentName(), getQueueName(), ActivityState.REJECTED,
+ ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+ .getPartition());
+ if (rootQueue) {
+ ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+ node);
+ }
+
return CSAssignment.NULL_ASSIGNMENT;
}
@@ -404,6 +417,15 @@
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
}
+
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParentName(), getQueueName(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+ if (rootQueue) {
+ ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+ node);
+ }
+
return CSAssignment.NULL_ASSIGNMENT;
}
@@ -423,9 +445,18 @@
resourceLimits, Resources.createResource(
getMetrics().getReservedMB(), getMetrics()
.getReservedVirtualCores()), schedulingMode)) {
+
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParentName(), getQueueName(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+ if (rootQueue) {
+ ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+ node);
+ }
+
break;
}
-
+
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node, resourceLimits,
@@ -436,6 +467,29 @@
if (Resources.greaterThan(
resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
+
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParentName(), getQueueName(), ActivityState.ACCEPTED,
+ ActivityDiagnosticConstant.EMPTY);
+
+ if (node.getReservedContainer() == null) {
+ if (rootQueue) {
+ ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+ activitiesManager, node,
+ assignedToChild.getAssignmentInformation()
+ .getFirstAllocatedOrReservedContainerId(),
+ AllocationState.ALLOCATED);
+ }
+ } else {
+ if (rootQueue) {
+ ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+ activitiesManager, node,
+ assignedToChild.getAssignmentInformation()
+ .getFirstAllocatedOrReservedContainerId(),
+ AllocationState.RESERVED);
+ }
+ }
+
// Track resource utilization for the parent-queue
allocateResource(clusterResource, assignedToChild.getResource(),
node.getPartition(), assignedToChild.isIncreasedAllocation());
@@ -474,6 +528,15 @@
} else {
assignment.setSkippedType(assignedToChild.getSkippedType());
+
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ getParentName(), getQueueName(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.EMPTY);
+ if (rootQueue) {
+ ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+ node);
+ }
+
break;
}
@@ -631,7 +694,7 @@
resourceToSubtract);
}
}
-
+
return assignment;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index 4d5a7dc..fa13df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -24,6 +24,10 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -43,17 +47,25 @@
FiCaSchedulerApp application;
final ResourceCalculator rc;
final RMContext rmContext;
-
+ ActivitiesManager activitiesManager;
+
+ /**
+  * Creates an allocator without an activities manager by delegating to the
+  * four-argument constructor with {@code null}.
+  */
public AbstractContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
+ this(application, rc, rmContext, null);
+ }
+
+ /**
+  * Creates an allocator and stores each argument in the field of the same
+  * name.
+  *
+  * @param application stored in {@code this.application}
+  * @param rc stored in {@code this.rc}
+  * @param rmContext stored in {@code this.rmContext}
+  * @param activitiesManager stored in {@code this.activitiesManager} and
+  *        handed to the ActivitiesLogger calls made by this class; it is
+  *        {@code null} when the three-argument constructor is used
+  */
+ public AbstractContainerAllocator(FiCaSchedulerApp application,
+ ResourceCalculator rc, RMContext rmContext,
+ ActivitiesManager activitiesManager) {
this.application = application;
this.rc = rc;
this.rmContext = rmContext;
+ this.activitiesManager = activitiesManager;
}
protected CSAssignment getCSAssignmentFromAllocateResult(
Resource clusterResource, ContainerAllocation result,
- RMContainer rmContainer) {
+ RMContainer rmContainer, FiCaSchedulerNode node) {
// Handle skipped
CSAssignment.SkippedType skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED) ?
@@ -61,7 +73,7 @@
CSAssignment.SkippedType.NONE;
CSAssignment assignment = new CSAssignment(skipped);
assignment.setApplication(application);
-
+
// Handle excess reservation
assignment.setExcessReservation(result.getContainerToBeUnreserved());
@@ -85,6 +97,23 @@
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
allocatedResource);
+
+ if (rmContainer != null) {
+ ActivitiesLogger.APP.recordAppActivityWithAllocation(
+ activitiesManager, node, application, updatedContainer,
+ ActivityState.RE_RESERVED);
+ ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+ activitiesManager, application.getApplicationId(),
+ ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+ } else {
+ ActivitiesLogger.APP.recordAppActivityWithAllocation(
+ activitiesManager, node, application, updatedContainer,
+ ActivityState.RESERVED);
+ ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
+ activitiesManager, application.getApplicationId(),
+ updatedContainer.getId(), ActivityState.RESERVED,
+ ActivityDiagnosticConstant.EMPTY);
+ }
} else if (result.getAllocationState() == AllocationState.ALLOCATED){
// This is a new container
// Inform the ordering policy
@@ -105,10 +134,18 @@
assignment.getAssignmentInformation().incrAllocations();
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
allocatedResource);
-
+
if (rmContainer != null) {
assignment.setFulfilledReservation(true);
}
+
+ ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager,
+ node, application, updatedContainer, ActivityState.ALLOCATED);
+ ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
+ activitiesManager, application.getApplicationId(),
+ updatedContainer.getId(), ActivityState.ACCEPTED,
+ ActivityDiagnosticConstant.EMPTY);
+
}
assignment.setContainersToKill(result.getToKillContainers());
@@ -118,13 +155,13 @@
CSAssignment.SkippedType.QUEUE_LIMIT);
}
}
-
+
return assignment;
}
-
+
/**
* allocate needs to handle following stuffs:
- *
+ *
* <ul>
* <li>Select request: Select a request to allocate. E.g. select a resource
* request based on requirement/priority/locality.</li>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 3be8e0e..4eaa24b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -36,12 +37,17 @@
+ /**
+  * Creates a container allocator without an activities manager by
+  * delegating to the four-argument constructor with {@code null}.
+  */
public ContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
+ this(application, rc, rmContext, null);
+ }
+
+ /**
+  * Creates a container allocator together with its two delegate
+  * allocators.
+  *
+  * @param application application passed to the superclass and to both
+  *        delegate allocators
+  * @param rc resource calculator passed to the superclass and to both
+  *        delegate allocators
+  * @param rmContext context passed to the superclass and to both delegate
+  *        allocators
+  * @param activitiesManager forwarded to the superclass and to the
+  *        {@code RegularContainerAllocator}; may be {@code null}
+  */
+ public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc,
+ RMContext rmContext, ActivitiesManager activitiesManager) {
- super(application, rc, rmContext);
+ // Forward the manager: the three-argument super constructor would leave
+ // the inherited activitiesManager field null even when one is supplied.
+ super(application, rc, rmContext, activitiesManager);
increaseContainerAllocator =
new IncreaseContainerAllocator(application, rc, rmContext);
- regularContainerAllocator =
- new RegularContainerAllocator(application, rc, rmContext);
+ regularContainerAllocator = new RegularContainerAllocator(application, rc,
+ rmContext, activitiesManager);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 29b37d8..21114f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -24,11 +24,13 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
@@ -37,6 +39,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -57,10 +65,11 @@
private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
private ResourceRequest lastResourceRequest = null;
-
+
+ /**
+  * Creates a regular container allocator; all four arguments are passed
+  * unchanged to the superclass constructor.
+  *
+  * @param application forwarded to the superclass
+  * @param rc forwarded to the superclass
+  * @param rmContext forwarded to the superclass
+  * @param activitiesManager forwarded to the superclass, which keeps it for
+  *        the ActivitiesLogger calls made during allocation
+  */
public RegularContainerAllocator(FiCaSchedulerApp application,
- ResourceCalculator rc, RMContext rmContext) {
- super(application, rc, rmContext);
+ ResourceCalculator rc, RMContext rmContext,
+ ActivitiesManager activitiesManager) {
+ super(application, rc, rmContext, activitiesManager);
}
private boolean checkHeadroom(Resource clusterResource,
@@ -85,15 +94,23 @@
private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
+ Priority priority = schedulerKey.getPriority();
+
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
return ContainerAllocation.APP_SKIPPED;
}
ResourceRequest anyRequest =
application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
if (null == anyRequest) {
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@@ -102,6 +119,9 @@
// Do we need containers at this 'priority'?
if (application.getTotalRequiredResources(schedulerKey) <= 0) {
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@@ -116,6 +136,9 @@
}
application.updateAppSkipNodeDiagnostics(
"Skipping assigning to Node in Ignore Exclusivity mode. ");
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE);
return ContainerAllocation.APP_SKIPPED;
}
}
@@ -126,6 +149,10 @@
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
anyRequest.getNodeLabelExpression(), node.getPartition(),
schedulingMode)) {
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.
+ PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@@ -134,6 +161,9 @@
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS);
return ContainerAllocation.PRIORITY_SKIPPED;
}
}
@@ -143,6 +173,9 @@
LOG.debug("cannot allocate required resource=" + required
+ " because of headroom");
}
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM);
return ContainerAllocation.QUEUE_SKIPPED;
}
@@ -174,7 +207,9 @@
+ missedNonPartitionedRequestSchedulingOpportunity + " required="
+ rmContext.getScheduler().getNumClusterNodes());
}
-
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST);
return ContainerAllocation.APP_SKIPPED;
}
}
@@ -301,6 +336,9 @@
}
// Skip node-local request, go to rack-local request
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, schedulerKey.getPriority(),
+ ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@@ -316,6 +354,9 @@
}
// Skip rack-local request, go to off-switch request
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, schedulerKey.getPriority(),
+ ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@@ -332,6 +373,9 @@
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, schedulerKey.getPriority(),
+ ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST);
return ContainerAllocation.APP_SKIPPED;
}
@@ -339,6 +383,7 @@
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
+ Priority priority = schedulerKey.getPriority();
ContainerAllocation allocation;
@@ -364,6 +409,9 @@
application.getResourceRequest(schedulerKey, node.getRackName());
if (rackLocalResourceRequest != null) {
if (!rackLocalResourceRequest.getRelaxLocality()) {
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@@ -387,6 +435,9 @@
application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
if (offSwitchResourceRequest != null) {
if (!offSwitchResourceRequest.getRelaxLocality()) {
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
if (requestType != NodeType.NODE_LOCAL
@@ -408,7 +459,9 @@
return allocation;
}
-
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.PRIORITY_SKIPPED);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@@ -416,6 +469,7 @@
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
ResourceRequest request, NodeType type, RMContainer rmContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+ Priority priority = schedulerKey.getPriority();
lastResourceRequest = request;
if (LOG.isDebugEnabled()) {
@@ -432,6 +486,10 @@
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
// We should un-reserve this container.
+ ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+ node, application, priority,
+ ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL,
+ ActivityState.REJECTED);
return new ContainerAllocation(rmContainer, null,
AllocationState.LOCALITY_SKIPPED);
}
@@ -446,6 +504,9 @@
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
// Skip this locality request
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@@ -524,6 +585,9 @@
// continue.
if (null == unreservedContainer) {
// Skip the locality request
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@@ -548,6 +612,9 @@
LOG.debug("we needed to unreserve to be able to allocate");
}
// Skip the locality request
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@@ -560,6 +627,9 @@
return result;
}
// Skip the locality request
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, priority,
+ ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@@ -636,6 +706,9 @@
ContainerAllocation ret =
new ContainerAllocation(allocationResult.containerToBeUnreserved,
null, AllocationState.APP_SKIPPED);
+ ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+ node, application, schedulerKey.getPriority(),
+ ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED);
return ret;
}
@@ -662,6 +735,10 @@
application
.updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
LOG.warn("Couldn't get container for allocation!");
+ ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+ node, application, schedulerKey.getPriority(),
+ ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER,
+ ActivityState.REJECTED);
return ContainerAllocation.APP_SKIPPED;
}
@@ -741,6 +818,9 @@
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + node.getPartition());
}
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, application.getPriority(),
+ ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE);
return CSAssignment.SKIP_ASSIGNMENT;
}
@@ -755,18 +835,21 @@
continue;
}
return getCSAssignmentFromAllocateResult(clusterResource, result,
- null);
+ null, node);
}
// We will reach here if we skipped all priorities of the app, so we will
// skip the app.
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+ activitiesManager, node, application, application.getPriority(),
+ ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
return CSAssignment.SKIP_ASSIGNMENT;
} else {
ContainerAllocation result =
allocate(clusterResource, node, schedulingMode, resourceLimits,
reservedContainer.getReservedSchedulerKey(), reservedContainer);
return getCSAssignmentFromAllocateResult(clusterResource, result,
- reservedContainer);
+ reservedContainer, node);
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 67d93a4..33dee80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
@@ -108,8 +109,16 @@
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
+ this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+ appPriority, isAttemptRecovering, null); // null: no ActivitiesManager given
+ }
+
+ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
+ ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
-
+
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
Resource amResource;
@@ -139,8 +148,9 @@
if (scheduler.getResourceCalculator() != null) {
rc = scheduler.getResourceCalculator();
}
-
- containerAllocator = new ContainerAllocator(this, rc, rmContext);
+
+ containerAllocator = new ContainerAllocator(this, rc, rmContext,
+ activitiesManager);
if (scheduler instanceof CapacityScheduler) {
capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
@@ -189,7 +199,7 @@
return null;
}
- // Required sanity check - AM can call 'allocate' to update resource
+ // Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(schedulerKey) <= 0) {
return null;
@@ -493,7 +503,7 @@
public LeafQueue getCSLeafQueue() {
return (LeafQueue)queue;
}
-
+
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode, RMContainer reservedContainer) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 75bffc7..4305fd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -130,9 +130,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
@@ -176,6 +180,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.WebServices;
@@ -577,6 +582,124 @@
}
@GET
+  @Path("/scheduler/activities")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
+      @QueryParam("nodeId") String nodeId) {
+    // REST entry point for node-level scheduler activities.
+    //  - nodeId is optional and may be "host" or "host:port". When given it
+    //    must match a node known to the scheduler, and it is replaced by
+    //    that node's full NodeId string before the manager sees it.
+    //  - Bad input is answered with ActivitiesInfo(errMessage, nodeId), a
+    //    body that only carries a diagnostic, not with an HTTP error.
+    //  - null is returned when the scheduler is not an
+    //    AbstractYarnScheduler; hsr is not read.
+    YarnScheduler yarnScheduler = rm.getRMContext().getScheduler();
+    if (!(yarnScheduler instanceof AbstractYarnScheduler)) {
+      return null;
+    }
+
+    AbstractYarnScheduler baseScheduler =
+        (AbstractYarnScheduler) yarnScheduler;
+    ActivitiesManager manager = baseScheduler.getActivitiesManager();
+    if (manager == null) {
+      // This scheduler has no ActivitiesManager to query.
+      return new ActivitiesInfo("Not Capacity Scheduler", nodeId);
+    }
+
+    List<FiCaSchedulerNode> knownNodes =
+        baseScheduler.getNodeTracker().getAllNodes();
+    if (knownNodes.isEmpty()) {
+      // The node tracker knows no nodes at all.
+      return new ActivitiesInfo(
+          "No node manager running in the cluster", nodeId);
+    }
+
+    if (nodeId != null) {
+      // Split "host[:port]" at the first colon; a missing port stays "".
+      int colon = nodeId.indexOf(":");
+      String wantedHost = colon < 0 ? nodeId : nodeId.substring(0, colon);
+      String wantedPort = colon < 0 ? "" : nodeId.substring(colon + 1);
+
+      // The host must match; the command port is compared only when a
+      // port was supplied. The first matching node wins.
+      String resolvedId = null;
+      for (FiCaSchedulerNode candidate : knownNodes) {
+        RMNode rmNode = candidate.getRMNode();
+        boolean sameHost = rmNode.getHostName().equals(wantedHost);
+        if (sameHost && (wantedPort.isEmpty() || String.valueOf(
+            rmNode.getCommandPort()).equals(wantedPort))) {
+          resolvedId = candidate.getNodeID().toString();
+          break;
+        }
+      }
+      if (resolvedId == null) {
+        // No known node matched the requested host (and port).
+        return new ActivitiesInfo(
+            "Cannot find node manager with given node id", nodeId);
+      }
+      nodeId = resolvedId;
+    }
+
+    // Same two manager calls on every valid request, in this order: first
+    // recordNextNodeUpdateActivities(nodeId), then getActivitiesInfo(nodeId)
+    // whose result is the response (presumably data recorded after an
+    // earlier request - confirm in ActivitiesManager).
+    manager.recordNextNodeUpdateActivities(nodeId);
+    return manager.getActivitiesInfo(nodeId);
+  }
+
+  /**
+   * Switches on activity recording for one application and returns what the
+   * ActivitiesManager reports for it.
+   *
+   * @param hsr servlet request; not read here
+   * @param appId application id string; required
+   * @param time optional maxTime value, parsed as a double; 3.0 if absent
+   * @return the activities, or a bean whose diagnostic names the problem;
+   *         null when the scheduler is not an AbstractYarnScheduler
+   */
+  @GET
+  @Path("/scheduler/app-activities")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
+      @QueryParam("appId") String appId, @QueryParam("maxTime") String time) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+    if (!(scheduler instanceof AbstractYarnScheduler)) {
+      return null;
+    }
+    ActivitiesManager activitiesManager =
+        ((AbstractYarnScheduler) scheduler).getActivitiesManager();
+    if (null == activitiesManager) {
+      return new AppActivitiesInfo("Not Capacity Scheduler", appId);
+    }
+    if (appId == null) {
+      return new AppActivitiesInfo("Must provide an application Id", null);
+    }
+
+    double maxTime = 3.0;
+    if (time != null) {
+      try {
+        // ".0" is still appended to dot-less values so that exactly the
+        // same spellings are accepted as before.
+        maxTime = Double.parseDouble(time.contains(".") ? time : time + ".0");
+      } catch (NumberFormatException e) {
+        // Used to escape as an unhandled exception; report it the way the
+        // other bad-input cases are reported.
+        return new AppActivitiesInfo("maxTime must be a number", appId);
+      }
+    }
+    try {
+      ApplicationId applicationId = ApplicationId.fromString(appId);
+      activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
+      return activitiesManager.getAppActivitiesInfo(applicationId);
+    } catch (Exception e) {
+      return new AppActivitiesInfo(
+          "Cannot find application with given appId", appId);
+    }
+  }
+
+ @GET
@Path("/appstatistics")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ApplicationStatisticsInfo getAppStatistics(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java
new file mode 100644
index 0000000..0de340a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * JAXB bean returned by the scheduler-activities REST call: the allocations
+ * recorded for a node, or a diagnostic saying why none are listed.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ActivitiesInfo {
+  protected String nodeId;
+  protected String timeStamp;
+  protected String diagnostic = null;
+  protected List<NodeAllocationInfo> allocations;
+
+  private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class);
+
+  /** No-arg constructor; every field keeps its default. */
+  public ActivitiesInfo() {
+  }
+
+  /** Builds a diagnostic-only bean; allocations and timeStamp stay null. */
+  public ActivitiesInfo(String errorMessage, String nodeId) {
+    this.diagnostic = errorMessage;
+    this.nodeId = nodeId;
+  }
+
+  /**
+   * Converts recorded allocations into DAO form. A null or empty list only
+   * sets a diagnostic and leaves allocations empty; otherwise nodeId and
+   * timeStamp are taken from the first allocation.
+   */
+  public ActivitiesInfo(List<NodeAllocation> nodeAllocations, String nodeId) {
+    this.nodeId = nodeId;
+    this.allocations = new ArrayList<>();
+    if (nodeAllocations == null) {
+      this.diagnostic = nodeId == null
+          ? "waiting for next allocation" : "waiting for display";
+      return;
+    }
+    if (nodeAllocations.size() == 0) {
+      this.diagnostic = "do not have available resources";
+      return;
+    }
+    NodeAllocation first = nodeAllocations.get(0);
+    this.nodeId = first.getNodeId();
+    this.timeStamp = new Date(first.getTimeStamp()).toString();
+    for (int idx = 0; idx < nodeAllocations.size(); idx++) {
+      this.allocations.add(new NodeAllocationInfo(nodeAllocations.get(idx)));
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java
new file mode 100644
index 0000000..9553a720
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JAXB view of one ActivityNode; its children are converted recursively.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ActivityNodeInfo {
+  protected String name; // The name for activity node
+  protected String appPriority;
+  protected String requestPriority;
+  protected String allocationState;
+  protected String diagnostic;
+
+  protected List<ActivityNodeInfo> children;
+
+  /** No-arg constructor; leaves every field unset. */
+  ActivityNodeInfo() {
+  }
+
+  /**
+   * Copies name, state and diagnostic from {@code node}, stores exactly one
+   * of the two priorities, then converts every child the same way.
+   */
+  ActivityNodeInfo(ActivityNode node) {
+    this.name = node.getName();
+    // getType() picks the priority: true stores the app priority, false
+    // the request priority; the other field stays null.
+    if (node.getType()) {
+      this.appPriority = node.getAppPriority();
+    } else {
+      this.requestPriority = node.getRequestPriority();
+    }
+    this.allocationState = node.getState().name();
+    this.diagnostic = node.getDiagnostic();
+    this.children = new ArrayList<>();
+    for (ActivityNode child : node.getChildren()) {
+      this.children.add(new ActivityNodeInfo(child));
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
new file mode 100644
index 0000000..38c45a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * JAXB response bean for one application's recorded scheduler activities.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppActivitiesInfo {
+  protected String applicationId;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<AppAllocationInfo> allocations;
+  private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class);
+
+  /** No-arg constructor; leaves every field unset. */
+  public AppActivitiesInfo() {
+  }
+
+  /** Builds a diagnostic-only bean stamped with the current clock time. */
+  public AppActivitiesInfo(String errorMessage, String applicationId) {
+    this.diagnostic = errorMessage;
+    this.applicationId = applicationId;
+    this.timeStamp = currentTimeText();
+  }
+
+  /**
+   * Converts recorded allocations, walking the list from its last element to
+   * its first. A null list yields a diagnostic and the current time instead.
+   */
+  public AppActivitiesInfo(List<AppAllocation> appAllocations,
+      ApplicationId applicationId) {
+    this.applicationId = applicationId.toString();
+    this.allocations = new ArrayList<>();
+    if (appAllocations == null) {
+      this.diagnostic = "waiting for display";
+      this.timeStamp = currentTimeText();
+      return;
+    }
+    for (int idx = appAllocations.size() - 1; idx >= 0; idx--) {
+      this.allocations.add(new AppAllocationInfo(appAllocations.get(idx)));
+    }
+  }
+
+  /** Renders the SystemClock reading through {@code Date.toString()}. */
+  private static String currentTimeText() {
+    return new Date(SystemClock.getInstance().getTime()).toString();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
new file mode 100644
index 0000000..21d3788
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * JAXB view of a single {@code AppAllocation} together with the activity
+ * nodes returned by its {@code getAllocationAttempts()}.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppAllocationInfo {
+  protected String nodeId;
+  protected String queueName;
+  protected String appPriority;
+  protected String allocatedContainerId;
+  protected String allocationState;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<ActivityNodeInfo> allocationAttempt;
+
+  private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class);
+
+  /** No-arg constructor; leaves every field unset. */
+  AppAllocationInfo() {
+  }
+
+  /** Copies every displayed attribute out of {@code allocation}. */
+  AppAllocationInfo(AppAllocation allocation) {
+    this.nodeId = allocation.getNodeId();
+    this.queueName = allocation.getQueueName();
+    this.appPriority = allocation.getPriority();
+    this.allocatedContainerId = allocation.getContainerId();
+    this.allocationState = allocation.getAppState().name();
+    this.diagnostic = allocation.getDiagnostic();
+    // The numeric getTime() value is shown in Date.toString() form.
+    this.timeStamp = new Date(allocation.getTime()).toString();
+
+    List<ActivityNodeInfo> converted = new ArrayList<>();
+    for (ActivityNode attempt : allocation.getAllocationAttempts()) {
+      converted.add(new ActivityNodeInfo(attempt));
+    }
+    this.allocationAttempt = converted;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java
new file mode 100644
index 0000000..1350a76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB view of one {@code NodeAllocation} and the activity tree at its root.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class NodeAllocationInfo {
+  protected String allocatedContainerId;
+  protected String finalAllocationState;
+  protected ActivityNodeInfo root = null;
+
+  private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class);
+
+  /** No-arg constructor; root stays null and the strings stay unset. */
+  NodeAllocationInfo() {
+  }
+
+  /** Copies the displayed attributes and converts the root activity node. */
+  NodeAllocationInfo(NodeAllocation allocation) {
+    this.allocatedContainerId = allocation.getContainerId();
+    this.finalAllocationState = allocation.getFinalAllocationState().name();
+    this.root = new ActivityNodeInfo(allocation.getRoot());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index 649d719..bbdfdd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -62,9 +62,9 @@
public class TestRMWebServicesCapacitySched extends JerseyTestBase {
- private static MockRM rm;
- private static CapacitySchedulerConfiguration csConf;
- private static YarnConfiguration conf;
+ protected static MockRM rm;
+ protected static CapacitySchedulerConfiguration csConf;
+ protected static YarnConfiguration conf;
private class QueueInfo {
float capacity;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
new file mode 100644
index 0000000..d7b0581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -0,0 +1,777 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Test;
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRMWebServicesSchedulerActivities
+ extends TestRMWebServicesCapacitySched {
+
+ private static final Log LOG = LogFactory.getLog(
+ TestRMWebServicesSchedulerActivities.class);
+
+ @Test
+ public void testAssignMultipleContainersPerNodeHeartbeat()
+ throws Exception {
+ // Expect 11 recorded allocations: all ALLOCATED except the last, SKIPPED.
+ rm.start(); // RM must be running before apps can be submitted
+
+ MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+ rm.getResourceTrackerService());
+ nm.registerNode();
+
+ try {
+ RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.UNDEFINED, "127.0.0.1",
+ Resources.createResource(1024), 10), ResourceRequest
+ .newInstance(Priority.UNDEFINED, "/default-rack",
+ Resources.createResource(1024), 10), ResourceRequest
+ .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+ 10)), null);
+
+ // 1st GET for node 127.0.0.1:1234; its body is read but never asserted on.
+ WebResource r = resource();
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add("nodeId", "127.0.0.1:1234");
+ ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ JSONObject json = response.getEntity(JSONObject.class);
+
+ nm.nodeHeartbeat(true);
+ Thread.sleep(1000); // fixed 1 s pause between heartbeat and 2nd GET
+
+ // 2nd GET, same parameters; this response is the one verified below.
+ response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ json = response.getEntity(JSONObject.class);
+
+ verifyNumberOfAllocations(json, 11);
+
+ JSONArray allocations = json.getJSONArray("allocations");
+ for (int i = 0; i < allocations.length(); i++) {
+ if (i != allocations.length() - 1) {
+ verifyStateOfAllocations(allocations.getJSONObject(i),
+ "finalAllocationState", "ALLOCATED");
+ verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
+ } else { // last entry in the array
+ verifyStateOfAllocations(allocations.getJSONObject(i),
+ "finalAllocationState", "SKIPPED");
+ verifyQueueOrder(allocations.getJSONObject(i), "root-a-b");
+ }
+ }
+ }
+ finally {
+ rm.stop();
+ }
+ }
+
+ @Test
+ public void testAssignWithoutAvailableResource() throws Exception {
+ // Node registers with only 1 * 1024; expect 0 recorded allocations.
+ rm.start(); // RM must be running before apps can be submitted
+
+ MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
+ rm.getResourceTrackerService());
+ nm.registerNode();
+
+ try {
+ RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.UNDEFINED, "127.0.0.1",
+ Resources.createResource(1024), 10), ResourceRequest
+ .newInstance(Priority.UNDEFINED, "/default-rack",
+ Resources.createResource(1024), 10), ResourceRequest
+ .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+ 10)), null);
+
+ // 1st GET, host-only nodeId (no port); body is read but not asserted on.
+ WebResource r = resource();
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add("nodeId", "127.0.0.1");
+ ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ JSONObject json = response.getEntity(JSONObject.class);
+
+ nm.nodeHeartbeat(true);
+ Thread.sleep(1000); // fixed 1 s pause between heartbeat and 2nd GET
+
+ // 2nd GET, same parameters; this response is the one verified below.
+ response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ json = response.getEntity(JSONObject.class);
+
+ verifyNumberOfAllocations(json, 0);
+ }
+ finally {
+ rm.stop();
+ }
+ }
+
+ @Test
+ public void testNoNM() throws Exception {
+ // No MockNM is registered in this test; expect 0 recorded allocations.
+ rm.start();
+
+ try {
+ // 1st GET; its body is read but never asserted on.
+ WebResource r = resource();
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add("nodeId", "127.0.0.1:1234");
+ ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ JSONObject json = response.getEntity(JSONObject.class);
+
+ Thread.sleep(1000); // fixed 1 s pause before the 2nd GET
+
+ // 2nd GET, same parameters; this response is the one verified below.
+ response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ json = response.getEntity(JSONObject.class);
+
+ verifyNumberOfAllocations(json, 0);
+ }
+ finally {
+ rm.stop();
+ }
+ }
+
+ @Test
+ public void testWrongNodeId() throws Exception {
+ // Queried host 127.0.0.0 is not the registered 127.0.0.1; expect 0 allocations.
+ rm.start(); // RM must be running before apps can be submitted
+
+ MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+ rm.getResourceTrackerService());
+ nm.registerNode();
+
+ try {
+ RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.UNDEFINED, "127.0.0.1",
+ Resources.createResource(1024), 10), ResourceRequest
+ .newInstance(Priority.UNDEFINED, "/default-rack",
+ Resources.createResource(1024), 10), ResourceRequest
+ .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+ 10)), null);
+
+ // 1st GET with the non-matching nodeId; body is read but not asserted on.
+ WebResource r = resource();
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add("nodeId", "127.0.0.0");
+ ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ JSONObject json = response.getEntity(JSONObject.class);
+
+ nm.nodeHeartbeat(true);
+ Thread.sleep(1000); // fixed 1 s pause between heartbeat and 2nd GET
+
+ // 2nd GET, same parameters; this response is the one verified below.
+ response = r.path("ws").path("v1").path("cluster").path(
+ "scheduler/activities").queryParams(params).accept(
+ MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+ json = response.getEntity(JSONObject.class);
+
+ verifyNumberOfAllocations(json, 0);
+ }
+ finally {
+ rm.stop();
+ }
+ }
+
+ /**
+  * Follows one container request through three heartbeats of node
+  * 127.0.0.2 and checks the node-level activities reported after each one:
+  * RESERVED first, SKIPPED while app2's container is still in place, and
+  * ALLOCATED_FROM_RESERVED once that container has been completed.
+  */
+ @Test
+ public void testReserveNewContainer() throws Exception {
+   // The RM must be running before apps can be submitted.
+   rm.start();
+
+   MockNM firstNode = new MockNM("127.0.0.1:1234", 4 * 1024,
+       rm.getResourceTrackerService());
+   MockNM secondNode = new MockNM("127.0.0.2:1234", 4 * 1024,
+       rm.getResourceTrackerService());
+
+   firstNode.registerNode();
+   secondNode.registerNode();
+
+   try {
+     RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, firstNode);
+
+     RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
+     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, secondNode);
+
+     // app1 requests containers of size 4096, i.e. as large as the
+     // 4 * 1024 each node was registered with.
+     ResourceRequest anywhere = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "*", Resources.createResource(4096), 10);
+     am1.allocate(Arrays.asList(anywhere), null);
+
+     // Every phase below queries the activities of the same node.
+     MultivaluedMapImpl query = new MultivaluedMapImpl();
+     query.add("nodeId", "127.0.0.2");
+     WebResource activities = resource().path("ws").path("v1")
+         .path("cluster").path("scheduler/activities").queryParams(query);
+
+     // Phase 1: reserve new container.
+     ClientResponse reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     secondNode.nodeHeartbeat(true);
+     Thread.sleep(1000);
+
+     reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     JSONObject json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 1);
+     JSONObject allocations = json.getJSONObject("allocations");
+     verifyQueueOrder(allocations, "root-a-b-b3-b1");
+     verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED");
+
+     // Phase 2: heartbeat again without releasing the container of app2.
+     reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     secondNode.nodeHeartbeat(true);
+     Thread.sleep(1000);
+
+     reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 1);
+     allocations = json.getJSONObject("allocations");
+     verifyQueueOrder(allocations, "b1");
+     verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED");
+
+     // Finish application 2 by completing container 1 of its attempt.
+     CapacityScheduler scheduler =
+         (CapacityScheduler) rm.getResourceScheduler();
+     ContainerId app2Container = ContainerId.newContainerId(
+         am2.getApplicationAttemptId(), 1);
+     ContainerStatus completed = ContainerStatus.newInstance(
+         app2Container, ContainerState.COMPLETE, "", 0);
+     scheduler.completedContainer(scheduler.getRMContainer(app2Container),
+         completed, RMContainerEventType.FINISHED);
+
+     // Phase 3: heartbeat once more, now that the container is gone.
+     reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     secondNode.nodeHeartbeat(true);
+     Thread.sleep(1000);
+
+     reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 1);
+     allocations = json.getJSONObject("allocations");
+     verifyQueueOrder(allocations, "b1");
+     verifyStateOfAllocations(allocations, "finalAllocationState",
+         "ALLOCATED_FROM_RESERVED");
+   } finally {
+     rm.stop();
+   }
+ }
+
+ /**
+  * Submits one app to queue b1, heartbeats the only node and checks the
+  * node-level activities JSON: one allocation in state ALLOCATED, six nodes
+  * in its activity tree and the queue order root-a-b-b1.
+  */
+ @Test
+ public void testActivityJSON() throws Exception {
+   // The RM must be running before apps can be submitted.
+   rm.start();
+
+   MockNM node = new MockNM("127.0.0.1:1234", 24 * 1024,
+       rm.getResourceTrackerService());
+   node.registerNode();
+
+   try {
+     // Only the submission matters here; the returned handle is not used.
+     rm.submitApp(10, "app1", "user1", null, "b1");
+
+     MultivaluedMapImpl query = new MultivaluedMapImpl();
+     query.add("nodeId", "127.0.0.1");
+     WebResource activities = resource().path("ws").path("v1")
+         .path("cluster").path("scheduler/activities").queryParams(query);
+
+     // Query once before the heartbeat; the payload is read and discarded.
+     ClientResponse reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     node.nodeHeartbeat(true);
+     Thread.sleep(1000);
+
+     // Query again and verify what was reported.
+     reply = activities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     JSONObject json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 1);
+
+     JSONObject allocations = json.getJSONObject("allocations");
+     verifyStateOfAllocations(allocations, "finalAllocationState",
+         "ALLOCATED");
+     verifyNumberOfNodes(allocations, 6);
+     verifyQueueOrder(allocations, "root-a-b-b1");
+   } finally {
+     rm.stop();
+   }
+ }
+
+ /**
+  * Asserts that the activity tree of an allocation contains the expected
+  * number of nodes, the root included.
+  *
+  * @param allocation JSON object of one allocation; its "root" entry, when
+  *     present, is the top of the tree that gets counted
+  * @param realValue the expected number of nodes (0 when "root" is absent)
+  * @throws Exception if the JSON cannot be read
+  */
+ private void verifyNumberOfNodes(JSONObject allocation, int realValue)
+     throws Exception {
+   int nodeCount = 0;
+   if (!allocation.isNull("root")) {
+     // One for the root itself plus everything below it.
+     nodeCount = 1 + getNumberOfNodes(allocation.getJSONObject("root"));
+   }
+   // JUnit takes (message, expected, actual); the caller supplies the
+   // expected count, the JSON supplies the actual one.
+   assertEquals("Number of nodes is wrong", realValue, nodeCount);
+ }
+
+ /**
+  * Counts every descendant reachable through "children" entries below the
+  * given node; the node itself is not counted.
+  *
+  * @param allocation the node whose descendants are counted
+  * @return the number of descendants, 0 when there is no "children" entry
+  * @throws Exception if the JSON cannot be read
+  */
+ private int getNumberOfNodes(JSONObject allocation) throws Exception {
+   if (allocation.isNull("children")) {
+     return 0;
+   }
+
+   Object children = allocation.get("children");
+   if (children.getClass() != JSONObject.class) {
+     // Anything that is not a single JSON object is treated as an array.
+     JSONArray childList = (JSONArray) children;
+     int total = 0;
+     for (int idx = 0; idx < childList.length(); idx++) {
+       total += 1 + getNumberOfNodes(childList.getJSONObject(idx));
+     }
+     return total;
+   }
+
+   // A single child: count it and descend.
+   return 1 + getNumberOfNodes((JSONObject) children);
+ }
+
+ /**
+  * Asserts that one field of an allocation JSON object has the expected
+  * state value.
+  *
+  * @param allocation JSON object to inspect
+  * @param nameToCheck key of the state field, e.g. "finalAllocationState"
+  * @param realState the expected value of that field
+  * @throws Exception if the key cannot be read from the JSON
+  */
+ private void verifyStateOfAllocations(JSONObject allocation,
+     String nameToCheck, String realState) throws Exception {
+   // JUnit takes (message, expected, actual); passing them in that order
+   // keeps the "expected ... but was ..." failure text truthful.
+   assertEquals("State of allocation is wrong", realState,
+       allocation.get(nameToCheck));
+ }
+
+ /**
+  * Asserts how many allocations a response contains. The "allocations"
+  * entry may be absent (counted as 0), a single JSON object (counted as 1)
+  * or a JSON array (counted by its length).
+  *
+  * @param json the response body
+  * @param realValue the expected number of allocations
+  * @throws Exception if the JSON cannot be read
+  */
+ private void verifyNumberOfAllocations(JSONObject json, int realValue)
+     throws Exception {
+   int allocationCount;
+   if (json.isNull("allocations")) {
+     allocationCount = 0;
+   } else {
+     Object allocations = json.get("allocations");
+     if (allocations.getClass() == JSONObject.class) {
+       allocationCount = 1;
+     } else if (allocations.getClass() == JSONArray.class) {
+       allocationCount = ((JSONArray) allocations).length();
+     } else {
+       // Previously an unexpected value type made the check pass silently.
+       throw new AssertionError("Unexpected type of \"allocations\": "
+           + allocations.getClass().getName());
+     }
+   }
+   // JUnit takes (message, expected, actual).
+   assertEquals("Number of allocations is wrong", realValue,
+       allocationCount);
+ }
+
+ /**
+  * Asserts the order in which queues appear in the activity tree of an
+  * allocation. The order is the name of the "root" node followed by the
+  * names collected by {@code getQueueOrder}, joined with '-'.
+  *
+  * @param json JSON object of one allocation
+  * @param realOrder the expected order, e.g. "root-a-b-b1"
+  * @throws Exception if the JSON cannot be read
+  */
+ private void verifyQueueOrder(JSONObject json, String realOrder)
+     throws Exception {
+   String order = "";
+   if (!json.isNull("root")) {
+     JSONObject root = json.getJSONObject("root");
+     order = root.getString("name") + "-" + getQueueOrder(root);
+   }
+   // Every collected name is followed by '-'; drop the trailing one. The
+   // guard keeps a missing "root" from throwing
+   // StringIndexOutOfBoundsException, so it is reported as an ordinary
+   // assertion failure instead.
+   if (order.endsWith("-")) {
+     order = order.substring(0, order.length() - 1);
+   }
+   // JUnit takes (message, expected, actual).
+   assertEquals("Order of queue is wrong", realOrder, order);
+ }
+
+ /**
+  * Collects the names found below the given node, each followed by '-', by
+  * descending through "children" entries.
+  *
+  * The walk ends as soon as a child with a non-null "appPriority" field is
+  * met: the call then returns an empty string, which in the array case also
+  * drops any names gathered from earlier siblings of that array.
+  *
+  * @param node the node whose descendants are visited
+  * @return the collected names, or an empty string when there is nothing
+  *     to collect
+  * @throws Exception if the JSON cannot be read
+  */
+ private String getQueueOrder(JSONObject node) throws Exception {
+   if (node.isNull("children")) {
+     return "";
+   }
+
+   Object children = node.get("children");
+
+   if (children.getClass() == JSONObject.class) {
+     // Exactly one child.
+     JSONObject onlyChild = (JSONObject) children;
+     if (!onlyChild.isNull("appPriority")) {
+       return "";
+     }
+     return onlyChild.getString("name") + "-" + getQueueOrder(onlyChild);
+   }
+
+   if (children.getClass() == JSONArray.class) {
+     // Several children: visit them in array order.
+     JSONArray childList = (JSONArray) children;
+     StringBuilder names = new StringBuilder();
+     for (int idx = 0; idx < childList.length(); idx++) {
+       JSONObject child = (JSONObject) childList.get(idx);
+       if (!child.isNull("appPriority")) {
+         return "";
+       }
+       names.append(child.getString("name")).append("-")
+           .append(getQueueOrder(child));
+     }
+     return names.toString();
+   }
+
+   return "";
+ }
+
+ /**
+  * Asserts how many allocation attempts an allocation contains. The
+  * "allocationAttempt" entry may be absent (counted as 0), a single JSON
+  * object (counted as 1) or a JSON array (counted by its length).
+  *
+  * @param allocation JSON object of one allocation
+  * @param realValue the expected number of allocation attempts
+  * @throws Exception if the JSON cannot be read
+  */
+ private void verifyNumberOfAllocationAttempts(JSONObject allocation,
+     int realValue) throws Exception {
+   int attemptCount;
+   if (allocation.isNull("allocationAttempt")) {
+     attemptCount = 0;
+   } else {
+     Object attempts = allocation.get("allocationAttempt");
+     if (attempts.getClass() == JSONObject.class) {
+       attemptCount = 1;
+     } else if (attempts.getClass() == JSONArray.class) {
+       attemptCount = ((JSONArray) attempts).length();
+     } else {
+       // Previously an unexpected value type made the check pass silently.
+       throw new AssertionError("Unexpected type of \"allocationAttempt\": "
+           + attempts.getClass().getName());
+     }
+   }
+   // JUnit takes (message, expected, actual).
+   assertEquals("Number of allocation attempts is wrong", realValue,
+       attemptCount);
+ }
+
+ /**
+  * Submits one app to queue b1, heartbeats the only node and checks the
+  * app-level activities JSON: one allocation in state ACCEPTED with one
+  * allocation attempt.
+  */
+ @Test
+ public void testAppActivityJSON() throws Exception {
+   // The RM must be running before apps can be submitted.
+   rm.start();
+
+   MockNM node = new MockNM("127.0.0.1:1234", 24 * 1024,
+       rm.getResourceTrackerService());
+   node.registerNode();
+
+   try {
+     RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+
+     MultivaluedMapImpl query = new MultivaluedMapImpl();
+     query.add("appId", app1.getApplicationId().toString());
+     WebResource appActivities = resource().path("ws").path("v1")
+         .path("cluster").path("scheduler/app-activities")
+         .queryParams(query);
+
+     // Query once before the heartbeat; the payload is read and discarded.
+     ClientResponse reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     node.nodeHeartbeat(true);
+     Thread.sleep(5000);
+
+     // Query again and verify what was reported.
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     JSONObject json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 1);
+
+     JSONObject allocations = json.getJSONObject("allocations");
+     verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
+     verifyNumberOfAllocationAttempts(allocations, 1);
+   } finally {
+     rm.stop();
+   }
+ }
+
+ /**
+  * Requests ten containers for one app on a 24 * 1024 node and expects the
+  * app-level activities to list ten allocations after a single heartbeat,
+  * each in state ACCEPTED.
+  */
+ @Test
+ public void testAppAssignMultipleContainersPerNodeHeartbeat()
+     throws Exception {
+   // The RM must be running before apps can be submitted.
+   rm.start();
+
+   MockNM node = new MockNM("127.0.0.1:1234", 24 * 1024,
+       rm.getResourceTrackerService());
+   node.registerNode();
+
+   try {
+     RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, node);
+
+     // Ask for ten 1024-sized containers at node, rack and any-host level.
+     ResourceRequest onNode = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "127.0.0.1", Resources.createResource(1024), 10);
+     ResourceRequest onRack = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "/default-rack", Resources.createResource(1024),
+         10);
+     ResourceRequest anywhere = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "*", Resources.createResource(1024), 10);
+     am1.allocate(Arrays.asList(onNode, onRack, anywhere), null);
+
+     MultivaluedMapImpl query = new MultivaluedMapImpl();
+     query.add("appId", app1.getApplicationId().toString());
+     WebResource appActivities = resource().path("ws").path("v1")
+         .path("cluster").path("scheduler/app-activities")
+         .queryParams(query);
+
+     // Query once before the heartbeat; the payload is read and discarded.
+     ClientResponse reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     node.nodeHeartbeat(true);
+     Thread.sleep(5000);
+
+     // Query again and verify what was reported.
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     JSONObject json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 10);
+
+     JSONArray allocations = json.getJSONArray("allocations");
+     for (int idx = 0; idx < allocations.length(); idx++) {
+       JSONObject allocation = allocations.getJSONObject(idx);
+       verifyStateOfAllocations(allocation, "allocationState", "ACCEPTED");
+     }
+   } finally {
+     rm.stop();
+   }
+ }
+
+ /**
+  * Requests containers on a node registered with only 1 * 1024, on which
+  * the app's 1024-sized AM has already been launched, and expects the
+  * app-level activities to report no allocations after a heartbeat.
+  */
+ @Test
+ public void testAppAssignWithoutAvailableResource() throws Exception {
+   // The RM must be running before apps can be submitted.
+   rm.start();
+
+   MockNM node = new MockNM("127.0.0.1:1234", 1 * 1024,
+       rm.getResourceTrackerService());
+   node.registerNode();
+
+   try {
+     RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, node);
+
+     // Ask for ten 1024-sized containers at node, rack and any-host level.
+     ResourceRequest onNode = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "127.0.0.1", Resources.createResource(1024), 10);
+     ResourceRequest onRack = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "/default-rack", Resources.createResource(1024),
+         10);
+     ResourceRequest anywhere = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "*", Resources.createResource(1024), 10);
+     am1.allocate(Arrays.asList(onNode, onRack, anywhere), null);
+
+     MultivaluedMapImpl query = new MultivaluedMapImpl();
+     query.add("appId", app1.getApplicationId().toString());
+     WebResource appActivities = resource().path("ws").path("v1")
+         .path("cluster").path("scheduler/app-activities")
+         .queryParams(query);
+
+     // Query once before the heartbeat; the payload is read and discarded.
+     ClientResponse reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     node.nodeHeartbeat(true);
+     Thread.sleep(5000);
+
+     // Query again and verify what was reported.
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     JSONObject json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 0);
+   } finally {
+     rm.stop();
+   }
+ }
+
+ /**
+  * Submits an app while no NodeManager is registered and expects the
+  * app-level activities to report no allocations.
+  */
+ @Test
+ public void testAppNoNM() throws Exception {
+   // The RM must be running before apps can be submitted.
+   rm.start();
+
+   try {
+     RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+
+     MultivaluedMapImpl query = new MultivaluedMapImpl();
+     query.add("appId", app1.getApplicationId().toString());
+     WebResource appActivities = resource().path("ws").path("v1")
+         .path("cluster").path("scheduler/app-activities")
+         .queryParams(query);
+
+     // First query: only the media type is checked, the payload is read
+     // and discarded.
+     ClientResponse reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     // Second query, issued right away: this payload gets verified.
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     JSONObject json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 0);
+   } finally {
+     rm.stop();
+   }
+ }
+
+ /**
+  * App-level counterpart of the reservation scenario: heartbeats node
+  * 127.0.0.2 three times (the last time after app2's container has been
+  * completed) and expects app1's activities to list one, two and then
+  * three allocations.
+  */
+ @Test
+ public void testAppReserveNewContainer() throws Exception {
+   // The RM must be running before apps can be submitted.
+   rm.start();
+
+   MockNM firstNode = new MockNM("127.0.0.1:1234", 4 * 1024,
+       rm.getResourceTrackerService());
+   MockNM secondNode = new MockNM("127.0.0.2:1234", 4 * 1024,
+       rm.getResourceTrackerService());
+
+   firstNode.registerNode();
+   secondNode.registerNode();
+
+   try {
+     RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, firstNode);
+
+     RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
+     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, secondNode);
+
+     // app1 requests containers of size 4096, i.e. as large as the
+     // 4 * 1024 each node was registered with.
+     ResourceRequest anywhere = ResourceRequest.newInstance(
+         Priority.UNDEFINED, "*", Resources.createResource(4096), 10);
+     am1.allocate(Arrays.asList(anywhere), null);
+
+     // Every phase below queries the activities of app1.
+     MultivaluedMapImpl query = new MultivaluedMapImpl();
+     query.add("appId", app1.getApplicationId().toString());
+     WebResource appActivities = resource().path("ws").path("v1")
+         .path("cluster").path("scheduler/app-activities")
+         .queryParams(query);
+
+     // Phase 1: reserve new container.
+     ClientResponse reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     secondNode.nodeHeartbeat(true);
+     Thread.sleep(1000);
+
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     JSONObject json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 1);
+
+     // Phase 2: heartbeat again without releasing the container of app2.
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     secondNode.nodeHeartbeat(true);
+     Thread.sleep(1000);
+
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 2);
+
+     // Finish application 2 by completing container 1 of its attempt.
+     CapacityScheduler scheduler =
+         (CapacityScheduler) rm.getResourceScheduler();
+     ContainerId app2Container = ContainerId.newContainerId(
+         am2.getApplicationAttemptId(), 1);
+     ContainerStatus completed = ContainerStatus.newInstance(
+         app2Container, ContainerState.COMPLETE, "", 0);
+     scheduler.completedContainer(scheduler.getRMContainer(app2Container),
+         completed, RMContainerEventType.FINISHED);
+
+     // Phase 3: heartbeat once more, now that the container is gone.
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     reply.getEntity(JSONObject.class);
+
+     secondNode.nodeHeartbeat(true);
+     Thread.sleep(1000);
+
+     reply = appActivities.accept(MediaType.APPLICATION_JSON)
+         .get(ClientResponse.class);
+     assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
+     json = reply.getEntity(JSONObject.class);
+
+     verifyNumberOfAllocations(json, 3);
+   } finally {
+     rm.stop();
+   }
+ }
+
+}