YARN-4091. Add REST API to retrieve scheduler activity. (Chen Ge and Sunil G via wangda)
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 7c19c5e..a5c0f71 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -519,6 +519,7 @@
     <Or>
       <Field name="rmContext" />
       <Field name="applications" />
+      <Field name="activitiesManager" />
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
@@ -552,4 +553,15 @@
     <Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
     <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivityNodeInfo"/>
+    <Or>
+      <Field name="allocationState" />
+      <Field name="diagnostic" />
+      <Field name="name" />
+      <Field name="requestPriority" />
+      <Field name="appPriority" />
+    </Or>
+    <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
+  </Match>
 </FindBugsFilter>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 64eb777..755defd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -71,6 +71,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@@ -97,6 +98,8 @@
   
   private volatile Priority maxClusterLevelAppPriority;
 
+  protected ActivitiesManager activitiesManager;
+
   /*
    * All schedulers which are inheriting AbstractYarnScheduler should use
    * concurrent version of 'applications' map.
@@ -789,4 +792,9 @@
     }
     return schedulerChangeRequests;
   }
+
+  public ActivitiesManager getActivitiesManager() {
+    return this.activitiesManager;
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
new file mode 100644
index 0000000..8fa1bb5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Utility for logging scheduler activities
+ */
+public class ActivitiesLogger {
+  private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
+
+  /**
+   * Methods for recording activities from an app
+   */
+  public static class APP {
+
+    /*
+     * Record skipped application activity when no container allocated /
+     * reserved / re-reserved. Scheduler will look at following applications
+     * within the same leaf queue.
+     */
+    public static void recordSkippedAppActivityWithoutAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Priority priority,
+        String diagnostic) {
+      recordAppActivityWithoutAllocation(activitiesManager, node, application,
+          priority, diagnostic, ActivityState.SKIPPED);
+    }
+
+    /*
+     * Record application activity when rejected because of queue maximum
+     * capacity or user limit.
+     */
+    public static void recordRejectedAppActivityFromLeafQueue(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Priority priority,
+        String diagnostic) {
+      String type = "app";
+      recordActivity(activitiesManager, node, application.getQueueName(),
+          application.getApplicationId().toString(), priority,
+          ActivityState.REJECTED, diagnostic, type);
+      finishSkippedAppAllocationRecording(activitiesManager,
+          application.getApplicationId(), ActivityState.REJECTED, diagnostic);
+    }
+
+    /*
+     * Record application activity when no container allocated /
+     * reserved / re-reserved. Scheduler will look at following applications
+     * within the same leaf queue.
+     */
+    public static void recordAppActivityWithoutAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Priority priority,
+        String diagnostic, ActivityState appState) {
+      if (activitiesManager == null) {
+        return;
+      }
+      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+        String type = "container";
+        // Add application-container activity into specific node allocation.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getApplicationId().toString(), null,
+            priority.toString(), ActivityState.SKIPPED, diagnostic, type);
+        type = "app";
+        // Add queue-application activity into specific node allocation.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getQueueName(),
+            application.getApplicationId().toString(),
+            application.getPriority().toString(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY, type);
+      }
+      // Add application-container activity into specific application allocation
+      // Under this condition, it fails to allocate a container to this
+      // application, so containerId is null.
+      if (activitiesManager.shouldRecordThisApp(
+          application.getApplicationId())) {
+        String type = "container";
+        activitiesManager.addSchedulingActivityForApp(
+            application.getApplicationId(), null, priority.toString(), appState,
+            diagnostic, type);
+      }
+    }
+
+    /*
+     * Record application activity when container allocated / reserved /
+     * re-reserved
+     */
+    public static void recordAppActivityWithAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Container updatedContainer,
+        ActivityState activityState) {
+      if (activitiesManager == null) {
+        return;
+      }
+      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+        String type = "container";
+        // Add application-container activity into specific node allocation.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getApplicationId().toString(),
+            updatedContainer.getId().toString(),
+            updatedContainer.getPriority().toString(), activityState,
+            ActivityDiagnosticConstant.EMPTY, type);
+        type = "app";
+        // Add queue-application activity into specific node allocation.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getQueueName(),
+            application.getApplicationId().toString(),
+            application.getPriority().toString(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY, type);
+      }
+      // Add application-container activity into specific application allocation
+      if (activitiesManager.shouldRecordThisApp(
+          application.getApplicationId())) {
+        String type = "container";
+        activitiesManager.addSchedulingActivityForApp(
+            application.getApplicationId(), updatedContainer.getId().toString(),
+            updatedContainer.getPriority().toString(), activityState,
+            ActivityDiagnosticConstant.EMPTY, type);
+      }
+    }
+
+    /*
+     * Invoked when scheduler starts to look at this application within one node
+     * update.
+     */
+    public static void startAppAllocationRecording(
+        ActivitiesManager activitiesManager, NodeId nodeId, long currentTime,
+        SchedulerApplicationAttempt application) {
+      if (activitiesManager == null) {
+        return;
+      }
+      activitiesManager.startAppAllocationRecording(nodeId, currentTime,
+          application);
+    }
+
+    /*
+     * Invoked when scheduler finishes looking at this application within one
+     * node update, and the app has any container allocated/reserved during
+     * this allocation.
+     */
+    public static void finishAllocatedAppAllocationRecording(
+        ActivitiesManager activitiesManager, ApplicationId applicationId,
+        ContainerId containerId, ActivityState containerState,
+        String diagnostic) {
+      if (activitiesManager == null) {
+        return;
+      }
+
+      if (activitiesManager.shouldRecordThisApp(applicationId)) {
+        activitiesManager.finishAppAllocationRecording(applicationId,
+            containerId, containerState, diagnostic);
+      }
+    }
+
+    /*
+     * Invoked when scheduler finishes looking at this application within one
+     * node update, and the app DOESN'T have any container allocated/reserved
+     * during this allocation.
+     */
+    public static void finishSkippedAppAllocationRecording(
+        ActivitiesManager activitiesManager, ApplicationId applicationId,
+        ActivityState containerState, String diagnostic) {
+      finishAllocatedAppAllocationRecording(activitiesManager, applicationId,
+          null, containerState, diagnostic);
+    }
+  }
+
+  /**
+   * Methods for recording activities from a queue
+   */
+  public static class QUEUE {
+    /*
+     * Record activities of a queue
+     */
+    public static void recordQueueActivity(ActivitiesManager activitiesManager,
+        SchedulerNode node, String parentQueueName, String queueName,
+        ActivityState state, String diagnostic) {
+      recordActivity(activitiesManager, node, parentQueueName, queueName, null,
+          state, diagnostic, null);
+    }
+  }
+
+  /**
+   * Methods for recording overall activities from one node update
+   */
+  public static class NODE {
+
+    /*
+     * Invoked when node allocation finishes, and there's NO container
+     * allocated or reserved during the allocation
+     */
+    public static void finishSkippedNodeAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node) {
+      finishAllocatedNodeAllocation(activitiesManager, node, null,
+          AllocationState.SKIPPED);
+    }
+
+    /*
+     * Invoked when node allocation finishes, and there's any container
+     * allocated or reserved during the allocation
+     */
+    public static void finishAllocatedNodeAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        ContainerId containerId, AllocationState containerState) {
+      if (activitiesManager == null) {
+        return;
+      }
+      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+        activitiesManager.updateAllocationFinalState(node.getNodeID(),
+            containerId, containerState);
+      }
+    }
+
+    /*
+     * Invoked when node heartbeat finishes
+     */
+    public static void finishNodeUpdateRecording(
+        ActivitiesManager activitiesManager, NodeId nodeID) {
+      if (activitiesManager == null) {
+        return;
+      }
+      activitiesManager.finishNodeUpdateRecording(nodeID);
+    }
+
+    /*
+     * Invoked when node heartbeat starts
+     */
+    public static void startNodeUpdateRecording(
+        ActivitiesManager activitiesManager, NodeId nodeID) {
+      if (activitiesManager == null) {
+        return;
+      }
+      activitiesManager.startNodeUpdateRecording(nodeID);
+    }
+  }
+
+  // Add queue, application or container activity into specific node allocation.
+  private static void recordActivity(ActivitiesManager activitiesManager,
+      SchedulerNode node, String parentName, String childName,
+      Priority priority, ActivityState state, String diagnostic, String type) {
+    if (activitiesManager == null) {
+      return;
+    }
+    if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+      activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+          parentName, childName, priority != null ? priority.toString() : null,
+          state, diagnostic, type);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
new file mode 100644
index 0000000..4fa5feb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+import java.util.Set;
+import java.util.*;
+import java.util.ArrayList;
+
+/**
+ * A class to store node or application allocations.
+ * It mainly contains operations for allocation start, add, update and finish.
+ */
+public class ActivitiesManager extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(ActivitiesManager.class);
+  private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
+  private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
+  private Set<NodeId> activeRecordedNodes;
+  private ConcurrentMap<ApplicationId, Long>
+      recordingAppActivitiesUntilSpecifiedTime;
+  private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
+  private ConcurrentMap<ApplicationId, List<AppAllocation>>
+      completedAppAllocations;
+  private boolean recordNextAvailableNode = false;
+  private List<NodeAllocation> lastAvailableNodeActivities = null;
+  private Thread cleanUpThread;
+  private int timeThreshold = 600 * 1000;
+  private final RMContext rmContext;
+
+  public ActivitiesManager(RMContext rmContext) {
+    super(ActivitiesManager.class.getName());
+    recordingNodesAllocation = new ConcurrentHashMap<>();
+    completedNodeAllocations = new ConcurrentHashMap<>();
+    appsAllocation = new ConcurrentHashMap<>();
+    completedAppAllocations = new ConcurrentHashMap<>();
+    activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
+    this.rmContext = rmContext;
+  }
+
+  public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
+    if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus()
+        == FinalApplicationStatus.UNDEFINED) {
+      List<AppAllocation> allocations = completedAppAllocations.get(
+          applicationId);
+
+      return new AppActivitiesInfo(allocations, applicationId);
+    } else {
+      return new AppActivitiesInfo(
+          "fail to get application activities after finished",
+          applicationId.toString());
+    }
+  }
+
+  public ActivitiesInfo getActivitiesInfo(String nodeId) {
+    List<NodeAllocation> allocations;
+    if (nodeId == null) {
+      allocations = lastAvailableNodeActivities;
+    } else {
+      allocations = completedNodeAllocations.get(NodeId.fromString(nodeId));
+    }
+    return new ActivitiesInfo(allocations, nodeId);
+  }
+
+  public void recordNextNodeUpdateActivities(String nodeId) {
+    if (nodeId == null) {
+      recordNextAvailableNode = true;
+    } else {
+      activeRecordedNodes.add(NodeId.fromString(nodeId));
+    }
+  }
+
+  public void turnOnAppActivitiesRecording(ApplicationId applicationId,
+      double maxTime) {
+    long startTS = SystemClock.getInstance().getTime();
+    long endTS = startTS + (long) (maxTime * 1000);
+    recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    cleanUpThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
+              completedNodeAllocations.entrySet().iterator();
+          while (ite.hasNext()) {
+            Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation = ite.next();
+            List<NodeAllocation> allocations = nodeAllocation.getValue();
+            long currTS = SystemClock.getInstance().getTime();
+            if (allocations.size() > 0 && allocations.get(0).getTimeStamp()
+                - currTS > timeThreshold) {
+              ite.remove();
+            }
+          }
+
+          Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
+              completedAppAllocations.entrySet().iterator();
+          while (iteApp.hasNext()) {
+            Map.Entry<ApplicationId, List<AppAllocation>> appAllocation =
+                iteApp.next();
+            if (rmContext.getRMApps().get(appAllocation.getKey())
+                .getFinalApplicationStatus()
+                != FinalApplicationStatus.UNDEFINED) {
+              iteApp.remove();
+            }
+          }
+
+          try {
+            Thread.sleep(5000);
+          } catch (Exception e) {
+            // ignore
+          }
+        }
+      }
+    });
+
+    cleanUpThread.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    cleanUpThread.interrupt();
+    super.serviceStop();
+  }
+
+  void startNodeUpdateRecording(NodeId nodeID) {
+    if (recordNextAvailableNode) {
+      recordNextNodeUpdateActivities(nodeID.toString());
+    }
+    if (activeRecordedNodes.contains(nodeID)) {
+      List<NodeAllocation> nodeAllocation = new ArrayList<>();
+      recordingNodesAllocation.put(nodeID, nodeAllocation);
+    }
+  }
+
+  void startAppAllocationRecording(NodeId nodeID, long currTS,
+      SchedulerApplicationAttempt application) {
+    ApplicationId applicationId = application.getApplicationId();
+
+    if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
+        && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
+        > currTS) {
+      appsAllocation.put(applicationId,
+          new AppAllocation(application.getPriority(), nodeID,
+              application.getQueueName()));
+    }
+
+    if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
+        && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
+        <= currTS) {
+      turnOffActivityMonitoringForApp(applicationId);
+    }
+  }
+
+  // Add queue, application or container activity into specific node allocation.
+  void addSchedulingActivityForNode(NodeId nodeID, String parentName,
+      String childName, String priority, ActivityState state, String diagnostic,
+      String type) {
+    if (shouldRecordThisNode(nodeID)) {
+      NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
+      nodeAllocation.addAllocationActivity(parentName, childName, priority,
+          state, diagnostic, type);
+    }
+  }
+
+  // Add queue, application or container activity into specific application
+  // allocation.
+  void addSchedulingActivityForApp(ApplicationId applicationId,
+      String containerId, String priority, ActivityState state,
+      String diagnostic, String type) {
+    if (shouldRecordThisApp(applicationId)) {
+      AppAllocation appAllocation = appsAllocation.get(applicationId);
+      appAllocation.addAppAllocationActivity(containerId, priority, state,
+          diagnostic, type);
+    }
+  }
+
+  // Update container allocation meta status for this node allocation.
+  // It updates general container status but not the detailed activity state
+  // in updateActivityState.
+  void updateAllocationFinalState(NodeId nodeID, ContainerId containerId,
+      AllocationState containerState) {
+    if (shouldRecordThisNode(nodeID)) {
+      NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
+      nodeAllocation.updateContainerState(containerId, containerState);
+    }
+  }
+
+  void finishAppAllocationRecording(ApplicationId applicationId,
+      ContainerId containerId, ActivityState appState, String diagnostic) {
+    if (shouldRecordThisApp(applicationId)) {
+      long currTS = SystemClock.getInstance().getTime();
+      AppAllocation appAllocation = appsAllocation.remove(applicationId);
+      appAllocation.updateAppContainerStateAndTime(containerId, appState,
+          currTS, diagnostic);
+
+      List<AppAllocation> appAllocations;
+      if (completedAppAllocations.containsKey(applicationId)) {
+        appAllocations = completedAppAllocations.get(applicationId);
+      } else {
+        appAllocations = new ArrayList<>();
+        completedAppAllocations.put(applicationId, appAllocations);
+      }
+      if (appAllocations.size() == 1000) {
+        appAllocations.remove(0);
+      }
+      appAllocations.add(appAllocation);
+
+      if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
+          <= currTS) {
+        turnOffActivityMonitoringForApp(applicationId);
+      }
+    }
+  }
+
+  void finishNodeUpdateRecording(NodeId nodeID) {
+    List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
+    long timeStamp = SystemClock.getInstance().getTime();
+
+    if (value != null) {
+      if (value.size() > 0) {
+        lastAvailableNodeActivities = value;
+        for (NodeAllocation allocation : lastAvailableNodeActivities) {
+          allocation.transformToTree();
+          allocation.setTimeStamp(timeStamp);
+        }
+        if (recordNextAvailableNode) {
+          recordNextAvailableNode = false;
+        }
+      }
+
+      if (shouldRecordThisNode(nodeID)) {
+        recordingNodesAllocation.remove(nodeID);
+        completedNodeAllocations.put(nodeID, value);
+        stopRecordNodeUpdateActivities(nodeID);
+      }
+    }
+  }
+
+  boolean shouldRecordThisApp(ApplicationId applicationId) {
+    return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
+        && appsAllocation.containsKey(applicationId);
+  }
+
+  boolean shouldRecordThisNode(NodeId nodeID) {
+    return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
+        .containsKey(nodeID);
+  }
+
+  private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
+    List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
+    NodeAllocation nodeAllocation;
+    // When this node has already stored allocation activities, get the
+    // last allocation for this node.
+    if (nodeAllocations.size() != 0) {
+      nodeAllocation = nodeAllocations.get(nodeAllocations.size() - 1);
+      // When final state in last allocation is not DEFAULT, it means
+      // last allocation has finished. Create a new allocation for this node,
+      // and add it to the allocation list. Return this new allocation.
+      //
+      // When final state in last allocation is DEFAULT,
+      // it means last allocation has not finished. Just get last allocation.
+      if (nodeAllocation.getFinalAllocationState() != AllocationState.DEFAULT) {
+        nodeAllocation = new NodeAllocation(nodeID);
+        nodeAllocations.add(nodeAllocation);
+      }
+    }
+    // When this node has not stored allocation activities,
+    // create a new allocation for this node, and add it to the allocation list.
+    // Return this new allocation.
+    else {
+      nodeAllocation = new NodeAllocation(nodeID);
+      nodeAllocations.add(nodeAllocation);
+    }
+    return nodeAllocation;
+  }
+
+  private void stopRecordNodeUpdateActivities(NodeId nodeId) {
+    activeRecordedNodes.remove(nodeId);
+  }
+
+  private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
+    recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
new file mode 100644
index 0000000..fc4738e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/*
+ * Collection of diagnostics.
+ */
+public class ActivityDiagnosticConstant {
+  // EMPTY means it does not have any diagnostic to display.
+  // In order not to show "diagnostic" line in frontend,
+  // we set the value to null.
+  public final static String EMPTY = null;
+  public final static String NOT_ABLE_TO_ACCESS_PARTITION =
+      "Not able to access partition";
+  public final static String QUEUE_DO_NOT_NEED_MORE_RESOURCE =
+      "Queue does not need more resource";
+  public final static String QUEUE_MAX_CAPACITY_LIMIT =
+      "Hit queue max-capacity limit";
+  public final static String USER_CAPACITY_MAXIMUM_LIMIT =
+      "Hit user capacity maximum limit";
+  public final static String SKIP_BLACK_LISTED_NODE = "Skip black listed node";
+  public final static String PRIORITY_SKIPPED = "Priority skipped";
+  public final static String PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST =
+      "Priority skipped because off-switch request is null";
+  public final static String
+      PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST =
+      "Priority skipped because partition of node doesn't match request";
+  public final static String SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY =
+      "Priority skipped because of relax locality is not allowed";
+  public final static String SKIP_IN_IGNORE_EXCLUSIVITY_MODE =
+      "Skipping assigning to Node in Ignore Exclusivity mode";
+  public final static String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS =
+      "Doesn't need containers based on reservation algo!";
+  public final static String QUEUE_SKIPPED_HEADROOM =
+      "Queue skipped because of headroom";
+  public final static String NON_PARTITIONED_PARTITION_FIRST =
+      "Non-partitioned resource request should be scheduled to "
+          + "non-partitioned partition first";
+  public final static String SKIP_NODE_LOCAL_REQUEST =
+      "Skip node-local request";
+  public final static String SKIP_RACK_LOCAL_REQUEST =
+      "Skip rack-local request";
+  public final static String SKIP_OFF_SWITCH_REQUEST =
+      "Skip offswitch request";
+  public final static String REQUEST_CAN_NOT_ACCESS_NODE_LABEL =
+      "Resource request can not access the label";
+  public final static String NOT_SUFFICIENT_RESOURCE =
+      "Node does not have sufficient resource for request";
+  public final static String LOCALITY_SKIPPED = "Locality skipped";
+  public final static String FAIL_TO_ALLOCATE = "Fail to allocate";
+  public final static String COULD_NOT_GET_CONTAINER =
+      "Couldn't get container for allocation";
+  public final static String APPLICATION_DO_NOT_NEED_RESOURCE =
+      "Application does not need more resource";
+  public final static String APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE =
+      "Application priority does not need more resource";
+  public final static String SKIPPED_ALL_PRIORITIES =
+      "All priorities are skipped of the app";
+  public final static String RESPECT_FIFO = "To respect FIFO of applications, "
+      + "skipped following applications in the queue";
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
new file mode 100644
index 0000000..a03814c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/*
+ * It represents tree node in "NodeAllocation" tree structure.
+ * Each node may represent queue, application or container in allocation activity.
+ * Node may have children node if successfully allocated to next level.
+ */
+public class ActivityNode {
+  private String activityNodeName;
+  private String parentName;
+  private String appPriority;
+  private String requestPriority;
+  private ActivityState state;
+  private String diagnostic;
+
+  private List<ActivityNode> childNode;
+
+  public ActivityNode(String activityNodeName, String parentName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    this.activityNodeName = activityNodeName;
+    this.parentName = parentName;
+    if (type != null) {
+      if (type.equals("app")) {
+        this.appPriority = priority;
+      } else if (type.equals("container")) {
+        this.requestPriority = priority;
+      }
+    }
+    this.state = state;
+    this.diagnostic = diagnostic;
+    this.childNode = new LinkedList<>();
+  }
+
+  public String getName() {
+    return this.activityNodeName;
+  }
+
+  public String getParentName() {
+    return this.parentName;
+  }
+
+  public void addChild(ActivityNode node) {
+    childNode.add(0, node);
+  }
+
+  public List<ActivityNode> getChildren() {
+    return this.childNode;
+  }
+
+  public ActivityState getState() {
+    return this.state;
+  }
+
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+
+  public String getAppPriority() {
+    return appPriority;
+  }
+
+  public String getRequestPriority() {
+    return requestPriority;
+  }
+
+  public boolean getType() {
+    if (appPriority != null) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.activityNodeName + " ");
+    sb.append(this.appPriority + " ");
+    sb.append(this.state + " ");
+    if (!this.diagnostic.equals("")) {
+      sb.append(this.diagnostic + "\n");
+    }
+    sb.append("\n");
+    for (ActivityNode child : childNode) {
+      sb.append(child.toString() + "\n");
+    }
+    return sb.toString();
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
new file mode 100644
index 0000000..bce1fc9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/*
+ * Collection of activity operation states.
+ */
+public enum ActivityState {
+  // default state when adding a new activity in node allocation
+  DEFAULT,
+  // container is allocated to sub-queues/applications or this queue/application
+  ACCEPTED,
+  // queue or application voluntarily give up to use the resource OR
+  // nothing allocated
+  SKIPPED,
+  // container could not be allocated to sub-queues or this application
+  REJECTED,
+  ALLOCATED, // successfully allocate a new non-reserved container
+  RESERVED,  // successfully reserve a new container
+  RE_RESERVED  // successfully reserve a new container
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
new file mode 100644
index 0000000..da768e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * It records an activity operation in allocation,
+ * which can be classified as queue, application or container activity.
+ * Other information include state, diagnostic, priority.
+ */
+public class AllocationActivity {
+  private String childName = null;
+  private String parentName = null;
+  private String appPriority = null;
+  private String requestPriority = null;
+  private ActivityState state;
+  private String diagnostic = null;
+
+  private static final Log LOG = LogFactory.getLog(AllocationActivity.class);
+
+  public AllocationActivity(String parentName, String queueName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    this.childName = queueName;
+    this.parentName = parentName;
+    if (type != null) {
+      if (type.equals("app")) {
+        this.appPriority = priority;
+      } else if (type.equals("container")) {
+        this.requestPriority = priority;
+      }
+    }
+    this.state = state;
+    this.diagnostic = diagnostic;
+  }
+
+  public ActivityNode createTreeNode() {
+    if (appPriority != null) {
+      return new ActivityNode(this.childName, this.parentName, this.appPriority,
+          this.state, this.diagnostic, "app");
+    } else if (requestPriority != null) {
+      return new ActivityNode(this.childName, this.parentName,
+          this.requestPriority, this.state, this.diagnostic, "container");
+    } else {
+      return new ActivityNode(this.childName, this.parentName, null, this.state,
+          this.diagnostic, null);
+    }
+  }
+
+  public String getName() {
+    return this.childName;
+  }
+
+  public String getState() {
+    return this.state.toString();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
new file mode 100644
index 0000000..e38cefc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/*
+ * Collection of allocation final states.
+ */
+public enum AllocationState {
+  DEFAULT,
+  // queue or application voluntarily give up to use the resource
+  // OR nothing allocated
+  SKIPPED,
+  // successfully allocate a new non-reserved container
+  ALLOCATED,
+  // successfully allocate a new container from an existing reserved container
+  ALLOCATED_FROM_RESERVED,
+  // successfully reserve a new container
+  RESERVED
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
new file mode 100644
index 0000000..15850c0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * It contains allocation information for one application within a period of
+ * time.
+ * Each application allocation may have several allocation attempts.
+ */
+public class AppAllocation {
+  private Priority priority = null;
+  private NodeId nodeId;
+  private ContainerId containerId = null;
+  private ActivityState appState = null;
+  private String diagnostic = null;
+  private String queueName = null;
+  private List<ActivityNode> allocationAttempts;
+  private long timestamp;
+
+  public AppAllocation(Priority priority, NodeId nodeId, String queueName) {
+    this.priority = priority;
+    this.nodeId = nodeId;
+    this.allocationAttempts = new ArrayList<>();
+    this.queueName = queueName;
+  }
+
+  public void updateAppContainerStateAndTime(ContainerId containerId,
+      ActivityState appState, long ts, String diagnostic) {
+    this.timestamp = ts;
+    this.containerId = containerId;
+    this.appState = appState;
+    this.diagnostic = diagnostic;
+  }
+
+  public void addAppAllocationActivity(String containerId, String priority,
+      ActivityState state, String diagnostic, String type) {
+    ActivityNode container = new ActivityNode(containerId, null, priority,
+        state, diagnostic, type);
+    this.allocationAttempts.add(container);
+    if (state == ActivityState.REJECTED) {
+      this.appState = ActivityState.SKIPPED;
+    } else {
+      this.appState = state;
+    }
+  }
+
+  public String getNodeId() {
+    return nodeId.toString();
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
+  public ActivityState getAppState() {
+    return appState;
+  }
+
+  public String getPriority() {
+    if (priority == null) {
+      return null;
+    }
+    return priority.toString();
+  }
+
+  public String getContainerId() {
+    if (containerId == null) {
+      return null;
+    }
+    return containerId.toString();
+  }
+
+  public String getDiagnostic() {
+    return diagnostic;
+  }
+
+  public long getTime() {
+    return this.timestamp;
+  }
+
+  public List<ActivityNode> getAllocationAttempts() {
+    return allocationAttempts;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
new file mode 100644
index 0000000..6911299
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/*
+ * It contains allocation information for one allocation in a node heartbeat.
+ * Detailed allocation activities are first stored in "AllocationActivity"
+ * as operations, then transformed to a tree structure.
+ * Tree structure starts from root queue and ends in leaf queue,
+ * application or container allocation.
+ */
+public class NodeAllocation {
+  private NodeId nodeId;
+  private long timeStamp;
+  private ContainerId containerId = null;
+  private AllocationState containerState = AllocationState.DEFAULT;
+  private List<AllocationActivity> allocationOperations;
+
+  private ActivityNode root = null;
+
+  private static final Log LOG = LogFactory.getLog(NodeAllocation.class);
+
+  public NodeAllocation(NodeId nodeId) {
+    this.nodeId = nodeId;
+    this.allocationOperations = new ArrayList<>();
+  }
+
+  public void addAllocationActivity(String parentName, String childName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    AllocationActivity allocate = new AllocationActivity(parentName, childName,
+        priority, state, diagnostic, type);
+    this.allocationOperations.add(allocate);
+  }
+
+  public void updateContainerState(ContainerId containerId,
+      AllocationState containerState) {
+    this.containerId = containerId;
+    this.containerState = containerState;
+  }
+
+  // In node allocation, transform each activity to a tree-like structure
+  // for frontend activity display.
+  // eg:    root
+  //         / \
+  //        a   b
+  //       / \
+  //    app1 app2
+  //    / \
+  //  CA1 CA2
+  // CA means Container Attempt
+  public void transformToTree() {
+    List<ActivityNode> allocationTree = new ArrayList<>();
+
+    if (root == null) {
+      Set<String> names = Collections.newSetFromMap(new ConcurrentHashMap<>());
+      ListIterator<AllocationActivity> ite = allocationOperations.listIterator(
+          allocationOperations.size());
+      while (ite.hasPrevious()) {
+        String name = ite.previous().getName();
+        if (name != null) {
+          if (!names.contains(name)) {
+            names.add(name);
+          } else {
+            ite.remove();
+          }
+        }
+      }
+
+      for (AllocationActivity allocationOperation : allocationOperations) {
+        ActivityNode node = allocationOperation.createTreeNode();
+        String name = node.getName();
+        for (int i = allocationTree.size() - 1; i > -1; i--) {
+          if (allocationTree.get(i).getParentName().equals(name)) {
+            node.addChild(allocationTree.get(i));
+            allocationTree.remove(i);
+          } else {
+            break;
+          }
+        }
+        allocationTree.add(node);
+      }
+      root = allocationTree.get(0);
+    }
+  }
+
+  public void setTimeStamp(long timeStamp) {
+    this.timeStamp = timeStamp;
+  }
+
+  public long getTimeStamp() {
+    return this.timeStamp;
+  }
+
+  public AllocationState getFinalAllocationState() {
+    return containerState;
+  }
+
+  public String getContainerId() {
+    if (containerId == null)
+      return null;
+    return containerId.toString();
+  }
+
+  public ActivityNode getRoot() {
+    return root;
+  }
+
+  public String getNodeId() {
+    return nodeId.toString();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9c88154..1d8f929 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -91,12 +92,15 @@
   protected CapacitySchedulerContext csContext;
   protected YarnAuthorizationProvider authorizer = null;
 
-  public AbstractCSQueue(CapacitySchedulerContext cs, 
+  protected ActivitiesManager activitiesManager;
+
+  public AbstractCSQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     this.labelManager = cs.getRMContext().getNodeLabelManager();
     this.parent = parent;
     this.queueName = queueName;
     this.resourceCalculator = cs.getResourceCalculator();
+    this.activitiesManager = cs.getActivitiesManager();
     
     // must be called after parent and queueName is set
     this.metrics =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bedf455..f03a0df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -92,22 +92,12 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
@@ -307,6 +297,8 @@
     this.applications = new ConcurrentHashMap<>();
     this.labelManager = rmContext.getNodeLabelManager();
     authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
+    this.activitiesManager = new ActivitiesManager(rmContext);
+    activitiesManager.init(conf);
     initializeQueues(this.conf);
     this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
 
@@ -344,6 +336,7 @@
   @Override
   public void serviceStart() throws Exception {
     startSchedulerThreads();
+    activitiesManager.start();
     super.serviceStart();
   }
 
@@ -523,7 +516,7 @@
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot = 
         parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
-            newQueues, queues, noop); 
+            newQueues, queues, noop);
     
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
@@ -650,7 +643,7 @@
         throw new IllegalStateException(
             "Only Leaf Queues can be reservable for " + queueName);
       }
-      ParentQueue parentQueue = 
+      ParentQueue parentQueue =
         new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
 
       // Used only for unit tests
@@ -802,7 +795,7 @@
 
     FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
         application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
-            application.getPriority(), isAttemptRecovering);
+            application.getPriority(), isAttemptRecovering, activitiesManager);
     if (transferStateFromPreviousAttempt) {
       attempt.transferStateFromPreviousAttempt(
           application.getCurrentAppAttempt());
@@ -1233,6 +1226,7 @@
 
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
+
       FiCaSchedulerApp reservedApplication =
           getCurrentAttemptForContainer(reservedContainer.getContainerId());
 
@@ -1262,6 +1256,19 @@
         tmp.getAssignmentInformation().incrAllocations();
         updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
         schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(),
+            AllocationState.ALLOCATED_FROM_RESERVED);
+      } else {
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
       }
     }
 
@@ -1371,7 +1378,11 @@
       setLastNodeUpdateTime(Time.now());
       nodeUpdate(node);
       if (!scheduleAsynchronously) {
+        ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+            node.getNodeID());
         allocateContainersToNode(getNode(node.getNodeID()));
+        ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+            node.getNodeID());
       }
     }
     break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index b39b289..c41a7bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -80,4 +81,6 @@
    *         cluster.
    */
   ResourceUsage getClusterResourceUsage();
+
+  ActivitiesManager getActivitiesManager();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a243e93..6bbe85e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -19,15 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -65,9 +57,11 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -75,6 +69,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -135,7 +130,7 @@
     super(cs, queueName, parent, old);
     this.scheduler = cs;
 
-    this.activeUsersManager = new ActiveUsersManager(metrics); 
+    this.activeUsersManager = new ActiveUsersManager(metrics);
 
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
@@ -144,7 +139,7 @@
       LOG.debug("LeafQueue:" + " name=" + queueName
         + ", fullname=" + getQueuePath());
     }
-    
+
     setupQueueConfigs(cs.getClusterResource());
   }
 
@@ -862,7 +857,7 @@
     float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
   }
-  
+
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
@@ -881,6 +876,10 @@
     if (reservedContainer != null) {
       FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
+
+      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
       synchronized (application) {
         CSAssignment assignment =
             application.assignContainers(clusterResource, node,
@@ -895,6 +894,10 @@
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+              .getPartition());
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
@@ -907,6 +910,9 @@
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + node.getPartition());
       }
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
@@ -914,13 +920,23 @@
         orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
       FiCaSchedulerApp application = assignmentIterator.next();
 
+      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
       // Check queue max-capacity limit
       if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
           currentResourceLimits, application.getCurrentReservation(),
           schedulingMode)) {
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node,
+            application, application.getPriority(),
+            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
-      
+
       Resource userLimit =
           computeUserLimitAndSetHeadroom(application, clusterResource,
               node.getPartition(), schedulingMode);
@@ -930,6 +946,10 @@
           application, node.getPartition(), currentResourceLimits)) {
         application.updateAMContainerDiagnostics(AMState.ACTIVATED,
             "User capacity has reached its maximum limit.");
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node,
+            application, application.getPriority(),
+            ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
         continue;
       }
 
@@ -971,10 +991,17 @@
           incReservedResource(node.getPartition(), reservedRes);
         }
 
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY);
+
         // Done
         return assignment;
       } else if (assignment.getSkippedType()
           == CSAssignment.SkippedType.OTHER) {
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         application.updateNodeInfoForAMDiagnostics(node);
       } else if(assignment.getSkippedType()
           == CSAssignment.SkippedType.QUEUE_LIMIT) {
@@ -982,9 +1009,18 @@
       } else {
         // If we don't allocate anything, and it is not skipped by application,
         // we will return to respect FIFO of applications
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.RESPECT_FIFO);
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
     }
+    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+        getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+        ActivityDiagnosticConstant.EMPTY);
 
     return CSAssignment.NULL_ASSIGNMENT;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 9ae35ee..a245e3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -42,12 +42,11 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -81,7 +80,7 @@
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
-  public ParentQueue(CapacitySchedulerContext cs, 
+  public ParentQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
@@ -98,14 +97,14 @@
           "capacity of " + rawCapacity + " for queue " + queueName +
           ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
     }
-    
+
     this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
-    
+
     setupQueueConfigs(cs.getClusterResource());
 
-    LOG.info("Initialized parent-queue " + queueName + 
-        " name=" + queueName + 
-        ", fullname=" + getQueuePath()); 
+    LOG.info("Initialized parent-queue " + queueName +
+        " name=" + queueName +
+        ", fullname=" + getQueuePath());
   }
 
   synchronized void setupQueueConfigs(Resource clusterResource)
@@ -380,6 +379,10 @@
         " #applications: " + getNumApplications());
   }
 
+  private String getParentName() {
+    return getParent() != null ? getParent().getQueueName() : "";
+  }
+
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
@@ -392,6 +395,16 @@
             + ", because it is not able to access partition=" + node
             .getPartition());
       }
+
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParentName(), getQueueName(), ActivityState.REJECTED,
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+              .getPartition());
+      if (rootQueue) {
+        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+            node);
+      }
+
       return CSAssignment.NULL_ASSIGNMENT;
     }
     
@@ -404,6 +417,15 @@
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + node.getPartition());
       }
+
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParentName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+      if (rootQueue) {
+        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+            node);
+      }
+
       return CSAssignment.NULL_ASSIGNMENT;
     }
     
@@ -423,9 +445,18 @@
           resourceLimits, Resources.createResource(
               getMetrics().getReservedMB(), getMetrics()
                   .getReservedVirtualCores()), schedulingMode)) {
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+        if (rootQueue) {
+          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+              node);
+        }
+
         break;
       }
-      
+
       // Schedule
       CSAssignment assignedToChild =
           assignContainersToChildQueues(clusterResource, node, resourceLimits,
@@ -436,6 +467,29 @@
       if (Resources.greaterThan(
               resourceCalculator, clusterResource, 
               assignedToChild.getResource(), Resources.none())) {
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY);
+
+        if (node.getReservedContainer() == null) {
+          if (rootQueue) {
+            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                activitiesManager, node,
+                assignedToChild.getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId(),
+                AllocationState.ALLOCATED);
+          }
+        } else {
+          if (rootQueue) {
+            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                activitiesManager, node,
+                assignedToChild.getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId(),
+                AllocationState.RESERVED);
+          }
+        }
+
         // Track resource utilization for the parent-queue
         allocateResource(clusterResource, assignedToChild.getResource(),
             node.getPartition(), assignedToChild.isIncreasedAllocation());
@@ -474,6 +528,15 @@
 
       } else {
         assignment.setSkippedType(assignedToChild.getSkippedType());
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
+        if (rootQueue) {
+          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+              node);
+        }
+
         break;
       }
 
@@ -631,7 +694,7 @@
             resourceToSubtract);
       }
     }
-    
+
     return assignment;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index 4d5a7dc..fa13df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -24,6 +24,10 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -43,17 +47,25 @@
   FiCaSchedulerApp application;
   final ResourceCalculator rc;
   final RMContext rmContext;
-  
+  ActivitiesManager activitiesManager;
+
   public AbstractContainerAllocator(FiCaSchedulerApp application,
       ResourceCalculator rc, RMContext rmContext) {
+    this(application, rc, rmContext, null);
+  }
+
+  public AbstractContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext,
+      ActivitiesManager activitiesManager) {
     this.application = application;
     this.rc = rc;
     this.rmContext = rmContext;
+    this.activitiesManager = activitiesManager;
   }
 
   protected CSAssignment getCSAssignmentFromAllocateResult(
       Resource clusterResource, ContainerAllocation result,
-      RMContainer rmContainer) {
+      RMContainer rmContainer, FiCaSchedulerNode node) {
     // Handle skipped
     CSAssignment.SkippedType skipped =
         (result.getAllocationState() == AllocationState.APP_SKIPPED) ?
@@ -61,7 +73,7 @@
         CSAssignment.SkippedType.NONE;
     CSAssignment assignment = new CSAssignment(skipped);
     assignment.setApplication(application);
-    
+
     // Handle excess reservation
     assignment.setExcessReservation(result.getContainerToBeUnreserved());
 
@@ -85,6 +97,23 @@
         assignment.getAssignmentInformation().incrReservations();
         Resources.addTo(assignment.getAssignmentInformation().getReserved(),
             allocatedResource);
+
+        if (rmContainer != null) {
+          ActivitiesLogger.APP.recordAppActivityWithAllocation(
+              activitiesManager, node, application, updatedContainer,
+              ActivityState.RE_RESERVED);
+          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+              activitiesManager, application.getApplicationId(),
+              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+        } else {
+          ActivitiesLogger.APP.recordAppActivityWithAllocation(
+              activitiesManager, node, application, updatedContainer,
+              ActivityState.RESERVED);
+          ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
+              activitiesManager, application.getApplicationId(),
+              updatedContainer.getId(), ActivityState.RESERVED,
+              ActivityDiagnosticConstant.EMPTY);
+        }
       } else if (result.getAllocationState() == AllocationState.ALLOCATED){
         // This is a new container
         // Inform the ordering policy
@@ -105,10 +134,18 @@
         assignment.getAssignmentInformation().incrAllocations();
         Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
             allocatedResource);
-        
+
         if (rmContainer != null) {
           assignment.setFulfilledReservation(true);
         }
+
+        ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager,
+            node, application, updatedContainer, ActivityState.ALLOCATED);
+        ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            updatedContainer.getId(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY);
+
       }
 
       assignment.setContainersToKill(result.getToKillContainers());
@@ -118,13 +155,13 @@
             CSAssignment.SkippedType.QUEUE_LIMIT);
       }
     }
-    
+
     return assignment;
   }
-  
+
   /**
    * allocate needs to handle following stuffs:
-   * 
+   *
    * <ul>
    * <li>Select request: Select a request to allocate. E.g. select a resource
    * request based on requirement/priority/locality.</li>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 3be8e0e..4eaa24b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -36,12 +37,17 @@
 
   public ContainerAllocator(FiCaSchedulerApp application,
       ResourceCalculator rc, RMContext rmContext) {
+    this(application, rc, rmContext, null);
+  }
+
+  public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc,
+      RMContext rmContext, ActivitiesManager activitiesManager) {
     super(application, rc, rmContext);
 
     increaseContainerAllocator =
         new IncreaseContainerAllocator(application, rc, rmContext);
-    regularContainerAllocator =
-        new RegularContainerAllocator(application, rc, rmContext);
+    regularContainerAllocator = new RegularContainerAllocator(application, rc,
+        rmContext, activitiesManager);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 29b37d8..21114f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -24,11 +24,13 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
@@ -37,6 +39,12 @@
 
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -57,10 +65,11 @@
   private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
   
   private ResourceRequest lastResourceRequest = null;
-  
+
   public RegularContainerAllocator(FiCaSchedulerApp application,
-      ResourceCalculator rc, RMContext rmContext) {
-    super(application, rc, rmContext);
+      ResourceCalculator rc, RMContext rmContext,
+      ActivitiesManager activitiesManager) {
+    super(application, rc, rmContext, activitiesManager);
   }
   
   private boolean checkHeadroom(Resource clusterResource,
@@ -85,15 +94,23 @@
   private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
+    Priority priority = schedulerKey.getPriority();
+
     if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
       application.updateAppSkipNodeDiagnostics(
           CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
       return ContainerAllocation.APP_SKIPPED;
     }
 
     ResourceRequest anyRequest =
         application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (null == anyRequest) {
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
 
@@ -102,6 +119,9 @@
 
     // Do we need containers at this 'priority'?
     if (application.getTotalRequiredResources(schedulerKey) <= 0) {
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
 
@@ -116,6 +136,9 @@
         }
         application.updateAppSkipNodeDiagnostics(
             "Skipping assigning to Node in Ignore Exclusivity mode. ");
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE);
         return ContainerAllocation.APP_SKIPPED;
       }
     }
@@ -126,6 +149,10 @@
     if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
         anyRequest.getNodeLabelExpression(), node.getPartition(),
         schedulingMode)) {
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.
+              PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST);
       return ContainerAllocation.PRIORITY_SKIPPED;
     }
 
@@ -134,6 +161,9 @@
         if (LOG.isDebugEnabled()) {
           LOG.debug("doesn't need containers based on reservation algo!");
         }
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
     }
@@ -143,6 +173,9 @@
         LOG.debug("cannot allocate required resource=" + required
             + " because of headroom");
       }
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM);
       return ContainerAllocation.QUEUE_SKIPPED;
     }
 
@@ -174,7 +207,9 @@
               + missedNonPartitionedRequestSchedulingOpportunity + " required="
               + rmContext.getScheduler().getNumClusterNodes());
         }
-
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST);
         return ContainerAllocation.APP_SKIPPED;
       }
     }
@@ -301,6 +336,9 @@
     }
 
     // Skip node-local request, go to rack-local request
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, schedulerKey.getPriority(),
+        ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST);
     return ContainerAllocation.LOCALITY_SKIPPED;
   }
 
@@ -316,6 +354,9 @@
     }
 
     // Skip rack-local request, go to off-switch request
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, schedulerKey.getPriority(),
+        ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST);
     return ContainerAllocation.LOCALITY_SKIPPED;
   }
 
@@ -332,6 +373,9 @@
 
     application.updateAppSkipNodeDiagnostics(
         CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, schedulerKey.getPriority(),
+        ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST);
     return ContainerAllocation.APP_SKIPPED;
   }
 
@@ -339,6 +383,7 @@
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer, SchedulingMode schedulingMode,
       ResourceLimits currentResoureLimits) {
+    Priority priority = schedulerKey.getPriority();
 
     ContainerAllocation allocation;
 
@@ -364,6 +409,9 @@
         application.getResourceRequest(schedulerKey, node.getRackName());
     if (rackLocalResourceRequest != null) {
       if (!rackLocalResourceRequest.getRelaxLocality()) {
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
 
@@ -387,6 +435,9 @@
         application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
     if (offSwitchResourceRequest != null) {
       if (!offSwitchResourceRequest.getRelaxLocality()) {
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, priority,
+            ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
       if (requestType != NodeType.NODE_LOCAL
@@ -408,7 +459,9 @@
 
       return allocation;
     }
-
+    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+        activitiesManager, node, application, priority,
+        ActivityDiagnosticConstant.PRIORITY_SKIPPED);
     return ContainerAllocation.PRIORITY_SKIPPED;
   }
 
@@ -416,6 +469,7 @@
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       ResourceRequest request, NodeType type, RMContainer rmContainer,
       SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    Priority priority = schedulerKey.getPriority();
     lastResourceRequest = request;
     
     if (LOG.isDebugEnabled()) {
@@ -432,6 +486,10 @@
       // this is a reserved container, but we cannot allocate it now according
       // to label not match. This can be caused by node label changed
       // We should un-reserve this container.
+      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+          node, application, priority,
+          ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL,
+          ActivityState.REJECTED);
       return new ContainerAllocation(rmContainer, null,
           AllocationState.LOCALITY_SKIPPED);
     }
@@ -446,6 +504,9 @@
           + " does not have sufficient resource for request : " + request
           + " node total capability : " + node.getTotalResource());
       // Skip this locality request
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE);
       return ContainerAllocation.LOCALITY_SKIPPED;
     }
 
@@ -524,6 +585,9 @@
           // continue.
           if (null == unreservedContainer) {
             // Skip the locality request
+            ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+                activitiesManager, node, application, priority,
+                ActivityDiagnosticConstant.LOCALITY_SKIPPED);
             return ContainerAllocation.LOCALITY_SKIPPED;
           }
         }
@@ -548,6 +612,9 @@
               LOG.debug("we needed to unreserve to be able to allocate");
             }
             // Skip the locality request
+            ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+                activitiesManager, node, application, priority,
+                ActivityDiagnosticConstant.LOCALITY_SKIPPED);
             return ContainerAllocation.LOCALITY_SKIPPED;          
           }
         }
@@ -560,6 +627,9 @@
         return result;
       }
       // Skip the locality request
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.LOCALITY_SKIPPED);
       return ContainerAllocation.LOCALITY_SKIPPED;    
     }
   }
@@ -636,6 +706,9 @@
       ContainerAllocation ret =
           new ContainerAllocation(allocationResult.containerToBeUnreserved,
               null, AllocationState.APP_SKIPPED);
+      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+          node, application, schedulerKey.getPriority(),
+          ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED);
       return ret;
     }
 
@@ -662,6 +735,10 @@
       application
           .updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
       LOG.warn("Couldn't get container for allocation!");
+      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
+          node, application, schedulerKey.getPriority(),
+          ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER,
+          ActivityState.REJECTED);
       return ContainerAllocation.APP_SKIPPED;
     }
 
@@ -741,6 +818,9 @@
               + ", because it doesn't need more resource, schedulingMode="
               + schedulingMode.name() + " node-label=" + node.getPartition());
         }
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE);
         return CSAssignment.SKIP_ASSIGNMENT;
       }
       
@@ -755,18 +835,21 @@
           continue;
         }
         return getCSAssignmentFromAllocateResult(clusterResource, result,
-            null);
+            null, node);
       }
 
       // We will reach here if we skipped all priorities of the app, so we will
       // skip the app.
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, application.getPriority(),
+          ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
       return CSAssignment.SKIP_ASSIGNMENT;
     } else {
       ContainerAllocation result =
           allocate(clusterResource, node, schedulingMode, resourceLimits,
               reservedContainer.getReservedSchedulerKey(), reservedContainer);
       return getCSAssignmentFromAllocateResult(clusterResource, result,
-          reservedContainer);
+          reservedContainer, node);
     }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 67d93a4..33dee80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
@@ -108,8 +109,16 @@
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
+    this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+        appPriority, isAttemptRecovering, null);
+  }
+
+  public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
+      ActivitiesManager activitiesManager) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
-    
+
     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
 
     Resource amResource;
@@ -139,8 +148,9 @@
     if (scheduler.getResourceCalculator() != null) {
       rc = scheduler.getResourceCalculator();
     }
-    
-    containerAllocator = new ContainerAllocator(this, rc, rmContext);
+
+    containerAllocator = new ContainerAllocator(this, rc, rmContext,
+        activitiesManager);
 
     if (scheduler instanceof CapacityScheduler) {
       capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
@@ -189,7 +199,7 @@
       return null;
     }
     
-    // Required sanity check - AM can call 'allocate' to update resource 
+    // Required sanity check - AM can call 'allocate' to update resource
     // request without locking the scheduler, hence we need to check
     if (getTotalRequiredResources(schedulerKey) <= 0) {
       return null;
@@ -493,7 +503,7 @@
   public LeafQueue getCSLeafQueue() {
     return (LeafQueue)queue;
   }
-  
+
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode, RMContainer reservedContainer) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 75bffc7..4305fd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -130,9 +130,13 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
@@ -176,6 +180,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
@@ -577,6 +582,124 @@
   }
 
   @GET
+  @Path("/scheduler/activities")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
+      @QueryParam("nodeId") String nodeId) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+
+    if (scheduler instanceof AbstractYarnScheduler) {
+      String errMessage = "";
+
+      AbstractYarnScheduler abstractYarnScheduler =
+          (AbstractYarnScheduler) scheduler;
+
+      ActivitiesManager activitiesManager =
+          abstractYarnScheduler.getActivitiesManager();
+      if (null == activitiesManager) {
+        errMessage = "Not Capacity Scheduler";
+        return new ActivitiesInfo(errMessage, nodeId);
+      }
+
+      List<FiCaSchedulerNode> nodeList =
+          abstractYarnScheduler.getNodeTracker().getAllNodes();
+
+      boolean illegalInput = false;
+
+      if (nodeList.size() == 0) {
+        illegalInput = true;
+        errMessage = "No node manager running in the cluster";
+      } else {
+        if (nodeId != null) {
+          String hostName = nodeId;
+          String portName = "";
+          if (nodeId.contains(":")) {
+            int index = nodeId.indexOf(":");
+            hostName = nodeId.substring(0, index);
+            portName = nodeId.substring(index + 1);
+          }
+
+          boolean correctNodeId = false;
+          for (FiCaSchedulerNode node : nodeList) {
+            if ((portName.equals("") && node.getRMNode().getHostName().equals(
+                hostName)) || (!portName.equals("") && node.getRMNode()
+                .getHostName().equals(hostName) && String.valueOf(
+                node.getRMNode().getCommandPort()).equals(portName))) {
+              correctNodeId = true;
+              nodeId = node.getNodeID().toString();
+              break;
+            }
+          }
+          if (!correctNodeId) {
+            illegalInput = true;
+            errMessage = "Cannot find node manager with given node id";
+          }
+        }
+      }
+
+      if (!illegalInput) {
+        activitiesManager.recordNextNodeUpdateActivities(nodeId);
+        return activitiesManager.getActivitiesInfo(nodeId);
+      }
+
+      // Return a activities info with error message
+      return new ActivitiesInfo(errMessage, nodeId);
+    }
+
+    return null;
+  }
+
+  @GET
+  @Path("/scheduler/app-activities")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
+      @QueryParam("appId") String appId, @QueryParam("maxTime") String time) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+
+    if (scheduler instanceof AbstractYarnScheduler) {
+      AbstractYarnScheduler abstractYarnScheduler =
+          (AbstractYarnScheduler) scheduler;
+
+      ActivitiesManager activitiesManager =
+          abstractYarnScheduler.getActivitiesManager();
+      if (null == activitiesManager) {
+        String errMessage = "Not Capacity Scheduler";
+        return new AppActivitiesInfo(errMessage, appId);
+      }
+
+      if(appId == null) {
+        String errMessage = "Must provide an application Id";
+        return new AppActivitiesInfo(errMessage, null);
+      }
+
+      double maxTime = 3.0;
+
+      if (time != null) {
+        if (time.contains(".")) {
+          maxTime = Double.parseDouble(time);
+        } else {
+          maxTime = Double.parseDouble(time + ".0");
+        }
+      }
+
+      ApplicationId applicationId;
+      try {
+        applicationId = ApplicationId.fromString(appId);
+        activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
+        AppActivitiesInfo appActivitiesInfo =
+            activitiesManager.getAppActivitiesInfo(applicationId);
+
+        return appActivitiesInfo;
+      } catch (Exception e) {
+        String errMessage = "Cannot find application with given appId";
+        return new AppActivitiesInfo(errMessage, appId);
+      }
+
+    }
+    return null;
+  }
+
+  @GET
   @Path("/appstatistics")
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public ApplicationStatisticsInfo getAppStatistics(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java
new file mode 100644
index 0000000..0de340a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+
+/*
+ * DAO object to display node allocation activity.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ActivitiesInfo {
+  protected String nodeId;
+  protected String timeStamp;
+  protected String diagnostic = null;
+  protected List<NodeAllocationInfo> allocations;
+
+  private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class);
+
+  public ActivitiesInfo() {
+  }
+
+  public ActivitiesInfo(String errorMessage, String nodeId) {
+    this.diagnostic = errorMessage;
+    this.nodeId = nodeId;
+  }
+
+  public ActivitiesInfo(List<NodeAllocation> nodeAllocations, String nodeId) {
+    this.nodeId = nodeId;
+    this.allocations = new ArrayList<>();
+
+    if (nodeAllocations == null) {
+      diagnostic = (nodeId != null ?
+          "waiting for display" :
+          "waiting for next allocation");
+    } else {
+      if (nodeAllocations.size() == 0) {
+        diagnostic = "do not have available resources";
+      } else {
+        this.nodeId = nodeAllocations.get(0).getNodeId();
+
+        Date date = new Date();
+        date.setTime(nodeAllocations.get(0).getTimeStamp());
+        this.timeStamp = date.toString();
+
+        for (int i = 0; i < nodeAllocations.size(); i++) {
+          NodeAllocation nodeAllocation = nodeAllocations.get(i);
+          NodeAllocationInfo allocationInfo = new NodeAllocationInfo(
+              nodeAllocation);
+          this.allocations.add(allocationInfo);
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java
new file mode 100644
index 0000000..9553a720
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * DAO object to display node information in allocation tree.
+ * It corresponds to "ActivityNode" class.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ActivityNodeInfo {
+  protected String name;  // The name for activity node
+  protected String appPriority;
+  protected String requestPriority;
+  protected String allocationState;
+  protected String diagnostic;
+
+  protected List<ActivityNodeInfo> children;
+
+  ActivityNodeInfo() {
+  }
+
+  ActivityNodeInfo(ActivityNode node) {
+    this.name = node.getName();
+    getPriority(node);
+    this.allocationState = node.getState().name();
+    this.diagnostic = node.getDiagnostic();
+    this.children = new ArrayList<>();
+
+    for (ActivityNode child : node.getChildren()) {
+      ActivityNodeInfo containerInfo = new ActivityNodeInfo(child);
+      this.children.add(containerInfo);
+    }
+  }
+
+  private void getPriority(ActivityNode node) {
+    if (node.getType()) {
+      this.appPriority = node.getAppPriority();
+    } else {
+      this.requestPriority = node.getRequestPriority();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
new file mode 100644
index 0000000..38c45a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/*
+ * DAO object to display application activity.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppActivitiesInfo {
+  protected String applicationId;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<AppAllocationInfo> allocations;
+
+  private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class);
+
+  public AppActivitiesInfo() {
+  }
+
+  public AppActivitiesInfo(String errorMessage, String applicationId) {
+    this.diagnostic = errorMessage;
+    this.applicationId = applicationId;
+
+    Date date = new Date();
+    date.setTime(SystemClock.getInstance().getTime());
+    this.timeStamp = date.toString();
+  }
+
+  public AppActivitiesInfo(List<AppAllocation> appAllocations,
+      ApplicationId applicationId) {
+    this.applicationId = applicationId.toString();
+    this.allocations = new ArrayList<>();
+
+    if (appAllocations == null) {
+      diagnostic = "waiting for display";
+
+      Date date = new Date();
+      date.setTime(SystemClock.getInstance().getTime());
+      this.timeStamp = date.toString();
+    } else {
+      for (int i = appAllocations.size() - 1; i > -1; i--) {
+        AppAllocation appAllocation = appAllocations.get(i);
+        AppAllocationInfo appAllocationInfo = new AppAllocationInfo(
+            appAllocation);
+        this.allocations.add(appAllocationInfo);
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
new file mode 100644
index 0000000..21d3788
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/*
+ * DAO object to display application allocation detailed information.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppAllocationInfo {
+  protected String nodeId;
+  protected String queueName;
+  protected String appPriority;
+  protected String allocatedContainerId;
+  protected String allocationState;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<ActivityNodeInfo> allocationAttempt;
+
+  private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class);
+
+  AppAllocationInfo() {
+  }
+
+  AppAllocationInfo(AppAllocation allocation) {
+    this.allocationAttempt = new ArrayList<>();
+
+    this.nodeId = allocation.getNodeId();
+    this.queueName = allocation.getQueueName();
+    this.appPriority = allocation.getPriority();
+    this.allocatedContainerId = allocation.getContainerId();
+    this.allocationState = allocation.getAppState().name();
+    this.diagnostic = allocation.getDiagnostic();
+
+    Date date = new Date();
+    date.setTime(allocation.getTime());
+    this.timeStamp = date.toString();
+
+    for (ActivityNode attempt : allocation.getAllocationAttempts()) {
+      ActivityNodeInfo containerInfo = new ActivityNodeInfo(attempt);
+      this.allocationAttempt.add(containerInfo);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java
new file mode 100644
index 0000000..1350a76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/*
+ * DAO object to display each node allocation in node heartbeat.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class NodeAllocationInfo {
+  protected String allocatedContainerId;
+  protected String finalAllocationState;
+  protected ActivityNodeInfo root = null;
+
+  private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class);
+
+  NodeAllocationInfo() {
+  }
+
+  NodeAllocationInfo(NodeAllocation allocation) {
+    this.allocatedContainerId = allocation.getContainerId();
+    this.finalAllocationState = allocation.getFinalAllocationState().name();
+
+    root = new ActivityNodeInfo(allocation.getRoot());
+
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index 649d719..bbdfdd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -62,9 +62,9 @@
 
 public class TestRMWebServicesCapacitySched extends JerseyTestBase {
 
-  private static MockRM rm;
-  private static CapacitySchedulerConfiguration csConf;
-  private static YarnConfiguration conf;
+  protected static MockRM rm;
+  protected static CapacitySchedulerConfiguration csConf;
+  protected static YarnConfiguration conf;
 
   private class QueueInfo {
     float capacity;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
new file mode 100644
index 0000000..d7b0581
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -0,0 +1,777 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Test;
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRMWebServicesSchedulerActivities
+    extends TestRMWebServicesCapacitySched {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestRMWebServicesSchedulerActivities.class);
+
+  @Test
+  public void testAssignMultipleContainersPerNodeHeartbeat()
+      throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1:1234");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 11);
+
+      JSONArray allocations = json.getJSONArray("allocations");
+      for (int i = 0; i < allocations.length(); i++) {
+        if (i != allocations.length() - 1) {
+          verifyStateOfAllocations(allocations.getJSONObject(i),
+              "finalAllocationState", "ALLOCATED");
+          verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
+        } else {
+          verifyStateOfAllocations(allocations.getJSONObject(i),
+              "finalAllocationState", "SKIPPED");
+          verifyQueueOrder(allocations.getJSONObject(i), "root-a-b");
+        }
+      }
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAssignWithoutAvailableResource() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testNoNM() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    try {
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1:1234");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testWrongNodeId() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.0");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testReserveNewContainer() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+    MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+
+    nm1.registerNode();
+    nm2.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
+              10)), null);
+
+      // Reserve new container
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.2");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b3-b1");
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED");
+
+      // Do a node heartbeat again without releasing container from app2
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.2");
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "b1");
+
+      allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED");
+
+      // Finish application 2
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      ContainerId containerId = ContainerId.newContainerId(
+          am2.getApplicationAttemptId(), 1);
+      cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
+              .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+          RMContainerEventType.FINISHED);
+
+      // Do a node heartbeat again
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.2");
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "b1");
+
+      allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState",
+          "ALLOCATED_FROM_RESERVED");
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testActivityJSON() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("nodeId", "127.0.0.1");
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState",
+          "ALLOCATED");
+
+      verifyNumberOfNodes(allocations, 6);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b1");
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  private void verifyNumberOfNodes(JSONObject allocation, int realValue)
+      throws Exception {
+    if (allocation.isNull("root")) {
+      assertEquals("State of allocation is wrong", 0, realValue);
+    } else {
+      assertEquals("State of allocation is wrong",
+          1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue);
+    }
+  }
+
+  private int getNumberOfNodes(JSONObject allocation) throws Exception {
+    if (!allocation.isNull("children")) {
+      Object object = allocation.get("children");
+      if (object.getClass() == JSONObject.class) {
+        return 1 + getNumberOfNodes((JSONObject) object);
+      } else {
+        int count = 0;
+        for (int i = 0; i < ((JSONArray) object).length(); i++) {
+          count += (1 + getNumberOfNodes(
+              ((JSONArray) object).getJSONObject(i)));
+        }
+        return count;
+      }
+    } else {
+      return 0;
+    }
+  }
+
+  private void verifyStateOfAllocations(JSONObject allocation,
+      String nameToCheck, String realState) throws Exception {
+    assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
+        realState);
+  }
+
+  private void verifyNumberOfAllocations(JSONObject json, int realValue)
+      throws Exception {
+    if (json.isNull("allocations")) {
+      assertEquals("Number of allocations is wrong", 0, realValue);
+    } else {
+      Object object = json.get("allocations");
+      if (object.getClass() == JSONObject.class) {
+        assertEquals("Number of allocations is wrong", 1, realValue);
+      } else if (object.getClass() == JSONArray.class) {
+        assertEquals("Number of allocations is wrong",
+            ((JSONArray) object).length(), realValue);
+      }
+    }
+  }
+
+  private void verifyQueueOrder(JSONObject json, String realOrder)
+      throws Exception {
+    String order = "";
+    if (!json.isNull("root")) {
+      JSONObject root = json.getJSONObject("root");
+      order = root.getString("name") + "-" + getQueueOrder(root);
+    }
+    assertEquals("Order of queue is wrong",
+        order.substring(0, order.length() - 1), realOrder);
+  }
+
+  private String getQueueOrder(JSONObject node) throws Exception {
+    if (!node.isNull("children")) {
+      Object children = node.get("children");
+      if (children.getClass() == JSONObject.class) {
+        if (!((JSONObject) children).isNull("appPriority")) {
+          return "";
+        }
+        return ((JSONObject) children).getString("name") + "-" + getQueueOrder(
+            (JSONObject) children);
+      } else if (children.getClass() == JSONArray.class) {
+        String order = "";
+        for (int i = 0; i < ((JSONArray) children).length(); i++) {
+          JSONObject child = (JSONObject) ((JSONArray) children).get(i);
+          if (!child.isNull("appPriority")) {
+            return "";
+          }
+          order += (child.getString("name") + "-" + getQueueOrder(child));
+        }
+        return order;
+      }
+    }
+    return "";
+  }
+
+  private void verifyNumberOfAllocationAttempts(JSONObject allocation,
+      int realValue) throws Exception {
+    if (allocation.isNull("allocationAttempt")) {
+      assertEquals("Number of allocation attempts is wrong", 0, realValue);
+    } else {
+      Object object = allocation.get("allocationAttempt");
+      if (object.getClass() == JSONObject.class) {
+        assertEquals("Number of allocations attempts is wrong", 1, realValue);
+      } else if (object.getClass() == JSONArray.class) {
+        assertEquals("Number of allocations attempts is wrong",
+            ((JSONArray) object).length(), realValue);
+      }
+    }
+  }
+
+  @Test
+  public void testAppActivityJSON() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+      nm.nodeHeartbeat(true);
+      Thread.sleep(5000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
+
+      verifyNumberOfAllocationAttempts(allocations, 1);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppAssignMultipleContainersPerNodeHeartbeat()
+      throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+      nm.nodeHeartbeat(true);
+      Thread.sleep(5000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 10);
+
+      JSONArray allocations = json.getJSONArray("allocations");
+      for (int i = 0; i < allocations.length(); i++) {
+        verifyStateOfAllocations(allocations.getJSONObject(i),
+            "allocationState", "ACCEPTED");
+      }
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppAssignWithoutAvailableResource() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 10), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              10)), null);
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+      nm.nodeHeartbeat(true);
+      Thread.sleep(5000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppNoNM() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
+
+      //Get JSON
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 0);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testAppReserveNewContainer() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+    MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
+        rm.getResourceTrackerService());
+
+    nm1.registerNode();
+    nm2.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
+              10)), null);
+
+      // Reserve new container
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      // Do a node heartbeat again without releasing container from app2
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 2);
+
+      // Finish application 2
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      ContainerId containerId = ContainerId.newContainerId(
+          am2.getApplicationAttemptId(), 1);
+      cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
+              .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+          RMContainerEventType.FINISHED);
+
+      // Do a node heartbeat again
+      r = resource();
+      params = new MultivaluedMapImpl();
+      params.add("appId", app1.getApplicationId().toString());
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      nm2.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/app-activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 3);
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+}