YARN-5113. Refactoring and other clean-up for distributed scheduling. (Konstantinos Karanasos via asuresh)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3bb73f5..6c921cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -301,53 +301,60 @@
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
 
-  /** Is Distributed Scheduling Enabled. */
+  /** Setting that controls whether distributed scheduling is enabled or not. */
   public static final String DIST_SCHEDULING_ENABLED =
       YARN_PREFIX + "distributed-scheduling.enabled";
   public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
 
-  /** Mininum allocatable container memory for Distributed Scheduling. */
-  public static final String DIST_SCHEDULING_MIN_MEMORY =
-      YARN_PREFIX + "distributed-scheduling.min-memory";
-  public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
+  /** Minimum memory (in MB) used for allocating a container through distributed
+   * scheduling. */
+  public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
+      YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
+  public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
 
-  /** Mininum allocatable container vcores for Distributed Scheduling. */
-  public static final String DIST_SCHEDULING_MIN_VCORES =
-      YARN_PREFIX + "distributed-scheduling.min-vcores";
-  public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
+  /** Minimum virtual CPU cores used for allocating a container through
+   * distributed scheduling. */
+  public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
+      YARN_PREFIX + "distributed-scheduling.min-container-vcores";
+  public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
 
-  /** Maximum allocatable container memory for Distributed Scheduling. */
-  public static final String DIST_SCHEDULING_MAX_MEMORY =
-      YARN_PREFIX + "distributed-scheduling.max-memory";
-  public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
+  /** Maximum memory (in MB) used for allocating a container through distributed
+   * scheduling. */
+  public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
+      YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
+  public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
 
-  /** Maximum allocatable container vcores for Distributed Scheduling. */
-  public static final String DIST_SCHEDULING_MAX_VCORES =
-      YARN_PREFIX + "distributed-scheduling.max-vcores";
-  public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
+  /** Maximum virtual CPU cores used for allocating a container through
+   * distributed scheduling. */
+  public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
+      YARN_PREFIX + "distributed-scheduling.max-container-vcores";
+  public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
 
-  /** Incremental allocatable container memory for Distributed Scheduling. */
-  public static final String DIST_SCHEDULING_INCR_MEMORY =
-      YARN_PREFIX + "distributed-scheduling.incr-memory";
-  public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
+  /** Incremental memory (in MB) used for allocating a container through
+   * distributed scheduling. */
+  public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
+      YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
+  public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
+      512;
 
-  /** Incremental allocatable container vcores for Distributed Scheduling. */
-  public static final String DIST_SCHEDULING_INCR_VCORES =
+  /** Incremental virtual CPU cores used for allocating a container through
+   * distributed scheduling. */
+  public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
       YARN_PREFIX + "distributed-scheduling.incr-vcores";
-  public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
+  public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
 
-  /** Container token expiry for container allocated via Distributed
-   * Scheduling. */
+  /** Container token expiry for container allocated via distributed
+   * scheduling. */
   public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
-      YARN_PREFIX + "distributed-scheduling.container-token-expiry";
+      YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
   public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
       600000;
 
-  /** K least loaded nodes to be provided to the LocalScheduler of a
-   * NodeManager for Distributed Scheduling. */
-  public static final String DIST_SCHEDULING_TOP_K =
-      YARN_PREFIX + "distributed-scheduling.top-k";
-  public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+  /** Number of nodes to be used by the LocalScheduler of a NodeManager for
+   * dispatching containers during distributed scheduling. */
+  public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
+      YARN_PREFIX + "distributed-scheduling.nodes-used";
+  public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
 
   /** Frequency for computing least loaded NMs. */
   public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -355,7 +362,7 @@
   public static final long
       NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000;
 
-  /** Comparator for determining Node Load for Distributed Scheduling. */
+  /** Comparator for determining node load for Distributed Scheduling. */
   public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
       YARN_PREFIX + "nm-container-queuing.load-comparator";
   public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
@@ -378,13 +385,13 @@
       YARN_PREFIX + "nm-container-queuing.max-queue-length";
   public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
 
-  /** Min wait time of container queue at NodeManager. */
+  /** Min queue wait time for a container at a NodeManager. */
   public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
       YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
   public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
       1;
 
-  /** Max wait time of container queue at NodeManager. */
+  /** Max queue wait time for a container queue at a NodeManager. */
   public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
       YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
   public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
@@ -1691,17 +1698,21 @@
   public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
       + "application.classpath";
 
+  /** The setting that controls whether AMRMProxy is enabled or not. */
   public static final String AMRM_PROXY_ENABLED = NM_PREFIX
-      + "amrmproxy.enable";
+      + "amrmproxy.enabled";
   public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+
   public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
       + "amrmproxy.address";
   public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
   public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
       + DEFAULT_AMRM_PROXY_PORT;
+
   public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
       + "amrmproxy.client.thread-count";
   public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+
   public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
       NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
   public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index a4e5b0a..668821d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -124,41 +124,6 @@
     configurationPrefixToSkipCompare
         .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
 
-    // Ignore Distributed Scheduling Related Configurations.
-    // Since it is still a "work in progress" feature
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
-    configurationPrefixToSkipCompare
-        .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV);
-
     // Set by container-executor.cfg
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index c649071..71321e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -82,9 +82,9 @@
 /**
  * Validates End2End Distributed Scheduling flow which includes the AM
  * specifying OPPORTUNISTIC containers in its resource requests,
- * the AMRMProxyService on the NM, the LocalScheduler RequestInterceptor on
- * the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the DistributedSchedulingService running on the RM.
+ * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
+ * on the NM and the DistributedSchedulingProtocol used by the framework to talk
+ * to the DistributedSchedulingAMService running on the RM.
  */
 public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c8bc741..3ebdc99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2718,10 +2718,10 @@
 
   <property>
     <description>
-    Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
-    calls from the application masters to the resource manager.
+    Enable/Disable AMRMProxyService in the node manager. This service is used to
+    intercept calls from the application masters to the resource manager.
     </description>
-    <name>yarn.nodemanager.amrmproxy.enable</name>
+    <name>yarn.nodemanager.amrmproxy.enabled</name>
     <value>false</value>
   </property>
 
@@ -2743,8 +2743,9 @@
 
   <property>
     <description>
-    The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
-    AMRMProxyService to create the request processing pipeline for applications.
+    The comma separated list of class names that implement the
+    RequestInterceptor interface. This is used by the AMRMProxyService to create
+    the request processing pipeline for applications.
     </description>
     <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
     <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
@@ -2752,6 +2753,141 @@
 
   <property>
     <description>
+    Setting that controls whether distributed scheduling is enabled.
+    </description>
+    <name>yarn.distributed-scheduling.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+    Minimum memory (in MB) used for allocating a container through distributed
+    scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.min-container-memory-mb</name>
+    <value>512</value>
+  </property>
+
+  <property>
+    <description>
+    Minimum virtual CPU cores used for allocating a container through
+    distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.min-container-vcores</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+    Maximum memory (in MB) used for allocating a container through distributed
+    scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.max-container-memory-mb</name>
+    <value>2048</value>
+  </property>
+
+  <property>
+    <description>
+    Maximum virtual CPU cores used for allocating a container through
+    distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.max-container-vcores</name>
+    <value>4</value>
+  </property>
+
+  <property>
+    <description>
+    Incremental memory (in MB) used for allocating a container through
+    distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.incr-container-memory-mb</name>
+    <value>512</value>
+  </property>
+
+  <property>
+    <description>
+    Incremental virtual CPU cores used for allocating a container through
+    distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.incr-vcores</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+    Container token expiry for container allocated via distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.container-token-expiry-ms</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <description>
+    Number of nodes to be used by the LocalScheduler of a NodeManager for
+    dispatching containers during distributed scheduling.
+    </description>
+    <name>yarn.distributed-scheduling.nodes-used</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+    Frequency for computing least loaded NMs.
+    </description>
+    <name>yarn.nm-container-queuing.sorting-nodes-interval-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>
+    Comparator for determining node load for Distributed Scheduling.
+    </description>
+    <name>yarn.nm-container-queuing.load-comparator</name>
+    <value>QUEUE_LENGTH</value>
+  </property>
+
+  <property>
+    <description>
+    Value of standard deviation used for calculation of queue limit thresholds.
+    </description>
+    <name>yarn.nm-container-queuing.queue-limit-stdev</name>
+    <value>1.0f</value>
+  </property>
+
+  <property>
+    <description>
+    Min length of container queue at NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.min-queue-length</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+    Max length of container queue at NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.max-queue-length</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+    Min queue wait time for a container at a NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.min-queue-wait-time-ms</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+    Max queue wait time for a container queue at a NodeManager.
+    </description>
+    <name>yarn.nm-container-queuing.max-queue-wait-time-ms</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
     Error filename pattern, to identify the file in the container's
     Log directory which contain the container's error log. As error file
     redirection is done by client/AM and yarn will not be aware of the error
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index f8330e3..b9e10ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -144,7 +144,7 @@
               <source>
                 <directory>${basedir}/src/main/proto</directory>
                 <includes>
-                  <include>distributed_scheduler_protocol.proto</include>
+                  <include>distributed_scheduling_am_protocol.proto</include>
                   <include>yarn_server_common_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>yarn_server_common_service_protos.proto</include>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
deleted file mode 100644
index 26faa8f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.io.retry.Idempotent;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-import java.io.IOException;
-
-/**
- * <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is
- * used by the <code>LocalScheduler</code> running on the NodeManager to wrap
- * the request / response objects of the <code>registerApplicationMaster</code>
- * and <code>allocate</code> methods of the protocol with addition information
- * required to perform Distributed Scheduling.
- * </p>
- */
-public interface DistributedSchedulerProtocol
-    extends ApplicationMasterProtocol {
-
-  /**
-   * <p> Extends the <code>registerApplicationMaster</code> to wrap the response
-   * with additional metadata.</p>
-   *
-   * @param request ApplicationMaster registration request
-   * @return A <code>DistSchedRegisterResponse</code> that contains a standard
-   *         AM registration response along with additional information required
-   *         for Distributed Scheduling
-   * @throws YarnException
-   * @throws IOException
-   */
-  @Public
-  @Unstable
-  @Idempotent
-  DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
-      RegisterApplicationMasterRequest request)
-      throws YarnException, IOException;
-
-  /**
-   * <p> Extends the <code>allocate</code> to wrap the response with additional
-   * metadata.</p>
-   *
-   * @param request ApplicationMaster allocate request
-   * @return A <code>DistSchedAllocateResponse</code> that contains a standard
-   *         AM allocate response along with additional information required
-   *         for Distributed Scheduling
-   * @throws YarnException
-   * @throws IOException
-   */
-  @Public
-  @Unstable
-  @Idempotent
-  DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException;
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocol.java
new file mode 100644
index 0000000..d1ed1fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocol.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * <p>
+ * This protocol extends the <code>ApplicationMasterProtocol</code>. It is used
+ * by the <code>DistributedScheduler</code> running on the NodeManager to wrap
+ * the request / response objects of the <code>registerApplicationMaster</code>
+ * and <code>allocate</code> methods of the protocol with additional information
+ * required to perform distributed scheduling.
+ * </p>
+ */
+public interface DistributedSchedulingAMProtocol
+    extends ApplicationMasterProtocol {
+
+  /**
+   * <p>
+   * Extends the <code>registerApplicationMaster</code> to wrap the response
+   * with additional metadata.
+   * </p>
+   *
+   * @param request
+   *          ApplicationMaster registration request
+   * @return A <code>RegisterDistributedSchedulingAMResponse</code> that
+   *         contains a standard AM registration response along with additional
+   *         information required for distributed scheduling
+   * @throws YarnException YarnException
+   * @throws IOException IOException
+   */
+  @Public
+  @Unstable
+  @Idempotent
+  RegisterDistributedSchedulingAMResponse
+      registerApplicationMasterForDistributedScheduling(
+            RegisterApplicationMasterRequest request)
+            throws YarnException, IOException;
+
+  /**
+   * <p>
+   * Extends the <code>allocate</code> to wrap the response with additional
+   * metadata.
+   * </p>
+   *
+   * @param request
+   *          ApplicationMaster allocate request
+   * @return A <code>DistributedSchedulingAllocateResponse</code> that contains
+   *         a standard AM allocate response along with additional information
+   *         required for distributed scheduling
+   * @throws YarnException YarnException
+   * @throws IOException IOException
+   */
+  @Public
+  @Unstable
+  @Idempotent
+  DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocolPB.java
similarity index 77%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocolPB.java
index ce7911c..674d4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocolPB.java
@@ -23,14 +23,15 @@
 import org.apache.hadoop.ipc.ProtocolInfo;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
 import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol.DistributedSchedulerProtocolService;
+import org.apache.hadoop.yarn.proto.DistributedSchedulingAMProtocol.DistributedSchedulingAMProtocolService;
 
 @Private
 @Unstable
-@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB",
     protocolVersion = 1)
-public interface DistributedSchedulerProtocolPB extends
-    DistributedSchedulerProtocolService.BlockingInterface,
+public interface DistributedSchedulingAMProtocolPB extends
+    DistributedSchedulingAMProtocolService.BlockingInterface,
     ApplicationMasterProtocolService.BlockingInterface,
-    ApplicationMasterProtocolPB {
+    ApplicationMasterProtocolPB  {
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index c23e27c..8555fc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -78,7 +78,7 @@
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
-    } else if (protocol == DistributedSchedulerProtocol.class) {
+    } else if (protocol == DistributedSchedulingAMProtocol.class) {
       return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulingAMProtocolPBClientImpl.java
similarity index 63%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulingAMProtocolPBClientImpl.java
index 0ca61df..66893b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulingAMProtocolPBClientImpl.java
@@ -23,51 +23,48 @@
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-
-
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-public class DistributedSchedulerProtocolPBClientImpl implements
-    DistributedSchedulerProtocol, Closeable {
+/**
+ * Implementation of {@link DistributedSchedulingAMProtocol}, used when
+ * distributed scheduling is enabled.
+ */
+public class DistributedSchedulingAMProtocolPBClientImpl implements
+    DistributedSchedulingAMProtocol, Closeable {
 
-  private DistributedSchedulerProtocolPB proxy;
+  private DistributedSchedulingAMProtocolPB proxy;
 
-  public DistributedSchedulerProtocolPBClientImpl(long clientVersion,
-      InetSocketAddress addr,
-      Configuration conf) throws IOException {
-    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+  public DistributedSchedulingAMProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
         ProtobufRpcEngine.class);
-    proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion,
+    proxy = RPC.getProxy(DistributedSchedulingAMProtocolPB.class, clientVersion,
         addr, conf);
   }
 
@@ -79,14 +76,14 @@
   }
 
   @Override
-  public DistSchedRegisterResponse
+  public RegisterDistributedSchedulingAMResponse
       registerApplicationMasterForDistributedScheduling(
-          RegisterApplicationMasterRequest request)
-          throws YarnException, IOException {
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
     YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
         ((RegisterApplicationMasterRequestPBImpl) request).getProto();
     try {
-      return new DistSchedRegisterResponsePBImpl(
+      return new RegisterDistributedSchedulingAMResponsePBImpl(
           proxy.registerApplicationMasterForDistributedScheduling(
               null, requestProto));
     } catch (ServiceException e) {
@@ -96,12 +93,14 @@
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException {
-    YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
-        ((DistSchedAllocateRequestPBImpl) request).getProto();
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
+    YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProto
+        requestProto =
+        ((DistributedSchedulingAllocateRequestPBImpl) request).getProto();
     try {
-      return new DistSchedAllocateResponsePBImpl(
+      return new DistributedSchedulingAllocateResponsePBImpl(
           proxy.allocateForDistributedScheduling(null, requestProto));
     } catch (ServiceException e) {
       RPCUtil.unwrapAndThrowException(e);
@@ -110,9 +109,9 @@
   }
 
   @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
     YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
         ((RegisterApplicationMasterRequestPBImpl) request).getProto();
     try {
@@ -125,9 +124,9 @@
   }
 
   @Override
-  public FinishApplicationMasterResponse finishApplicationMaster
-      (FinishApplicationMasterRequest request) throws YarnException,
-      IOException {
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request)
+      throws YarnException, IOException {
     YarnServiceProtos.FinishApplicationMasterRequestProto requestProto =
         ((FinishApplicationMasterRequestPBImpl) request).getProto();
     try {
@@ -140,8 +139,8 @@
   }
 
   @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
     YarnServiceProtos.AllocateRequestProto requestProto =
         ((AllocateRequestPBImpl) request).getProto();
     try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulingAMProtocolPBServiceImpl.java
similarity index 69%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulingAMProtocolPBServiceImpl.java
index 2763259..68cc077 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulingAMProtocolPBServiceImpl.java
@@ -20,24 +20,21 @@
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -47,27 +44,32 @@
 
 import java.io.IOException;
 
-public class DistributedSchedulerProtocolPBServiceImpl implements
-    DistributedSchedulerProtocolPB {
+/**
+ * Implementation of {@link DistributedSchedulingAMProtocolPB}.
+ */
+public class DistributedSchedulingAMProtocolPBServiceImpl implements
+    DistributedSchedulingAMProtocolPB {
 
-  private DistributedSchedulerProtocol real;
+  private DistributedSchedulingAMProtocol real;
 
-  public DistributedSchedulerProtocolPBServiceImpl(
-      DistributedSchedulerProtocol impl) {
+  public DistributedSchedulingAMProtocolPBServiceImpl(
+      DistributedSchedulingAMProtocol impl) {
     this.real = impl;
   }
 
   @Override
-  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
-  registerApplicationMasterForDistributedScheduling(RpcController controller,
-      RegisterApplicationMasterRequestProto proto) throws
-      ServiceException {
+  public YarnServerCommonServiceProtos.
+      RegisterDistributedSchedulingAMResponseProto
+      registerApplicationMasterForDistributedScheduling(
+      RpcController controller, RegisterApplicationMasterRequestProto proto)
+      throws ServiceException {
     RegisterApplicationMasterRequestPBImpl request = new
         RegisterApplicationMasterRequestPBImpl(proto);
     try {
-      DistSchedRegisterResponse response =
+      RegisterDistributedSchedulingAMResponse response =
           real.registerApplicationMasterForDistributedScheduling(request);
-      return ((DistSchedRegisterResponsePBImpl) response).getProto();
+      return ((RegisterDistributedSchedulingAMResponsePBImpl) response)
+          .getProto();
     } catch (YarnException e) {
       throw new ServiceException(e);
     } catch (IOException e) {
@@ -76,16 +78,19 @@
   }
 
   @Override
-  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
-  allocateForDistributedScheduling(RpcController controller,
-      YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
+  public YarnServerCommonServiceProtos.
+      DistributedSchedulingAllocateResponseProto
+      allocateForDistributedScheduling(RpcController controller,
+      YarnServerCommonServiceProtos.
+          DistributedSchedulingAllocateRequestProto proto)
       throws ServiceException {
-    DistSchedAllocateRequestPBImpl request =
-        new DistSchedAllocateRequestPBImpl(proto);
+    DistributedSchedulingAllocateRequestPBImpl request =
+        new DistributedSchedulingAllocateRequestPBImpl(proto);
     try {
-      DistSchedAllocateResponse response = real
+      DistributedSchedulingAllocateResponse response = real
           .allocateForDistributedScheduling(request);
-      return ((DistSchedAllocateResponsePBImpl) response).getProto();
+      return ((DistributedSchedulingAllocateResponsePBImpl) response)
+          .getProto();
     } catch (YarnException e) {
       throw new ServiceException(e);
     } catch (IOException e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateRequest.java
similarity index 85%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateRequest.java
index 10ff95b..ac40592 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateRequest.java
@@ -26,12 +26,14 @@
 import java.util.List;
 
 /**
- * Request for a distributed scheduler to notify allocation of containers to
- * the Resource Manager.
+ * Object used by the Application Master when distributed scheduling is enabled,
+ * in order to forward the {@link AllocateRequest} for GUARANTEED containers to
+ * the Resource Manager, and to notify the Resource Manager about the allocation
+ * of OPPORTUNISTIC containers through the Distributed Scheduler.
  */
 @Public
 @Evolving
-public abstract class DistSchedAllocateRequest {
+public abstract class DistributedSchedulingAllocateRequest {
 
   /**
    * Get the underlying <code>AllocateRequest</code> object.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
similarity index 71%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
index 5f6e069..7a40449 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
@@ -26,16 +26,24 @@
 
 import java.util.List;
 
+/**
+ * This is the response of the Resource Manager to the
+ * {@link DistributedSchedulingAllocateRequest}, when distributed scheduling is
+ * enabled. It includes the {@link AllocateResponse} for the GUARANTEED
+ * containers allocated by the Resource Manager. Moreover, it includes a list
+ * with the nodes that can be used by the Distributed Scheduler when allocating
+ * containers.
+ */
 @Public
 @Unstable
-public abstract class DistSchedAllocateResponse {
+public abstract class DistributedSchedulingAllocateResponse {
 
   @Public
   @Unstable
-  public static DistSchedAllocateResponse newInstance(AllocateResponse
-      allResp) {
-    DistSchedAllocateResponse response =
-        Records.newRecord(DistSchedAllocateResponse.class);
+  public static DistributedSchedulingAllocateResponse newInstance(
+      AllocateResponse allResp) {
+    DistributedSchedulingAllocateResponse response =
+        Records.newRecord(DistributedSchedulingAllocateResponse.class);
     response.setAllocateResponse(allResp);
     return  response;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
similarity index 67%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
index e4e5138..a0a0e38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
@@ -20,24 +20,30 @@
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
 import java.util.List;
 
+/**
+ * This is the response to registering an Application Master when distributed
+ * scheduling is enabled. Apart from the
+ * {@link RegisterApplicationMasterResponse}, it includes various parameters
+ * to be used during distributed scheduling, such as the min and max resources
+ * that can be requested by containers.
+ */
 @Public
 @Unstable
-public abstract class DistSchedRegisterResponse {
+public abstract class RegisterDistributedSchedulingAMResponse {
 
   @Public
   @Unstable
-  public static DistSchedRegisterResponse newInstance
+  public static RegisterDistributedSchedulingAMResponse newInstance
       (RegisterApplicationMasterResponse regAMResp) {
-    DistSchedRegisterResponse response =
-        Records.newRecord(DistSchedRegisterResponse.class);
+    RegisterDistributedSchedulingAMResponse response =
+        Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
     response.setRegisterResponse(regAMResp);
     return response;
   }
@@ -53,27 +59,27 @@
 
   @Public
   @Unstable
-  public abstract void setMinAllocatableCapabilty(Resource minResource);
+  public abstract void setMinContainerResource(Resource minResource);
 
   @Public
   @Unstable
-  public abstract Resource getMinAllocatableCapabilty();
+  public abstract Resource getMinContainerResource();
 
   @Public
   @Unstable
-  public abstract void setMaxAllocatableCapabilty(Resource maxResource);
+  public abstract void setMaxContainerResource(Resource maxResource);
 
   @Public
   @Unstable
-  public abstract Resource getMaxAllocatableCapabilty();
+  public abstract Resource getMaxContainerResource();
 
   @Public
   @Unstable
-  public abstract void setIncrAllocatableCapabilty(Resource maxResource);
+  public abstract void setIncrContainerResource(Resource maxResource);
 
   @Public
   @Unstable
-  public abstract Resource getIncrAllocatableCapabilty();
+  public abstract Resource getIncrContainerResource();
 
   @Public
   @Unstable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
deleted file mode 100644
index eec62da..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-
-import com.google.protobuf.TextFormat;
-
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .RegisterApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.server.api.protocolrecords
-    .DistSchedRegisterResponse;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
-
-  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
-      YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance();
-  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder = null;
-  boolean viaProto = false;
-
-  private Resource maxAllocatableCapability;
-  private Resource minAllocatableCapability;
-  private Resource incrAllocatableCapability;
-  private List<NodeId> nodesForScheduling;
-  private RegisterApplicationMasterResponse registerApplicationMasterResponse;
-
-  public DistSchedRegisterResponsePBImpl() {
-    builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
-  }
-
-  public DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
-    this.proto = proto;
-    viaProto = true;
-  }
-
-  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() {
-    mergeLocalToProto();
-    proto = viaProto ? proto : builder.build();
-    viaProto = true;
-    return proto;
-  }
-
-  private void maybeInitBuilder() {
-    if (viaProto || builder == null) {
-      builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto);
-    }
-    viaProto = false;
-  }
-
-  private synchronized void mergeLocalToProto() {
-    if (viaProto)
-      maybeInitBuilder();
-    mergeLocalToBuilder();
-    proto = builder.build();
-    viaProto = true;
-  }
-
-  private synchronized void mergeLocalToBuilder() {
-    if (this.nodesForScheduling != null) {
-      builder.clearNodesForScheduling();
-      Iterable<YarnProtos.NodeIdProto> iterable =
-          getNodeIdProtoIterable(this.nodesForScheduling);
-      builder.addAllNodesForScheduling(iterable);
-    }
-    if (this.maxAllocatableCapability != null) {
-      builder.setMaxAllocCapability(
-          ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
-    }
-    if (this.minAllocatableCapability != null) {
-      builder.setMinAllocCapability(
-          ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
-    }
-    if (this.incrAllocatableCapability != null) {
-      builder.setIncrAllocCapability(
-          ProtoUtils.convertToProtoFormat(this.incrAllocatableCapability));
-    }
-    if (this.registerApplicationMasterResponse != null) {
-      builder.setRegisterResponse(
-          ((RegisterApplicationMasterResponsePBImpl)
-              this.registerApplicationMasterResponse).getProto());
-    }
-  }
-
-  @Override
-  public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
-    maybeInitBuilder();
-    if(registerApplicationMasterResponse == null) {
-      builder.clearRegisterResponse();
-    }
-    this.registerApplicationMasterResponse = resp;
-  }
-
-  @Override
-  public RegisterApplicationMasterResponse getRegisterResponse() {
-    if (this.registerApplicationMasterResponse != null) {
-      return this.registerApplicationMasterResponse;
-    }
-
-    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasRegisterResponse()) {
-      return null;
-    }
-
-    this.registerApplicationMasterResponse =
-        new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
-    return this.registerApplicationMasterResponse;
-  }
-
-  @Override
-  public void setMaxAllocatableCapabilty(Resource maxResource) {
-    maybeInitBuilder();
-    if(maxAllocatableCapability == null) {
-      builder.clearMaxAllocCapability();
-    }
-    this.maxAllocatableCapability = maxResource;
-  }
-
-  @Override
-  public Resource getMaxAllocatableCapabilty() {
-    if (this.maxAllocatableCapability != null) {
-      return this.maxAllocatableCapability;
-    }
-
-    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasMaxAllocCapability()) {
-      return null;
-    }
-
-    this.maxAllocatableCapability =
-        ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
-    return this.maxAllocatableCapability;
-  }
-
-  @Override
-  public void setMinAllocatableCapabilty(Resource minResource) {
-    maybeInitBuilder();
-    if(minAllocatableCapability == null) {
-      builder.clearMinAllocCapability();
-    }
-    this.minAllocatableCapability = minResource;
-  }
-
-  @Override
-  public Resource getMinAllocatableCapabilty() {
-    if (this.minAllocatableCapability != null) {
-      return this.minAllocatableCapability;
-    }
-
-    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasMinAllocCapability()) {
-      return null;
-    }
-
-    this.minAllocatableCapability =
-        ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
-    return this.minAllocatableCapability;
-  }
-
-  @Override
-  public void setIncrAllocatableCapabilty(Resource incrResource) {
-    maybeInitBuilder();
-    if(incrAllocatableCapability == null) {
-      builder.clearIncrAllocCapability();
-    }
-    this.incrAllocatableCapability = incrResource;
-  }
-
-  @Override
-  public Resource getIncrAllocatableCapabilty() {
-    if (this.incrAllocatableCapability != null) {
-      return this.incrAllocatableCapability;
-    }
-
-    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasIncrAllocCapability()) {
-      return null;
-    }
-
-    this.incrAllocatableCapability =
-        ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
-    return this.incrAllocatableCapability;
-  }
-
-  @Override
-  public void setContainerTokenExpiryInterval(int interval) {
-    maybeInitBuilder();
-    builder.setContainerTokenExpiryInterval(interval);
-  }
-
-  @Override
-  public int getContainerTokenExpiryInterval() {
-    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasContainerTokenExpiryInterval()) {
-      return 0;
-    }
-    return p.getContainerTokenExpiryInterval();
-  }
-
-  @Override
-  public void setContainerIdStart(long containerIdStart) {
-    maybeInitBuilder();
-    builder.setContainerIdStart(containerIdStart);
-  }
-
-  @Override
-  public long getContainerIdStart() {
-    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasContainerIdStart()) {
-      return 0;
-    }
-    return p.getContainerIdStart();
-  }
-
-
-  @Override
-  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
-    maybeInitBuilder();
-    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
-      if (this.nodesForScheduling != null) {
-        this.nodesForScheduling.clear();
-      }
-      builder.clearNodesForScheduling();
-      return;
-    }
-    this.nodesForScheduling = new ArrayList<>();
-    this.nodesForScheduling.addAll(nodesForScheduling);
-  }
-
-  @Override
-  public List<NodeId> getNodesForScheduling() {
-    if (nodesForScheduling != null) {
-      return nodesForScheduling;
-    }
-    initLocalNodesForSchedulingList();
-    return nodesForScheduling;
-  }
-
-  private synchronized void initLocalNodesForSchedulingList() {
-    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
-    nodesForScheduling = new ArrayList<>();
-    if (list != null) {
-      for (YarnProtos.NodeIdProto t : list) {
-        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
-      }
-    }
-  }
-  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
-      final List<NodeId> nodeList) {
-    maybeInitBuilder();
-    return new Iterable<YarnProtos.NodeIdProto>() {
-      @Override
-      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
-        return new Iterator<YarnProtos.NodeIdProto>() {
-
-          Iterator<NodeId> iter = nodeList.iterator();
-
-          @Override
-          public boolean hasNext() {
-            return iter.hasNext();
-          }
-
-          @Override
-          public YarnProtos.NodeIdProto next() {
-            return ProtoUtils.convertToProtoFormat(iter.next());
-          }
-
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-        };
-      }
-    };
-  }
-
-  @Override
-  public String toString() {
-    return TextFormat.shortDebugString(getProto());
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateRequestPBImpl.java
similarity index 82%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateRequestPBImpl.java
index be386b6..d99c85e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateRequestPBImpl.java
@@ -26,39 +26,40 @@
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 
 import java.util.Iterator;
 import java.util.List;
 
 /**
- * Implementation of {@link DistSchedAllocateRequest} for a distributed
- * scheduler to notify about the allocation of containers to the Resource
- * Manager.
+ * Implementation of {@link DistributedSchedulingAllocateRequest}.
  */
-public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
-  private DistSchedAllocateRequestProto.Builder builder = null;
+public class DistributedSchedulingAllocateRequestPBImpl
+    extends DistributedSchedulingAllocateRequest {
+  private DistributedSchedulingAllocateRequestProto.Builder builder = null;
   private boolean viaProto = false;
 
-  private DistSchedAllocateRequestProto proto;
+  private DistributedSchedulingAllocateRequestProto proto;
   private AllocateRequest allocateRequest;
   private List<Container> containers;
 
-  public DistSchedAllocateRequestPBImpl() {
-    builder = DistSchedAllocateRequestProto.newBuilder();
+  public DistributedSchedulingAllocateRequestPBImpl() {
+    builder = DistributedSchedulingAllocateRequestProto.newBuilder();
   }
 
-  public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
+  public DistributedSchedulingAllocateRequestPBImpl(
+      DistributedSchedulingAllocateRequestProto proto) {
     this.proto = proto;
     this.viaProto = true;
   }
 
   @Override
   public AllocateRequest getAllocateRequest() {
-    DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+    DistributedSchedulingAllocateRequestProtoOrBuilder p =
+        viaProto ? proto : builder;
     if (this.allocateRequest != null) {
       return this.allocateRequest;
     }
@@ -88,7 +89,8 @@
   }
 
   private void initAllocatedContainers() {
-    DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+    DistributedSchedulingAllocateRequestProtoOrBuilder p =
+        viaProto ? proto : builder;
     List<ContainerProto> list = p.getAllocatedContainersList();
     this.containers = new ArrayList<Container>();
     for (ContainerProto c : list) {
@@ -110,7 +112,7 @@
     this.containers.addAll(pContainers);
   }
 
-  public DistSchedAllocateRequestProto getProto() {
+  public DistributedSchedulingAllocateRequestProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
@@ -119,7 +121,7 @@
 
   private void maybeInitBuilder() {
     if (viaProto || builder == null) {
-      builder = DistSchedAllocateRequestProto.newBuilder(proto);
+      builder = DistributedSchedulingAllocateRequestProto.newBuilder(proto);
     }
     viaProto = false;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
similarity index 70%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
index 3ea4965..18d5073 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
@@ -20,41 +20,47 @@
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos;
-import org.apache.hadoop.yarn.server.api.protocolrecords
-    .DistSchedAllocateResponse;
-
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
+/**
+ * Implementation of {@link DistributedSchedulingAllocateResponse}.
+ */
+public class DistributedSchedulingAllocateResponsePBImpl extends
+    DistributedSchedulingAllocateResponse {
 
-  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
-      YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance();
-  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder = null;
+  YarnServerCommonServiceProtos.DistributedSchedulingAllocateResponseProto
+      proto = YarnServerCommonServiceProtos.
+          DistributedSchedulingAllocateResponseProto.getDefaultInstance();
+  YarnServerCommonServiceProtos.DistributedSchedulingAllocateResponseProto.
+      Builder builder = null;
   boolean viaProto = false;
 
   private AllocateResponse allocateResponse;
   private List<NodeId> nodesForScheduling;
 
-  public DistSchedAllocateResponsePBImpl() {
-    builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
+  public DistributedSchedulingAllocateResponsePBImpl() {
+    builder = YarnServerCommonServiceProtos.
+        DistributedSchedulingAllocateResponseProto.newBuilder();
   }
 
-  public DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
+  public DistributedSchedulingAllocateResponsePBImpl(
+      YarnServerCommonServiceProtos.
+      DistributedSchedulingAllocateResponseProto proto) {
     this.proto = proto;
     viaProto = true;
   }
 
-  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() {
+  public YarnServerCommonServiceProtos.
+      DistributedSchedulingAllocateResponseProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
@@ -63,7 +69,8 @@
 
   private void maybeInitBuilder() {
     if (viaProto || builder == null) {
-      builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto);
+      builder = YarnServerCommonServiceProtos.
+          DistributedSchedulingAllocateResponseProto.newBuilder(proto);
     }
     viaProto = false;
   }
@@ -79,19 +86,20 @@
   private synchronized void mergeLocalToBuilder() {
     if (this.nodesForScheduling != null) {
       builder.clearNodesForScheduling();
-      Iterable<YarnProtos.NodeIdProto> iterable =
-          getNodeIdProtoIterable(this.nodesForScheduling);
+      Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
+          this.nodesForScheduling);
       builder.addAllNodesForScheduling(iterable);
     }
     if (this.allocateResponse != null) {
       builder.setAllocateResponse(
-          ((AllocateResponsePBImpl)this.allocateResponse).getProto());
+          ((AllocateResponsePBImpl) this.allocateResponse).getProto());
     }
   }
+
   @Override
   public void setAllocateResponse(AllocateResponse response) {
     maybeInitBuilder();
-    if(allocateResponse == null) {
+    if (allocateResponse == null) {
       builder.clearAllocateResponse();
     }
     this.allocateResponse = response;
@@ -103,14 +111,14 @@
       return this.allocateResponse;
     }
 
-    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
-        viaProto ? proto : builder;
+    YarnServerCommonServiceProtos.
+        DistributedSchedulingAllocateResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
     if (!p.hasAllocateResponse()) {
       return null;
     }
 
-    this.allocateResponse =
-        new AllocateResponsePBImpl(p.getAllocateResponse());
+    this.allocateResponse = new AllocateResponsePBImpl(p.getAllocateResponse());
     return this.allocateResponse;
   }
 
@@ -138,8 +146,9 @@
   }
 
   private synchronized void initLocalNodesForSchedulingList() {
-    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
-        viaProto ? proto : builder;
+    YarnServerCommonServiceProtos.
+        DistributedSchedulingAllocateResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
     List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
     nodesForScheduling = new ArrayList<>();
     if (list != null) {
@@ -148,6 +157,7 @@
       }
     }
   }
+
   private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
       final List<NodeId> nodeList) {
     maybeInitBuilder();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
new file mode 100644
index 0000000..4aaf99c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of {@link RegisterDistributedSchedulingAMResponse}.
+ */
+public class RegisterDistributedSchedulingAMResponsePBImpl extends
+    RegisterDistributedSchedulingAMResponse {
+
+  YarnServerCommonServiceProtos.RegisterDistributedSchedulingAMResponseProto
+      proto =
+          YarnServerCommonServiceProtos.
+          RegisterDistributedSchedulingAMResponseProto
+              .getDefaultInstance();
+  YarnServerCommonServiceProtos.RegisterDistributedSchedulingAMResponseProto.
+      Builder builder = null;
+  boolean viaProto = false;
+
+  private Resource maxContainerResource;
+  private Resource minContainerResource;
+  private Resource incrContainerResource;
+  private List<NodeId> nodesForScheduling;
+  private RegisterApplicationMasterResponse registerApplicationMasterResponse;
+
+  public RegisterDistributedSchedulingAMResponsePBImpl() {
+    builder = YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProto.newBuilder();
+  }
+
+  public RegisterDistributedSchedulingAMResponsePBImpl(
+      YarnServerCommonServiceProtos.
+          RegisterDistributedSchedulingAMResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonServiceProtos.
+      RegisterDistributedSchedulingAMResponseProto
+          getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = YarnServerCommonServiceProtos.
+          RegisterDistributedSchedulingAMResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    if (this.nodesForScheduling != null) {
+      builder.clearNodesForScheduling();
+      Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
+          this.nodesForScheduling);
+      builder.addAllNodesForScheduling(iterable);
+    }
+    if (this.maxContainerResource != null) {
+      builder.setMaxContainerResource(ProtoUtils.convertToProtoFormat(
+          this.maxContainerResource));
+    }
+    if (this.minContainerResource != null) {
+      builder.setMinContainerResource(ProtoUtils.convertToProtoFormat(
+          this.minContainerResource));
+    }
+    if (this.incrContainerResource != null) {
+      builder.setIncrContainerResource(
+          ProtoUtils.convertToProtoFormat(this.incrContainerResource));
+    }
+    if (this.registerApplicationMasterResponse != null) {
+      builder.setRegisterResponse(
+          ((RegisterApplicationMasterResponsePBImpl)
+              this.registerApplicationMasterResponse).getProto());
+    }
+  }
+
+  @Override
+  public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
+    maybeInitBuilder();
+    if (registerApplicationMasterResponse == null) {
+      builder.clearRegisterResponse();
+    }
+    this.registerApplicationMasterResponse = resp;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse getRegisterResponse() {
+    if (this.registerApplicationMasterResponse != null) {
+      return this.registerApplicationMasterResponse;
+    }
+
+    YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
+    if (!p.hasRegisterResponse()) {
+      return null;
+    }
+
+    this.registerApplicationMasterResponse =
+        new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
+    return this.registerApplicationMasterResponse;
+  }
+
+  @Override
+  public void setMaxContainerResource(Resource maxResource) {
+    maybeInitBuilder();
+    if (maxContainerResource == null) {
+      builder.clearMaxContainerResource();
+    }
+    this.maxContainerResource = maxResource;
+  }
+
+  @Override
+  public Resource getMaxContainerResource() {
+    if (this.maxContainerResource != null) {
+      return this.maxContainerResource;
+    }
+
+    YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
+    if (!p.hasMaxContainerResource()) {
+      return null;
+    }
+
+    this.maxContainerResource = ProtoUtils.convertFromProtoFormat(p
+        .getMaxContainerResource());
+    return this.maxContainerResource;
+  }
+
+  @Override
+  public void setMinContainerResource(Resource minResource) {
+    maybeInitBuilder();
+    if (minContainerResource == null) {
+      builder.clearMinContainerResource();
+    }
+    this.minContainerResource = minResource;
+  }
+
+  @Override
+  public Resource getMinContainerResource() {
+    if (this.minContainerResource != null) {
+      return this.minContainerResource;
+    }
+
+    YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
+    if (!p.hasMinContainerResource()) {
+      return null;
+    }
+
+    this.minContainerResource = ProtoUtils.convertFromProtoFormat(p
+        .getMinContainerResource());
+    return this.minContainerResource;
+  }
+
+  @Override
+  public void setIncrContainerResource(Resource incrResource) {
+    maybeInitBuilder();
+    if (incrContainerResource == null) {
+      builder.clearIncrContainerResource();
+    }
+    this.incrContainerResource = incrResource;
+  }
+
+  @Override
+  public Resource getIncrContainerResource() {
+    if (this.incrContainerResource != null) {
+      return this.incrContainerResource;
+    }
+
+    YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
+    if (!p.hasIncrContainerResource()) {
+      return null;
+    }
+
+    this.incrContainerResource = ProtoUtils.convertFromProtoFormat(p
+        .getIncrContainerResource());
+    return this.incrContainerResource;
+  }
+
+  @Override
+  public void setContainerTokenExpiryInterval(int interval) {
+    maybeInitBuilder();
+    builder.setContainerTokenExpiryInterval(interval);
+  }
+
+  @Override
+  public int getContainerTokenExpiryInterval() {
+    YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
+    if (!p.hasContainerTokenExpiryInterval()) {
+      return 0;
+    }
+    return p.getContainerTokenExpiryInterval();
+  }
+
+  @Override
+  public void setContainerIdStart(long containerIdStart) {
+    maybeInitBuilder();
+    builder.setContainerIdStart(containerIdStart);
+  }
+
+  @Override
+  public long getContainerIdStart() {
+    YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
+    if (!p.hasContainerIdStart()) {
+      return 0;
+    }
+    return p.getContainerIdStart();
+  }
+
+  @Override
+  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+    maybeInitBuilder();
+    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+      if (this.nodesForScheduling != null) {
+        this.nodesForScheduling.clear();
+      }
+      builder.clearNodesForScheduling();
+      return;
+    }
+    this.nodesForScheduling = new ArrayList<>();
+    this.nodesForScheduling.addAll(nodesForScheduling);
+  }
+
+  @Override
+  public List<NodeId> getNodesForScheduling() {
+    if (nodesForScheduling != null) {
+      return nodesForScheduling;
+    }
+    initLocalNodesForSchedulingList();
+    return nodesForScheduling;
+  }
+
+  private synchronized void initLocalNodesForSchedulingList() {
+    YarnServerCommonServiceProtos.
+        RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+            viaProto ? proto : builder;
+    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+    nodesForScheduling = new ArrayList<>();
+    if (list != null) {
+      for (YarnProtos.NodeIdProto t : list) {
+        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+      }
+    }
+  }
+
+  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+      final List<NodeId> nodeList) {
+    maybeInitBuilder();
+    return new Iterable<YarnProtos.NodeIdProto>() {
+      @Override
+      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+        return new Iterator<YarnProtos.NodeIdProto>() {
+
+          Iterator<NodeId> iter = nodeList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public YarnProtos.NodeIdProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduling_am_protocol.proto
similarity index 80%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduling_am_protocol.proto
index 818eb4a..274eaa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduling_am_protocol.proto
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+
 /**
  * These .proto interfaces are public and stable.
  * Please see http://wiki.apache.org/hadoop/Compatibility
@@ -23,7 +24,7 @@
  */
 
 option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "DistributedSchedulerProtocol";
+option java_outer_classname = "DistributedSchedulingAMProtocol";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 package hadoop.yarn;
@@ -31,9 +32,8 @@
 import "yarn_service_protos.proto";
 import "yarn_server_common_service_protos.proto";
 
-
-service DistributedSchedulerProtocolService {
-  rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
+service DistributedSchedulingAMProtocolService {
+  rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (RegisterDistributedSchedulingAMResponseProto);
+  rpc allocateForDistributedScheduling (DistributedSchedulingAllocateRequestProto) returns (DistributedSchedulingAllocateResponseProto);
   rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
-  rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 3660252..55ac875 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -26,22 +26,22 @@
 import "yarn_server_common_protos.proto";
 import "yarn_service_protos.proto";
 
-message DistSchedRegisterResponseProto {
+message RegisterDistributedSchedulingAMResponseProto {
   optional RegisterApplicationMasterResponseProto register_response = 1;
-  optional ResourceProto max_alloc_capability = 2;
-  optional ResourceProto min_alloc_capability = 3;
-  optional ResourceProto incr_alloc_capability = 4;
+  optional ResourceProto max_container_resource = 2;
+  optional ResourceProto min_container_resource = 3;
+  optional ResourceProto incr_container_resource = 4;
   optional int32 container_token_expiry_interval = 5;
   optional int64 container_id_start = 6;
   repeated NodeIdProto nodes_for_scheduling = 7;
 }
 
-message DistSchedAllocateResponseProto {
+message DistributedSchedulingAllocateResponseProto {
   optional AllocateResponseProto allocate_response = 1;
   repeated NodeIdProto nodes_for_scheduling = 2;
 }
 
-message DistSchedAllocateRequestProto {
+message DistributedSchedulingAllocateRequestProto {
   optional AllocateRequestProto allocate_request = 1;
   repeated ContainerProto allocated_containers = 2;
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index ac360f4..511db16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -65,7 +65,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
@@ -467,10 +467,9 @@
       interceptorClassNames.add(item.trim());
     }
 
-    // Make sure LocalScheduler is present at the beginning
-    // of the chain..
+    // Make sure DistributedScheduler is present at the beginning of the chain.
     if (this.nmContext.isDistributedSchedulingEnabled()) {
-      interceptorClassNames.add(0, LocalScheduler.class.getName());
+      interceptorClassNames.add(0, DistributedScheduler.class.getName());
     }
 
     return interceptorClassNames;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 55c65f4..e6c9bbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,12 +21,11 @@
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 
 import java.io.IOException;
 
@@ -118,8 +117,9 @@
    * @throws IOException
    */
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException {
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
     return (this.nextInterceptor != null) ?
         this.nextInterceptor.allocateForDistributedScheduling(request) : null;
   }
@@ -134,10 +134,10 @@
    * @throws IOException
    */
   @Override
-  public DistSchedRegisterResponse
+  public RegisterDistributedSchedulingAMResponse
       registerApplicationMasterForDistributedScheduling(
-          RegisterApplicationMasterRequest request)
-          throws YarnException, IOException {
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
     return (this.nextInterceptor != null) ? this.nextInterceptor
         .registerApplicationMasterForDistributedScheduling(request) : null;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index debff76..75fe022 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -43,12 +43,11 @@
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
-import org.apache.hadoop.yarn.server.api.protocolrecords
-    .DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +62,7 @@
     AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(DefaultRequestInterceptor.class);
-  private DistributedSchedulerProtocol rmClient;
+  private DistributedSchedulingAMProtocol rmClient;
   private UserGroupInformation user = null;
 
   @Override
@@ -77,13 +76,13 @@
       user.addToken(appContext.getAMRMToken());
       final Configuration conf = this.getConf();
 
-      rmClient =
-          user.doAs(new PrivilegedExceptionAction<DistributedSchedulerProtocol>() {
+      rmClient = user.doAs(
+          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
             @Override
-            public DistributedSchedulerProtocol run() throws Exception {
+            public DistributedSchedulingAMProtocol run() throws Exception {
               setAMRMTokenService(conf);
               return ServerRMProxy.createRMProxy(conf,
-                  DistributedSchedulerProtocol.class);
+                  DistributedSchedulingAMProtocol.class);
             }
           });
     } catch (IOException e) {
@@ -124,7 +123,7 @@
   }
 
   @Override
-  public DistSchedRegisterResponse
+  public RegisterDistributedSchedulingAMResponse
   registerApplicationMasterForDistributedScheduling
       (RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
@@ -134,13 +133,14 @@
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException {
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Forwarding allocateForDistributedScheduling request" +
           "to the real YARN RM");
     }
-    DistSchedAllocateResponse allocateResponse =
+    DistributedSchedulingAllocateResponse allocateResponse =
         rmClient.allocateForDistributedScheduling(request);
     if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
       updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
@@ -180,10 +180,10 @@
 
   @VisibleForTesting
   public void setRMClient(final ApplicationMasterProtocol rmClient) {
-    if (rmClient instanceof DistributedSchedulerProtocol) {
-      this.rmClient = (DistributedSchedulerProtocol)rmClient;
+    if (rmClient instanceof DistributedSchedulingAMProtocol) {
+      this.rmClient = (DistributedSchedulingAMProtocol)rmClient;
     } else {
-      this.rmClient = new DistributedSchedulerProtocol() {
+      this.rmClient = new DistributedSchedulingAMProtocol() {
         @Override
         public RegisterApplicationMasterResponse registerApplicationMaster
             (RegisterApplicationMasterRequest request) throws YarnException,
@@ -205,7 +205,7 @@
         }
 
         @Override
-        public DistSchedRegisterResponse
+        public RegisterDistributedSchedulingAMResponse
         registerApplicationMasterForDistributedScheduling
             (RegisterApplicationMasterRequest request) throws YarnException,
             IOException {
@@ -213,8 +213,9 @@
         }
 
         @Override
-        public DistSchedAllocateResponse allocateForDistributedScheduling(
-            DistSchedAllocateRequest request)
+        public DistributedSchedulingAllocateResponse
+            allocateForDistributedScheduling(
+            DistributedSchedulingAllocateRequest request)
                 throws YarnException, IOException {
           throw new IOException("Not Supported !!");
         }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
index 7a73563..5995af1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -19,14 +19,14 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 
 /**
  * Defines the contract to be implemented by the request intercepter classes,
  * that can be used to intercept and inspect messages sent from the application
  * master to the resource manager.
  */
-public interface RequestInterceptor extends DistributedSchedulerProtocol,
+public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
     Configurable {
   /**
    * This method is called for initializing the intercepter. This is guaranteed
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
similarity index 82%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index ec0e8a4..bfb12ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -23,17 +23,13 @@
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,14 +42,12 @@
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy
-    .AMRMProxyApplicationContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
 
 
 
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,19 +62,19 @@
 import java.util.TreeMap;
 
 /**
- * <p>The LocalScheduler runs on the NodeManager and is modelled as an
+ * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
  * <code>AMRMProxy</code> request interceptor. It is responsible for the
- * following :</p>
+ * following:</p>
  * <ul>
  *   <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
  *   response objects to extract instructions from the
- *   <code>ClusterManager</code> running on the ResourceManager to aid in making
- *   Scheduling scheduling decisions</li>
+ *   <code>ClusterMonitor</code> running on the ResourceManager to aid in making
+ *   distributed scheduling decisions.</li>
  *   <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
- *   containers for the opportunistic resource outstandingOpReqs</li>
+ *   containers for the outstanding OPPORTUNISTIC container requests.</li>
  * </ul>
  */
-public final class LocalScheduler extends AbstractRequestInterceptor {
+public final class DistributedScheduler extends AbstractRequestInterceptor {
 
   static class PartitionedResourceRequests {
     private List<ResourceRequest> guaranteed = new ArrayList<>();
@@ -93,7 +87,7 @@
     }
   }
 
-  static class DistSchedulerParams {
+  static class DistributedSchedulerParams {
     Resource maxResource;
     Resource minResource;
     Resource incrementResource;
@@ -101,18 +95,20 @@
   }
 
   private static final Logger LOG = LoggerFactory
-      .getLogger(LocalScheduler.class);
+      .getLogger(DistributedScheduler.class);
 
   private final static RecordFactory RECORD_FACTORY =
       RecordFactoryProvider.getRecordFactory(null);
 
-  // Currently just used to keep track of allocated Containers
-  // Can be used for reporting stats later
+  // Currently just used to keep track of allocated containers.
+  // Can be used for reporting stats later.
   private Set<ContainerId> containersAllocated = new HashSet<>();
 
-  private DistSchedulerParams appParams = new DistSchedulerParams();
-  private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
-      new OpportunisticContainerAllocator.ContainerIdCounter();
+  private DistributedSchedulerParams appParams =
+      new DistributedSchedulerParams();
+  private final OpportunisticContainerAllocator.ContainerIdCounter
+      containerIdCounter =
+          new OpportunisticContainerAllocator.ContainerIdCounter();
   private Map<String, NodeId> nodeList = new LinkedHashMap<>();
 
   // Mapping of NodeId to NodeTokens. Populated either from RM response or
@@ -123,7 +119,7 @@
   // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
   // Resource Name (Host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
-  // ResourceRequests (ask)
+  // ResourceRequest (ask).
   final TreeMap<Priority, Map<Resource, ResourceRequest>>
       outstandingOpReqs = new TreeMap<>();
 
@@ -158,8 +154,8 @@
    * @param request
    *          registration request
    * @return Allocate Response
-   * @throws YarnException
-   * @throws IOException
+   * @throws YarnException YarnException
+   * @throws IOException IOException
    */
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster
@@ -177,14 +173,14 @@
    * @param request
    *          allocation request
    * @return Allocate Response
-   * @throws YarnException
-   * @throws IOException
+   * @throws YarnException YarnException
+   * @throws IOException IOException
    */
   @Override
   public AllocateResponse allocate(AllocateRequest request) throws
       YarnException, IOException {
-    DistSchedAllocateRequest distRequest =
-        RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
+    DistributedSchedulingAllocateRequest distRequest = RECORD_FACTORY
+        .newRecordInstance(DistributedSchedulingAllocateRequest.class);
     distRequest.setAllocateRequest(request);
     return allocateForDistributedScheduling(distRequest).getAllocateResponse();
   }
@@ -199,9 +195,6 @@
   /**
    * Check if we already have a NMToken. if Not, generate the Token and
    * add it to the response
-   * @param response
-   * @param nmTokens
-   * @param allocatedContainers
    */
   private void updateResponseWithNMTokens(AllocateResponse response,
       List<NMToken> nmTokens, List<Container> allocatedContainers) {
@@ -235,11 +228,11 @@
   }
 
   private void updateParameters(
-      DistSchedRegisterResponse registerResponse) {
-    appParams.minResource = registerResponse.getMinAllocatableCapabilty();
-    appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
+      RegisterDistributedSchedulingAMResponse registerResponse) {
+    appParams.minResource = registerResponse.getMinContainerResource();
+    appParams.maxResource = registerResponse.getMaxContainerResource();
     appParams.incrementResource =
-        registerResponse.getIncrAllocatableCapabilty();
+        registerResponse.getIncrContainerResource();
     if (appParams.incrementResource == null) {
       appParams.incrementResource = appParams.minResource;
     }
@@ -253,11 +246,12 @@
 
   /**
    * Takes a list of ResourceRequests (asks), extracts the key information viz.
-   * (Priority, ResourceName, Capability) and adds it the outstanding
+   * (Priority, ResourceName, Capability) and adds to the outstanding
    * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
    * the current YARN constraint that only a single ResourceRequest can exist at
-   * a give Priority and Capability
-   * @param resourceAsks
+   * a give Priority and Capability.
+   *
+   * @param resourceAsks the list with the {@link ResourceRequest}s
    */
   public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
     for (ResourceRequest request : resourceAsks) {
@@ -297,11 +291,9 @@
 
   /**
    * This method matches a returned list of Container Allocations to any
-   * outstanding OPPORTUNISTIC ResourceRequest
-   * @param capability
-   * @param allocatedContainers
+   * outstanding OPPORTUNISTIC ResourceRequest.
    */
-  public void matchAllocationToOutstandingRequest(Resource capability,
+  private void matchAllocationToOutstandingRequest(Resource capability,
       List<Container> allocatedContainers) {
     for (Container c : allocatedContainers) {
       containersAllocated.add(c.getId());
@@ -333,28 +325,29 @@
   }
 
   @Override
-  public DistSchedRegisterResponse
+  public RegisterDistributedSchedulingAMResponse
       registerApplicationMasterForDistributedScheduling(
           RegisterApplicationMasterRequest request)
-              throws YarnException, IOException {
+      throws YarnException, IOException {
     LOG.info("Forwarding registration request to the" +
         "Distributed Scheduler Service on YARN RM");
-    DistSchedRegisterResponse dsResp = getNextInterceptor()
+    RegisterDistributedSchedulingAMResponse dsResp = getNextInterceptor()
         .registerApplicationMasterForDistributedScheduling(request);
     updateParameters(dsResp);
     return dsResp;
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException {
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Forwarding allocate request to the" +
           "Distributed Scheduler Service on YARN RM");
     }
     // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks = partitionAskList(
-        request.getAllocateRequest().getAskList());
+    PartitionedResourceRequests partitionedAsks =
+        partitionAskList(request.getAllocateRequest().getAskList());
 
     List<ContainerId> releasedContainers =
         request.getAllocateRequest().getReleaseList();
@@ -393,11 +386,12 @@
         allocatedContainers.addAll(e.getValue());
       }
     }
+
     request.setAllocatedContainers(allocatedContainers);
 
     // Send all the GUARANTEED Reqs to RM
     request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
-    DistSchedAllocateResponse dsResp =
+    DistributedSchedulingAllocateResponse dsResp =
         getNextInterceptor().allocateForDistributedScheduling(request);
 
     // Update host to nodeId mapping
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index 22a6a24..ce5bda0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -29,7 +29,7 @@
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -37,15 +37,17 @@
 
 import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * <p>The OpportunisticContainerAllocator allocates containers on a given list
- * of Nodes after it modifies the container sizes to within allowable limits
- * specified by the <code>ClusterManager</code> running on the RM. It tries to
- * distribute the containers as evenly as possible. It also uses the
- * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for
- * the allocated containers</p>
+ * <p>
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as possible.
+ * It also uses the <code>NMTokenSecretManagerInNM</code> to generate the
+ * required NM tokens for the allocated containers.
+ * </p>
  */
 public class OpportunisticContainerAllocator {
 
@@ -78,15 +80,15 @@
     this.webpagePort = webpagePort;
   }
 
-  public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
-      ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
-      Set<String> blacklist, ApplicationAttemptId appAttId,
-      Map<String, NodeId> allNodes, String userName) throws YarnException {
+  public Map<Resource, List<Container>> allocate(
+      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+      Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
+      ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
+      String userName) throws YarnException {
     Map<Resource, List<Container>> containers = new HashMap<>();
-    Set<String> nodesAllocated = new HashSet<>();
     for (ResourceRequest anyAsk : resourceAsks) {
       allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
-          allNodes, userName, containers, nodesAllocated, anyAsk);
+          allNodes, userName, containers, anyAsk);
       LOG.info("Opportunistic allocation requested for ["
           + "priority=" + anyAsk.getPriority()
           + ", num_containers=" + anyAsk.getNumContainers()
@@ -96,30 +98,30 @@
     return containers;
   }
 
-  private void allocateOpportunisticContainers(DistSchedulerParams appParams,
-      ContainerIdCounter idCounter, Set<String> blacklist,
-      ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
-      Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
-      ResourceRequest anyAsk) throws YarnException {
+  private void allocateOpportunisticContainers(
+      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+      Set<String> blacklist, ApplicationAttemptId id,
+      Map<String, NodeId> allNodes, String userName,
+      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+      throws YarnException {
     int toAllocate = anyAsk.getNumContainers()
-        - (containers.isEmpty() ?
-        0 : containers.get(anyAsk.getCapability()).size());
+        - (containers.isEmpty() ? 0 :
+            containers.get(anyAsk.getCapability()).size());
 
-    List<String> topKNodesLeft = new ArrayList<>();
-    for (String s : allNodes.keySet()) {
-      // Bias away from whatever we have already allocated and respect blacklist
-      if (nodesAllocated.contains(s) || blacklist.contains(s)) {
+    List<NodeId> nodesForScheduling = new ArrayList<>();
+    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+      // Do not use blacklisted nodes for scheduling.
+      if (blacklist.contains(nodeEntry.getKey())) {
         continue;
       }
-      topKNodesLeft.add(s);
+      nodesForScheduling.add(nodeEntry.getValue());
     }
     int numAllocated = 0;
-    int nextNodeToAllocate = 0;
+    int nextNodeToSchedule = 0;
     for (int numCont = 0; numCont < toAllocate; numCont++) {
-      String topNode = topKNodesLeft.get(nextNodeToAllocate);
-      nextNodeToAllocate++;
-      nextNodeToAllocate %= topKNodesLeft.size();
-      NodeId nodeId = allNodes.get(topNode);
+      nextNodeToSchedule++;
+      nextNodeToSchedule %= nodesForScheduling.size();
+      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
       Container container = buildContainer(appParams, idCounter, anyAsk, id,
           userName, nodeId);
       List<Container> cList = containers.get(anyAsk.getCapability());
@@ -134,7 +136,7 @@
     LOG.info("Allocated " + numAllocated + " opportunistic containers.");
   }
 
-  private Container buildContainer(DistSchedulerParams appParams,
+  private Container buildContainer(DistributedSchedulerParams appParams,
       ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
       String userName, NodeId nodeId) throws YarnException {
     ContainerId cId =
@@ -165,7 +167,7 @@
     return container;
   }
 
-  private Resource normalizeCapability(DistSchedulerParams appParams,
+  private Resource normalizeCapability(DistributedSchedulerParams appParams,
       ResourceRequest ask) {
     return Resources.normalize(RESOURCE_CALCULATOR,
         ask.getCapability(), appParams.minResource, appParams.maxResource,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
similarity index 75%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index 8de849b..b093b3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -19,34 +19,30 @@
 package org.apache.hadoop.yarn.server.nodemanager.scheduler;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -63,27 +59,30 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class TestLocalScheduler {
+/**
+ * Test cases for {@link DistributedScheduler}.
+ */
+public class TestDistributedScheduler {
 
   @Test
-  public void testLocalScheduler() throws Exception {
+  public void testDistributedScheduler() throws Exception {
 
     Configuration conf = new Configuration();
-    LocalScheduler localScheduler = new LocalScheduler();
+    DistributedScheduler distributedScheduler = new DistributedScheduler();
 
-    RequestInterceptor finalReqIntcptr = setup(conf, localScheduler);
+    RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
 
-    registerAM(localScheduler, finalReqIntcptr, Arrays.asList(
+    registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
         NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
 
-    final AtomicBoolean flipFlag = new AtomicBoolean(false);
+    final AtomicBoolean flipFlag = new AtomicBoolean(true);
     Mockito.when(
         finalReqIntcptr.allocateForDistributedScheduling(
-            Mockito.any(DistSchedAllocateRequest.class)))
-        .thenAnswer(new Answer<DistSchedAllocateResponse>() {
+            Mockito.any(DistributedSchedulingAllocateRequest.class)))
+        .thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() {
           @Override
-          public DistSchedAllocateResponse answer(InvocationOnMock
-              invocationOnMock) throws Throwable {
+          public DistributedSchedulingAllocateResponse answer(
+              InvocationOnMock invocationOnMock) throws Throwable {
             flipFlag.set(!flipFlag.get());
             if (flipFlag.get()) {
               return createAllocateResponse(Arrays.asList(
@@ -101,15 +100,15 @@
 
     ResourceRequest opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
+
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
 
     // Verify 4 containers were allocated
     AllocateResponse allocateResponse =
-        localScheduler.allocate(allocateRequest);
+        distributedScheduler.allocate(allocateRequest);
     Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
 
-    // Verify equal distribution on hosts a and b
-    // And None on c and d
+    // Verify equal distribution on hosts a and b, and none on c or d
     Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
     Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
     Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
@@ -123,18 +122,18 @@
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
 
     // Verify 6 containers were allocated
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
 
-    // Verify New containers are equally distribution on hosts c and d
-    // And None on a and b
+    // Verify new containers are equally distribution on hosts c and d,
+    // and none on a or b
     allocs = mapAllocs(allocateResponse, 6);
     Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
     Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
     Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
     Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
 
-    // Ensure the LocalScheduler respects the list order..
+    // Ensure the DistributedScheduler respects the list order..
     // The first request should be allocated to "d" since it is ranked higher
     // The second request should be allocated to "c" since the ranking is
     // flipped on every allocate response.
@@ -142,7 +141,7 @@
     opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     allocs = mapAllocs(allocateResponse, 1);
     Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
 
@@ -150,7 +149,7 @@
     opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     allocs = mapAllocs(allocateResponse, 1);
     Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
 
@@ -158,22 +157,23 @@
     opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     allocs = mapAllocs(allocateResponse, 1);
     Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
   }
 
-  private void registerAM(LocalScheduler localScheduler, RequestInterceptor
-      finalReqIntcptr, List<NodeId> nodeList) throws Exception {
-    DistSchedRegisterResponse distSchedRegisterResponse =
-        Records.newRecord(DistSchedRegisterResponse.class);
+  private void registerAM(DistributedScheduler distributedScheduler,
+      RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
+      throws Exception {
+    RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
+        Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
     distSchedRegisterResponse.setRegisterResponse(
         Records.newRecord(RegisterApplicationMasterResponse.class));
     distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
     distSchedRegisterResponse.setContainerIdStart(0);
-    distSchedRegisterResponse.setMaxAllocatableCapabilty(
+    distSchedRegisterResponse.setMaxContainerResource(
         Resource.newInstance(1024, 4));
-    distSchedRegisterResponse.setMinAllocatableCapabilty(
+    distSchedRegisterResponse.setMinContainerResource(
         Resource.newInstance(512, 2));
     distSchedRegisterResponse.setNodesForScheduling(nodeList);
     Mockito.when(
@@ -181,12 +181,12 @@
             Mockito.any(RegisterApplicationMasterRequest.class)))
         .thenReturn(distSchedRegisterResponse);
 
-    localScheduler.registerApplicationMaster(
+    distributedScheduler.registerApplicationMaster(
         Records.newRecord(RegisterApplicationMasterRequest.class));
   }
 
-  private RequestInterceptor setup(Configuration conf, LocalScheduler
-      localScheduler) {
+  private RequestInterceptor setup(Configuration conf,
+      DistributedScheduler distributedScheduler) {
     NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
     Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
     Context context = Mockito.mock(Context.class);
@@ -215,12 +215,12 @@
     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
         new NMTokenSecretManagerInNM();
     nmTokenSecretManagerInNM.setMasterKey(mKey);
-    localScheduler.initLocal(
+    distributedScheduler.initLocal(
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
         containerAllocator, nmTokenSecretManagerInNM, "test");
 
     RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
-    localScheduler.setNextInterceptor(finalReqIntcptr);
+    distributedScheduler.setNextInterceptor(finalReqIntcptr);
     return finalReqIntcptr;
   }
 
@@ -237,17 +237,18 @@
     return opportunisticReq;
   }
 
-  private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
-    DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
-        (DistSchedAllocateResponse.class);
-    distSchedAllocateResponse.setAllocateResponse(
-        Records.newRecord(AllocateResponse.class));
+  private DistributedSchedulingAllocateResponse createAllocateResponse(
+      List<NodeId> nodes) {
+    DistributedSchedulingAllocateResponse distSchedAllocateResponse =
+        Records.newRecord(DistributedSchedulingAllocateResponse.class);
+    distSchedAllocateResponse
+        .setAllocateResponse(Records.newRecord(AllocateResponse.class));
     distSchedAllocateResponse.setNodesForScheduling(nodes);
     return distSchedAllocateResponse;
   }
 
-  private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
-      allocateResponse, int expectedSize) throws Exception {
+  private Map<NodeId, List<ContainerId>> mapAllocs(
+      AllocateResponse allocateResponse, int expectedSize) throws Exception {
     Assert.assertEquals(expectedSize,
         allocateResponse.getAllocatedContainers().size());
     Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
@@ -266,5 +267,4 @@
     }
     return allocs;
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
similarity index 81%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
index 5aabddc..843ac09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
@@ -27,15 +27,15 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
 
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -73,18 +73,18 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The DistributedSchedulingService is started instead of the
- * ApplicationMasterService if DistributedScheduling is enabled for the YARN
+ * The DistributedSchedulingAMService is started instead of the
+ * ApplicationMasterService if distributed scheduling is enabled for the YARN
  * cluster.
  * It extends the functionality of the ApplicationMasterService by servicing
  * clients (AMs and AMRMProxy request interceptors) that understand the
  * DistributedSchedulingProtocol.
  */
-public class DistributedSchedulingService extends ApplicationMasterService
-    implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
+public class DistributedSchedulingAMService extends ApplicationMasterService
+    implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
 
   private static final Log LOG =
-      LogFactory.getLog(DistributedSchedulingService.class);
+      LogFactory.getLog(DistributedSchedulingAMService.class);
 
   private final NodeQueueLoadMonitor nodeMonitor;
 
@@ -94,12 +94,12 @@
       new ConcurrentHashMap<>();
   private final int k;
 
-  public DistributedSchedulingService(RMContext rmContext,
-      YarnScheduler scheduler) {
-    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+  public DistributedSchedulingAMService(RMContext rmContext,
+                                      YarnScheduler scheduler) {
+    super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
     this.k = rmContext.getYarnConfiguration().getInt(
-        YarnConfiguration.DIST_SCHEDULING_TOP_K,
-        YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
+        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
     long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
         YarnConfiguration.
@@ -149,7 +149,7 @@
   @Override
   public Server getServer(YarnRPC rpc, Configuration serverConf,
       InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
-    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+    Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
         addr, serverConf, secretManager,
         serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
             YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
@@ -184,43 +184,45 @@
   }
 
   @Override
-  public DistSchedRegisterResponse
+  public RegisterDistributedSchedulingAMResponse
   registerApplicationMasterForDistributedScheduling(
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
     RegisterApplicationMasterResponse response =
         registerApplicationMaster(request);
-    DistSchedRegisterResponse dsResp = recordFactory
-        .newRecordInstance(DistSchedRegisterResponse.class);
+    RegisterDistributedSchedulingAMResponse dsResp = recordFactory
+        .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
     dsResp.setRegisterResponse(response);
-    dsResp.setMinAllocatableCapabilty(
+    dsResp.setMinContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+                YarnConfiguration.
+                    DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
         )
     );
-    dsResp.setMaxAllocatableCapabilty(
+    dsResp.setMaxContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
         )
     );
-    dsResp.setIncrAllocatableCapabilty(
+    dsResp.setIncrContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+                YarnConfiguration.
+                    DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
         )
     );
     dsResp.setContainerTokenExpiryInterval(
@@ -238,8 +240,9 @@
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling(
-      DistSchedAllocateRequest request) throws YarnException, IOException {
+  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+      DistributedSchedulingAllocateRequest request)
+      throws YarnException, IOException {
     List<Container> distAllocContainers = request.getAllocatedContainers();
     for (Container container : distAllocContainers) {
       // Create RMContainer
@@ -255,8 +258,8 @@
               RMContainerEventType.LAUNCHED));
     }
     AllocateResponse response = allocate(request.getAllocateRequest());
-    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
-        (DistSchedAllocateResponse.class);
+    DistributedSchedulingAllocateResponse dsResp = recordFactory
+        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
     dsResp.setAllocateResponse(response);
     dsResp.setNodesForScheduling(
         this.nodeMonitor.selectLeastLoadedNodes(this.k));
@@ -264,7 +267,7 @@
   }
 
   private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-      String rackName, NodeId nodeId) {
+                            String rackName, NodeId nodeId) {
     if (rackName != null) {
       mapping.putIfAbsent(rackName, new HashSet<NodeId>());
       Set<NodeId> nodeIds = mapping.get(rackName);
@@ -275,7 +278,7 @@
   }
 
   private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-      String rackName, NodeId nodeId) {
+                                 String rackName, NodeId nodeId) {
     if (rackName != null) {
       Set<NodeId> nodeIds = mapping.get(rackName);
       synchronized (nodeIds) {
@@ -346,7 +349,7 @@
       break;
     // <-- IGNORED EVENTS : END -->
     default:
-      LOG.error("Unknown event arrived at DistributedSchedulingService: "
+      LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
           + event.toString());
     }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 0c1df33..4509045 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1180,12 +1180,12 @@
     if (this.rmContext.getYarnConfiguration().getBoolean(
         YarnConfiguration.DIST_SCHEDULING_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      DistributedSchedulingService distributedSchedulingService = new
-          DistributedSchedulingService(this.rmContext, scheduler);
+      DistributedSchedulingAMService distributedSchedulingService = new
+          DistributedSchedulingAMService(this.rmContext, scheduler);
       EventDispatcher distSchedulerEventDispatcher =
           new EventDispatcher(distributedSchedulingService,
-              DistributedSchedulingService.class.getName());
-      // Add an event dispoatcher for the DistributedSchedulingService
+              DistributedSchedulingAMService.class.getName());
+      // Add an event dispatcher for the DistributedSchedulingAMService
       // to handle node updates/additions and removals.
       // Since the SchedulerEvent is currently a super set of theses,
       // we register interest for it..
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 3764664..c677345 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -93,8 +94,8 @@
     this.queue = queue;
     this.user = user;
     this.activeUsersManager = activeUsersManager;
-    this.containerIdCounter =
-        new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+    this.containerIdCounter = new AtomicLong(
+        epoch << ResourceManager.EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index dcdc934..7d1b3c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -822,7 +822,7 @@
     if (this.rmContext.getYarnConfiguration().getBoolean(
         YarnConfiguration.DIST_SCHEDULING_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingService(getRMContext(), scheduler) {
+      return new DistributedSchedulingAMService(getRMContext(), scheduler) {
         @Override
         protected void serviceStart() {
           // override to not start rpc handler
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
similarity index 75%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
index 4716bab..0213a94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
@@ -25,16 +25,11 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -44,17 +39,16 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -63,12 +57,10 @@
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
-    .DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
-    .DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
-    .AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -78,9 +70,12 @@
 import java.util.Arrays;
 import java.util.List;
 
-public class TestDistributedSchedulingService {
+/**
+ * Test cases for {@link DistributedSchedulingAMService}.
+ */
+public class TestDistributedSchedulingAMService {
 
-  // Test if the DistributedSchedulingService can handle both DSProtocol as
+  // Test if the DistributedSchedulingAMService can handle both DSProtocol as
   // well as AMProtocol clients
   @Test
   public void testRPCWrapping() throws Exception {
@@ -116,7 +111,8 @@
             Resource.newInstance(1, 2), 1, true, "exp",
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true))));
-    DistributedSchedulingService service = createService(factory, rmContext, c);
+    DistributedSchedulingAMService service =
+        createService(factory, rmContext, c);
     Server server = service.getServer(rpc, conf, addr, null);
     server.start();
 
@@ -126,7 +122,7 @@
         ProtobufRpcEngine.class);
     ApplicationMasterProtocolPB ampProxy =
         RPC.getProxy(ApplicationMasterProtocolPB
-        .class, 1, NetUtils.getConnectAddress(server), conf);
+            .class, 1, NetUtils.getConnectAddress(server), conf);
     RegisterApplicationMasterResponse regResp =
         new RegisterApplicationMasterResponsePBImpl(
             ampProxy.registerApplicationMaster(null,
@@ -156,34 +152,34 @@
 
 
     // Verify that the DistrubutedSchedulingService can handle the
-    // DistributedSchedulerProtocol clients as well
-    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+    // DistributedSchedulingAMProtocol clients as well
+    RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
         ProtobufRpcEngine.class);
-    DistributedSchedulerProtocolPB dsProxy =
-        RPC.getProxy(DistributedSchedulerProtocolPB
+    DistributedSchedulingAMProtocolPB dsProxy =
+        RPC.getProxy(DistributedSchedulingAMProtocolPB
             .class, 1, NetUtils.getConnectAddress(server), conf);
 
-    DistSchedRegisterResponse dsRegResp =
-        new DistSchedRegisterResponsePBImpl(
+    RegisterDistributedSchedulingAMResponse dsRegResp =
+        new RegisterDistributedSchedulingAMResponsePBImpl(
             dsProxy.registerApplicationMasterForDistributedScheduling(null,
                 ((RegisterApplicationMasterRequestPBImpl)factory
                     .newRecordInstance(RegisterApplicationMasterRequest.class))
                     .getProto()));
     Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
     Assert.assertEquals(4,
-        dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
+        dsRegResp.getMaxContainerResource().getVirtualCores());
     Assert.assertEquals(1024,
-        dsRegResp.getMinAllocatableCapabilty().getMemorySize());
+        dsRegResp.getMinContainerResource().getMemorySize());
     Assert.assertEquals(2,
-        dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
+        dsRegResp.getIncrContainerResource().getVirtualCores());
 
-    DistSchedAllocateRequestPBImpl distAllReq =
-        (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
-            DistSchedAllocateRequest.class);
+    DistributedSchedulingAllocateRequestPBImpl distAllReq =
+        (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
+            DistributedSchedulingAllocateRequest.class);
     distAllReq.setAllocateRequest(allReq);
     distAllReq.setAllocatedContainers(Arrays.asList(c));
-    DistSchedAllocateResponse dsAllocResp =
-        new DistSchedAllocateResponsePBImpl(
+    DistributedSchedulingAllocateResponse dsAllocResp =
+        new DistributedSchedulingAllocateResponsePBImpl(
             dsProxy.allocateForDistributedScheduling(null,
                 distAllReq.getProto()));
     Assert.assertEquals(
@@ -199,9 +195,9 @@
         false, dsfinishResp.getIsUnregistered());
   }
 
-  private DistributedSchedulingService createService(final RecordFactory
+  private DistributedSchedulingAMService createService(final RecordFactory
       factory, final RMContext rmContext, final Container c) {
-    return new DistributedSchedulingService(rmContext, null) {
+    return new DistributedSchedulingAMService(rmContext, null) {
       @Override
       public RegisterApplicationMasterResponse registerApplicationMaster(
           RegisterApplicationMasterRequest request) throws
@@ -235,22 +231,24 @@
       }
 
       @Override
-      public DistSchedRegisterResponse
+      public RegisterDistributedSchedulingAMResponse
           registerApplicationMasterForDistributedScheduling(
-          RegisterApplicationMasterRequest request) throws
-          YarnException, IOException {
-        DistSchedRegisterResponse resp = factory.newRecordInstance(
-            DistSchedRegisterResponse.class);
+          RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
+        RegisterDistributedSchedulingAMResponse resp = factory
+            .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
         resp.setContainerIdStart(54321L);
-        resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
-        resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
-        resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
+        resp.setMaxContainerResource(Resource.newInstance(4096, 4));
+        resp.setMinContainerResource(Resource.newInstance(1024, 1));
+        resp.setIncrContainerResource(Resource.newInstance(2048, 2));
         return resp;
       }
 
       @Override
-      public DistSchedAllocateResponse allocateForDistributedScheduling(
-          DistSchedAllocateRequest request) throws YarnException, IOException {
+      public DistributedSchedulingAllocateResponse
+          allocateForDistributedScheduling(
+          DistributedSchedulingAllocateRequest request)
+          throws YarnException, IOException {
         List<ResourceRequest> askList =
             request.getAllocateRequest().getAskList();
         List<Container> allocatedContainers = request.getAllocatedContainers();
@@ -260,8 +258,8 @@
         Assert.assertEquals(1, askList.size());
         Assert.assertTrue(askList.get(0)
             .getExecutionTypeRequest().getEnforceExecutionType());
-        DistSchedAllocateResponse resp =
-            factory.newRecordInstance(DistSchedAllocateResponse.class);
+        DistributedSchedulingAllocateResponse resp = factory
+            .newRecordInstance(DistributedSchedulingAllocateResponse.class);
         resp.setNodesForScheduling(
             Arrays.asList(NodeId.newInstance("h1", 1234)));
         return resp;