YARN-5113. Refactoring and other clean-up for distributed scheduling. (Konstantinos Karanasos via asuresh)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3bb73f5..6c921cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -301,53 +301,60 @@
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
- /** Is Distributed Scheduling Enabled. */
+ /** Setting that controls whether distributed scheduling is enabled or not. */
public static final String DIST_SCHEDULING_ENABLED =
YARN_PREFIX + "distributed-scheduling.enabled";
public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
- /** Mininum allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MIN_MEMORY =
- YARN_PREFIX + "distributed-scheduling.min-memory";
- public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
+ /** Minimum memory (in MB) used for allocating a container through distributed
+ * scheduling. */
+ public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
+ public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
- /** Mininum allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MIN_VCORES =
- YARN_PREFIX + "distributed-scheduling.min-vcores";
- public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
+ /** Minimum virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
+ YARN_PREFIX + "distributed-scheduling.min-container-vcores";
+ public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
- /** Maximum allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MAX_MEMORY =
- YARN_PREFIX + "distributed-scheduling.max-memory";
- public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
+ /** Maximum memory (in MB) used for allocating a container through distributed
+ * scheduling. */
+ public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
+ public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
- /** Maximum allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MAX_VCORES =
- YARN_PREFIX + "distributed-scheduling.max-vcores";
- public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
+ /** Maximum virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
+ YARN_PREFIX + "distributed-scheduling.max-container-vcores";
+ public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
- /** Incremental allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_INCR_MEMORY =
- YARN_PREFIX + "distributed-scheduling.incr-memory";
- public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
+ /** Incremental memory (in MB) used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
+ public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
+ 512;
- /** Incremental allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_INCR_VCORES =
+ /** Incremental virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
YARN_PREFIX + "distributed-scheduling.incr-vcores";
- public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
+ public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
- /** Container token expiry for container allocated via Distributed
- * Scheduling. */
+ /** Container token expiry for container allocated via distributed
+ * scheduling. */
public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
- YARN_PREFIX + "distributed-scheduling.container-token-expiry";
+ YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
600000;
- /** K least loaded nodes to be provided to the LocalScheduler of a
- * NodeManager for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_TOP_K =
- YARN_PREFIX + "distributed-scheduling.top-k";
- public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+ /** Number of nodes to be used by the LocalScheduler of a NodeManager for
+ * dispatching containers during distributed scheduling. */
+ public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
+ YARN_PREFIX + "distributed-scheduling.nodes-used";
+ public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
/** Frequency for computing least loaded NMs. */
public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -355,7 +362,7 @@
public static final long
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000;
- /** Comparator for determining Node Load for Distributed Scheduling. */
+ /** Comparator for determining node load for Distributed Scheduling. */
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
YARN_PREFIX + "nm-container-queuing.load-comparator";
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
@@ -378,13 +385,13 @@
YARN_PREFIX + "nm-container-queuing.max-queue-length";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
- /** Min wait time of container queue at NodeManager. */
+ /** Min queue wait time for a container at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
1;
- /** Max wait time of container queue at NodeManager. */
+ /** Max queue wait time for a container queue at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
@@ -1691,17 +1698,21 @@
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
+ /** The setting that controls whether AMRMProxy is enabled or not. */
public static final String AMRM_PROXY_ENABLED = NM_PREFIX
- + "amrmproxy.enable";
+ + "amrmproxy.enabled";
public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+
public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ "amrmproxy.address";
public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+ DEFAULT_AMRM_PROXY_PORT;
+
public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+ "amrmproxy.client.thread-count";
public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+
public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index a4e5b0a..668821d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -124,41 +124,6 @@
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
- // Ignore Distributed Scheduling Related Configurations.
- // Since it is still a "work in progress" feature
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
- configurationPrefixToSkipCompare
- .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV);
-
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index c649071..71321e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -82,9 +82,9 @@
/**
* Validates End2End Distributed Scheduling flow which includes the AM
* specifying OPPORTUNISTIC containers in its resource requests,
- * the AMRMProxyService on the NM, the LocalScheduler RequestInterceptor on
- * the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the DistributedSchedulingService running on the RM.
+ * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
+ * on the NM and the DistributedSchedulingProtocol used by the framework to talk
+ * to the DistributedSchedulingAMService running on the RM.
*/
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c8bc741..3ebdc99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2718,10 +2718,10 @@
<property>
<description>
- Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
- calls from the application masters to the resource manager.
+ Enable/Disable AMRMProxyService in the node manager. This service is used to
+ intercept calls from the application masters to the resource manager.
</description>
- <name>yarn.nodemanager.amrmproxy.enable</name>
+ <name>yarn.nodemanager.amrmproxy.enabled</name>
<value>false</value>
</property>
@@ -2743,8 +2743,9 @@
<property>
<description>
- The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
- AMRMProxyService to create the request processing pipeline for applications.
+ The comma separated list of class names that implement the
+ RequestInterceptor interface. This is used by the AMRMProxyService to create
+ the request processing pipeline for applications.
</description>
<name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
<value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
@@ -2752,6 +2753,141 @@
<property>
<description>
+ Setting that controls whether distributed scheduling is enabled.
+ </description>
+ <name>yarn.distributed-scheduling.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>
+ Minimum memory (in MB) used for allocating a container through distributed
+ scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.min-container-memory-mb</name>
+ <value>512</value>
+ </property>
+
+ <property>
+ <description>
+ Minimum virtual CPU cores used for allocating a container through
+ distributed scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.min-container-vcores</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <description>
+ Maximum memory (in MB) used for allocating a container through distributed
+ scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.max-container-memory-mb</name>
+ <value>2048</value>
+ </property>
+
+ <property>
+ <description>
+ Maximum virtual CPU cores used for allocating a container through
+ distributed scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.max-container-vcores</name>
+ <value>4</value>
+ </property>
+
+ <property>
+ <description>
+ Incremental memory (in MB) used for allocating a container through
+ distributed scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.incr-container-memory-mb</name>
+ <value>512</value>
+ </property>
+
+ <property>
+ <description>
+ Incremental virtual CPU cores used for allocating a container through
+ distributed scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.incr-vcores</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <description>
+ Container token expiry for container allocated via distributed scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.container-token-expiry-ms</name>
+ <value>600000</value>
+ </property>
+
+ <property>
+ <description>
+ Number of nodes to be used by the LocalScheduler of a NodeManager for
+ dispatching containers during distributed scheduling.
+ </description>
+ <name>yarn.distributed-scheduling.nodes-used</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <description>
+ Frequency for computing least loaded NMs.
+ </description>
+ <name>yarn.nm-container-queuing.sorting-nodes-interval-ms</name>
+ <value>1000</value>
+ </property>
+
+ <property>
+ <description>
+ Comparator for determining node load for Distributed Scheduling.
+ </description>
+ <name>yarn.nm-container-queuing.load-comparator</name>
+ <value>QUEUE_LENGTH</value>
+ </property>
+
+ <property>
+ <description>
+ Value of standard deviation used for calculation of queue limit thresholds.
+ </description>
+ <name>yarn.nm-container-queuing.queue-limit-stdev</name>
+ <value>1.0f</value>
+ </property>
+
+ <property>
+ <description>
+ Min length of container queue at NodeManager.
+ </description>
+ <name>yarn.nm-container-queuing.min-queue-length</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <description>
+ Max length of container queue at NodeManager.
+ </description>
+ <name>yarn.nm-container-queuing.max-queue-length</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <description>
+ Min queue wait time for a container at a NodeManager.
+ </description>
+ <name>yarn.nm-container-queuing.min-queue-wait-time-ms</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <description>
+ Max queue wait time for a container queue at a NodeManager.
+ </description>
+ <name>yarn.nm-container-queuing.max-queue-wait-time-ms</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <description>
Error filename pattern, to identify the file in the container's
Log directory which contain the container's error log. As error file
redirection is done by client/AM and yarn will not be aware of the error
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index f8330e3..b9e10ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -144,7 +144,7 @@
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
- <include>distributed_scheduler_protocol.proto</include>
+ <include>distributed_scheduling_am_protocol.proto</include>
<include>yarn_server_common_protos.proto</include>
<include>yarn_server_common_service_protos.proto</include>
<include>yarn_server_common_service_protos.proto</include>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
deleted file mode 100644
index 26faa8f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.io.retry.Idempotent;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-import java.io.IOException;
-
-/**
- * <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is
- * used by the <code>LocalScheduler</code> running on the NodeManager to wrap
- * the request / response objects of the <code>registerApplicationMaster</code>
- * and <code>allocate</code> methods of the protocol with addition information
- * required to perform Distributed Scheduling.
- * </p>
- */
-public interface DistributedSchedulerProtocol
- extends ApplicationMasterProtocol {
-
- /**
- * <p> Extends the <code>registerApplicationMaster</code> to wrap the response
- * with additional metadata.</p>
- *
- * @param request ApplicationMaster registration request
- * @return A <code>DistSchedRegisterResponse</code> that contains a standard
- * AM registration response along with additional information required
- * for Distributed Scheduling
- * @throws YarnException
- * @throws IOException
- */
- @Public
- @Unstable
- @Idempotent
- DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request)
- throws YarnException, IOException;
-
- /**
- * <p> Extends the <code>allocate</code> to wrap the response with additional
- * metadata.</p>
- *
- * @param request ApplicationMaster allocate request
- * @return A <code>DistSchedAllocateResponse</code> that contains a standard
- * AM allocate response along with additional information required
- * for Distributed Scheduling
- * @throws YarnException
- * @throws IOException
- */
- @Public
- @Unstable
- @Idempotent
- DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException;
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocol.java
new file mode 100644
index 0000000..d1ed1fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocol.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * <p>
+ * This protocol extends the <code>ApplicationMasterProtocol</code>. It is used
+ * by the <code>DistributedScheduler</code> running on the NodeManager to wrap
+ * the request / response objects of the <code>registerApplicationMaster</code>
+ * and <code>allocate</code> methods of the protocol with additional information
+ * required to perform distributed scheduling.
+ * </p>
+ */
+public interface DistributedSchedulingAMProtocol
+ extends ApplicationMasterProtocol {
+
+ /**
+ * <p>
+ * Extends the <code>registerApplicationMaster</code> to wrap the response
+ * with additional metadata.
+ * </p>
+ *
+ * @param request
+ * ApplicationMaster registration request
+ * @return A <code>RegisterDistributedSchedulingAMResponse</code> that
+ * contains a standard AM registration response along with additional
+ * information required for distributed scheduling
+ * @throws YarnException YarnException
+ * @throws IOException IOException
+ */
+ @Public
+ @Unstable
+ @Idempotent
+ RegisterDistributedSchedulingAMResponse
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException;
+
+ /**
+ * <p>
+ * Extends the <code>allocate</code> to wrap the response with additional
+ * metadata.
+ * </p>
+ *
+ * @param request
+ * ApplicationMaster allocate request
+ * @return A <code>DistributedSchedulingAllocateResponse</code> that contains
+ * a standard AM allocate response along with additional information
+ * required for distributed scheduling
+ * @throws YarnException YarnException
+ * @throws IOException IOException
+ */
+ @Public
+ @Unstable
+ @Idempotent
+ DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocolPB.java
similarity index 77%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocolPB.java
index ce7911c..674d4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulingAMProtocolPB.java
@@ -23,14 +23,15 @@
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol.DistributedSchedulerProtocolService;
+import org.apache.hadoop.yarn.proto.DistributedSchedulingAMProtocol.DistributedSchedulingAMProtocolService;
@Private
@Unstable
-@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB",
protocolVersion = 1)
-public interface DistributedSchedulerProtocolPB extends
- DistributedSchedulerProtocolService.BlockingInterface,
+public interface DistributedSchedulingAMProtocolPB extends
+ DistributedSchedulingAMProtocolService.BlockingInterface,
ApplicationMasterProtocolService.BlockingInterface,
- ApplicationMasterProtocolPB {
+ ApplicationMasterProtocolPB {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index c23e27c..8555fc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -78,7 +78,7 @@
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
- } else if (protocol == DistributedSchedulerProtocol.class) {
+ } else if (protocol == DistributedSchedulingAMProtocol.class) {
return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulingAMProtocolPBClientImpl.java
similarity index 63%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulingAMProtocolPBClientImpl.java
index 0ca61df..66893b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulingAMProtocolPBClientImpl.java
@@ -23,51 +23,48 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-
-
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
-public class DistributedSchedulerProtocolPBClientImpl implements
- DistributedSchedulerProtocol, Closeable {
+/**
+ * Implementation of {@link DistributedSchedulingAMProtocol}, used when
+ * distributed scheduling is enabled.
+ */
+public class DistributedSchedulingAMProtocolPBClientImpl implements
+ DistributedSchedulingAMProtocol, Closeable {
- private DistributedSchedulerProtocolPB proxy;
+ private DistributedSchedulingAMProtocolPB proxy;
- public DistributedSchedulerProtocolPBClientImpl(long clientVersion,
- InetSocketAddress addr,
- Configuration conf) throws IOException {
- RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+ public DistributedSchedulingAMProtocolPBClientImpl(long clientVersion,
+ InetSocketAddress addr, Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
- proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion,
+ proxy = RPC.getProxy(DistributedSchedulingAMProtocolPB.class, clientVersion,
addr, conf);
}
@@ -79,14 +76,14 @@
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
- return new DistSchedRegisterResponsePBImpl(
+ return new RegisterDistributedSchedulingAMResponsePBImpl(
proxy.registerApplicationMasterForDistributedScheduling(
null, requestProto));
} catch (ServiceException e) {
@@ -96,12 +93,14 @@
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
- YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
- ((DistSchedAllocateRequestPBImpl) request).getProto();
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
+ YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProto
+ requestProto =
+ ((DistributedSchedulingAllocateRequestPBImpl) request).getProto();
try {
- return new DistSchedAllocateResponsePBImpl(
+ return new DistributedSchedulingAllocateResponsePBImpl(
proxy.allocateForDistributedScheduling(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
@@ -110,9 +109,9 @@
}
@Override
- public RegisterApplicationMasterResponse registerApplicationMaster
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
@@ -125,9 +124,9 @@
}
@Override
- public FinishApplicationMasterResponse finishApplicationMaster
- (FinishApplicationMasterRequest request) throws YarnException,
- IOException {
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnException, IOException {
YarnServiceProtos.FinishApplicationMasterRequestProto requestProto =
((FinishApplicationMasterRequestPBImpl) request).getProto();
try {
@@ -140,8 +139,8 @@
}
@Override
- public AllocateResponse allocate(AllocateRequest request) throws
- YarnException, IOException {
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
YarnServiceProtos.AllocateRequestProto requestProto =
((AllocateRequestPBImpl) request).getProto();
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulingAMProtocolPBServiceImpl.java
similarity index 69%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulingAMProtocolPBServiceImpl.java
index 2763259..68cc077 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulingAMProtocolPBServiceImpl.java
@@ -20,24 +20,21 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
- .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -47,27 +44,32 @@
import java.io.IOException;
-public class DistributedSchedulerProtocolPBServiceImpl implements
- DistributedSchedulerProtocolPB {
+/**
+ * Implementation of {@link DistributedSchedulingAMProtocolPB}.
+ */
+public class DistributedSchedulingAMProtocolPBServiceImpl implements
+ DistributedSchedulingAMProtocolPB {
- private DistributedSchedulerProtocol real;
+ private DistributedSchedulingAMProtocol real;
- public DistributedSchedulerProtocolPBServiceImpl(
- DistributedSchedulerProtocol impl) {
+ public DistributedSchedulingAMProtocolPBServiceImpl(
+ DistributedSchedulingAMProtocol impl) {
this.real = impl;
}
@Override
- public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
- registerApplicationMasterForDistributedScheduling(RpcController controller,
- RegisterApplicationMasterRequestProto proto) throws
- ServiceException {
+ public YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProto
+ registerApplicationMasterForDistributedScheduling(
+ RpcController controller, RegisterApplicationMasterRequestProto proto)
+ throws ServiceException {
RegisterApplicationMasterRequestPBImpl request = new
RegisterApplicationMasterRequestPBImpl(proto);
try {
- DistSchedRegisterResponse response =
+ RegisterDistributedSchedulingAMResponse response =
real.registerApplicationMasterForDistributedScheduling(request);
- return ((DistSchedRegisterResponsePBImpl) response).getProto();
+ return ((RegisterDistributedSchedulingAMResponsePBImpl) response)
+ .getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
@@ -76,16 +78,19 @@
}
@Override
- public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
- allocateForDistributedScheduling(RpcController controller,
- YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
+ public YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProto
+ allocateForDistributedScheduling(RpcController controller,
+ YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateRequestProto proto)
throws ServiceException {
- DistSchedAllocateRequestPBImpl request =
- new DistSchedAllocateRequestPBImpl(proto);
+ DistributedSchedulingAllocateRequestPBImpl request =
+ new DistributedSchedulingAllocateRequestPBImpl(proto);
try {
- DistSchedAllocateResponse response = real
+ DistributedSchedulingAllocateResponse response = real
.allocateForDistributedScheduling(request);
- return ((DistSchedAllocateResponsePBImpl) response).getProto();
+ return ((DistributedSchedulingAllocateResponsePBImpl) response)
+ .getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateRequest.java
similarity index 85%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateRequest.java
index 10ff95b..ac40592 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateRequest.java
@@ -26,12 +26,14 @@
import java.util.List;
/**
- * Request for a distributed scheduler to notify allocation of containers to
- * the Resource Manager.
+ * Object used by the Application Master when distributed scheduling is enabled,
+ * in order to forward the {@link AllocateRequest} for GUARANTEED containers to
+ * the Resource Manager, and to notify the Resource Manager about the allocation
+ * of OPPORTUNISTIC containers through the Distributed Scheduler.
*/
@Public
@Evolving
-public abstract class DistSchedAllocateRequest {
+public abstract class DistributedSchedulingAllocateRequest {
/**
* Get the underlying <code>AllocateRequest</code> object.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
similarity index 71%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
index 5f6e069..7a40449 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
@@ -26,16 +26,24 @@
import java.util.List;
+/**
+ * This is the response of the Resource Manager to the
+ * {@link DistributedSchedulingAllocateRequest}, when distributed scheduling is
+ * enabled. It includes the {@link AllocateResponse} for the GUARANTEED
+ * containers allocated by the Resource Manager. Moreover, it includes a list
+ * with the nodes that can be used by the Distributed Scheduler when allocating
+ * containers.
+ */
@Public
@Unstable
-public abstract class DistSchedAllocateResponse {
+public abstract class DistributedSchedulingAllocateResponse {
@Public
@Unstable
- public static DistSchedAllocateResponse newInstance(AllocateResponse
- allResp) {
- DistSchedAllocateResponse response =
- Records.newRecord(DistSchedAllocateResponse.class);
+ public static DistributedSchedulingAllocateResponse newInstance(
+ AllocateResponse allResp) {
+ DistributedSchedulingAllocateResponse response =
+ Records.newRecord(DistributedSchedulingAllocateResponse.class);
response.setAllocateResponse(allResp);
return response;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
similarity index 67%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
index e4e5138..a0a0e38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
@@ -20,24 +20,30 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
+/**
+ * This is the response to registering an Application Master when distributed
+ * scheduling is enabled. Apart from the
+ * {@link RegisterApplicationMasterResponse}, it includes various parameters
+ * to be used during distributed scheduling, such as the min and max resources
+ * that can be requested by containers.
+ */
@Public
@Unstable
-public abstract class DistSchedRegisterResponse {
+public abstract class RegisterDistributedSchedulingAMResponse {
@Public
@Unstable
- public static DistSchedRegisterResponse newInstance
+ public static RegisterDistributedSchedulingAMResponse newInstance
(RegisterApplicationMasterResponse regAMResp) {
- DistSchedRegisterResponse response =
- Records.newRecord(DistSchedRegisterResponse.class);
+ RegisterDistributedSchedulingAMResponse response =
+ Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
response.setRegisterResponse(regAMResp);
return response;
}
@@ -53,27 +59,27 @@
@Public
@Unstable
- public abstract void setMinAllocatableCapabilty(Resource minResource);
+ public abstract void setMinContainerResource(Resource minResource);
@Public
@Unstable
- public abstract Resource getMinAllocatableCapabilty();
+ public abstract Resource getMinContainerResource();
@Public
@Unstable
- public abstract void setMaxAllocatableCapabilty(Resource maxResource);
+ public abstract void setMaxContainerResource(Resource maxResource);
@Public
@Unstable
- public abstract Resource getMaxAllocatableCapabilty();
+ public abstract Resource getMaxContainerResource();
@Public
@Unstable
- public abstract void setIncrAllocatableCapabilty(Resource maxResource);
+ public abstract void setIncrContainerResource(Resource maxResource);
@Public
@Unstable
- public abstract Resource getIncrAllocatableCapabilty();
+ public abstract Resource getIncrContainerResource();
@Public
@Unstable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
deleted file mode 100644
index eec62da..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-
-import com.google.protobuf.TextFormat;
-
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.server.api.protocolrecords
- .DistSchedRegisterResponse;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance();
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder = null;
- boolean viaProto = false;
-
- private Resource maxAllocatableCapability;
- private Resource minAllocatableCapability;
- private Resource incrAllocatableCapability;
- private List<NodeId> nodesForScheduling;
- private RegisterApplicationMasterResponse registerApplicationMasterResponse;
-
- public DistSchedRegisterResponsePBImpl() {
- builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
- }
-
- public DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto);
- }
- viaProto = false;
- }
-
- private synchronized void mergeLocalToProto() {
- if (viaProto)
- maybeInitBuilder();
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private synchronized void mergeLocalToBuilder() {
- if (this.nodesForScheduling != null) {
- builder.clearNodesForScheduling();
- Iterable<YarnProtos.NodeIdProto> iterable =
- getNodeIdProtoIterable(this.nodesForScheduling);
- builder.addAllNodesForScheduling(iterable);
- }
- if (this.maxAllocatableCapability != null) {
- builder.setMaxAllocCapability(
- ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
- }
- if (this.minAllocatableCapability != null) {
- builder.setMinAllocCapability(
- ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
- }
- if (this.incrAllocatableCapability != null) {
- builder.setIncrAllocCapability(
- ProtoUtils.convertToProtoFormat(this.incrAllocatableCapability));
- }
- if (this.registerApplicationMasterResponse != null) {
- builder.setRegisterResponse(
- ((RegisterApplicationMasterResponsePBImpl)
- this.registerApplicationMasterResponse).getProto());
- }
- }
-
- @Override
- public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
- maybeInitBuilder();
- if(registerApplicationMasterResponse == null) {
- builder.clearRegisterResponse();
- }
- this.registerApplicationMasterResponse = resp;
- }
-
- @Override
- public RegisterApplicationMasterResponse getRegisterResponse() {
- if (this.registerApplicationMasterResponse != null) {
- return this.registerApplicationMasterResponse;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasRegisterResponse()) {
- return null;
- }
-
- this.registerApplicationMasterResponse =
- new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
- return this.registerApplicationMasterResponse;
- }
-
- @Override
- public void setMaxAllocatableCapabilty(Resource maxResource) {
- maybeInitBuilder();
- if(maxAllocatableCapability == null) {
- builder.clearMaxAllocCapability();
- }
- this.maxAllocatableCapability = maxResource;
- }
-
- @Override
- public Resource getMaxAllocatableCapabilty() {
- if (this.maxAllocatableCapability != null) {
- return this.maxAllocatableCapability;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasMaxAllocCapability()) {
- return null;
- }
-
- this.maxAllocatableCapability =
- ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
- return this.maxAllocatableCapability;
- }
-
- @Override
- public void setMinAllocatableCapabilty(Resource minResource) {
- maybeInitBuilder();
- if(minAllocatableCapability == null) {
- builder.clearMinAllocCapability();
- }
- this.minAllocatableCapability = minResource;
- }
-
- @Override
- public Resource getMinAllocatableCapabilty() {
- if (this.minAllocatableCapability != null) {
- return this.minAllocatableCapability;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasMinAllocCapability()) {
- return null;
- }
-
- this.minAllocatableCapability =
- ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
- return this.minAllocatableCapability;
- }
-
- @Override
- public void setIncrAllocatableCapabilty(Resource incrResource) {
- maybeInitBuilder();
- if(incrAllocatableCapability == null) {
- builder.clearIncrAllocCapability();
- }
- this.incrAllocatableCapability = incrResource;
- }
-
- @Override
- public Resource getIncrAllocatableCapabilty() {
- if (this.incrAllocatableCapability != null) {
- return this.incrAllocatableCapability;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasIncrAllocCapability()) {
- return null;
- }
-
- this.incrAllocatableCapability =
- ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
- return this.incrAllocatableCapability;
- }
-
- @Override
- public void setContainerTokenExpiryInterval(int interval) {
- maybeInitBuilder();
- builder.setContainerTokenExpiryInterval(interval);
- }
-
- @Override
- public int getContainerTokenExpiryInterval() {
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasContainerTokenExpiryInterval()) {
- return 0;
- }
- return p.getContainerTokenExpiryInterval();
- }
-
- @Override
- public void setContainerIdStart(long containerIdStart) {
- maybeInitBuilder();
- builder.setContainerIdStart(containerIdStart);
- }
-
- @Override
- public long getContainerIdStart() {
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasContainerIdStart()) {
- return 0;
- }
- return p.getContainerIdStart();
- }
-
-
- @Override
- public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
- maybeInitBuilder();
- if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
- if (this.nodesForScheduling != null) {
- this.nodesForScheduling.clear();
- }
- builder.clearNodesForScheduling();
- return;
- }
- this.nodesForScheduling = new ArrayList<>();
- this.nodesForScheduling.addAll(nodesForScheduling);
- }
-
- @Override
- public List<NodeId> getNodesForScheduling() {
- if (nodesForScheduling != null) {
- return nodesForScheduling;
- }
- initLocalNodesForSchedulingList();
- return nodesForScheduling;
- }
-
- private synchronized void initLocalNodesForSchedulingList() {
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
- nodesForScheduling = new ArrayList<>();
- if (list != null) {
- for (YarnProtos.NodeIdProto t : list) {
- nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
- }
- }
- }
- private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
- final List<NodeId> nodeList) {
- maybeInitBuilder();
- return new Iterable<YarnProtos.NodeIdProto>() {
- @Override
- public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
- return new Iterator<YarnProtos.NodeIdProto>() {
-
- Iterator<NodeId> iter = nodeList.iterator();
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public YarnProtos.NodeIdProto next() {
- return ProtoUtils.convertToProtoFormat(iter.next());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- @Override
- public String toString() {
- return TextFormat.shortDebugString(getProto());
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateRequestPBImpl.java
similarity index 82%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateRequestPBImpl.java
index be386b6..d99c85e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateRequestPBImpl.java
@@ -26,39 +26,40 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistributedSchedulingAllocateRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import java.util.Iterator;
import java.util.List;
/**
- * Implementation of {@link DistSchedAllocateRequest} for a distributed
- * scheduler to notify about the allocation of containers to the Resource
- * Manager.
+ * Implementation of {@link DistributedSchedulingAllocateRequest}.
*/
-public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
- private DistSchedAllocateRequestProto.Builder builder = null;
+public class DistributedSchedulingAllocateRequestPBImpl
+ extends DistributedSchedulingAllocateRequest {
+ private DistributedSchedulingAllocateRequestProto.Builder builder = null;
private boolean viaProto = false;
- private DistSchedAllocateRequestProto proto;
+ private DistributedSchedulingAllocateRequestProto proto;
private AllocateRequest allocateRequest;
private List<Container> containers;
- public DistSchedAllocateRequestPBImpl() {
- builder = DistSchedAllocateRequestProto.newBuilder();
+ public DistributedSchedulingAllocateRequestPBImpl() {
+ builder = DistributedSchedulingAllocateRequestProto.newBuilder();
}
- public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
+ public DistributedSchedulingAllocateRequestPBImpl(
+ DistributedSchedulingAllocateRequestProto proto) {
this.proto = proto;
this.viaProto = true;
}
@Override
public AllocateRequest getAllocateRequest() {
- DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ DistributedSchedulingAllocateRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
if (this.allocateRequest != null) {
return this.allocateRequest;
}
@@ -88,7 +89,8 @@
}
private void initAllocatedContainers() {
- DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ DistributedSchedulingAllocateRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
List<ContainerProto> list = p.getAllocatedContainersList();
this.containers = new ArrayList<Container>();
for (ContainerProto c : list) {
@@ -110,7 +112,7 @@
this.containers.addAll(pContainers);
}
- public DistSchedAllocateRequestProto getProto() {
+ public DistributedSchedulingAllocateRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
@@ -119,7 +121,7 @@
private void maybeInitBuilder() {
if (viaProto || builder == null) {
- builder = DistSchedAllocateRequestProto.newBuilder(proto);
+ builder = DistributedSchedulingAllocateRequestProto.newBuilder(proto);
}
viaProto = false;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
similarity index 70%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
index 3ea4965..18d5073 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
@@ -20,41 +20,47 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos;
-import org.apache.hadoop.yarn.server.api.protocolrecords
- .DistSchedAllocateResponse;
-
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
+/**
+ * Implementation of {@link DistributedSchedulingAllocateResponse}.
+ */
+public class DistributedSchedulingAllocateResponsePBImpl extends
+ DistributedSchedulingAllocateResponse {
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance();
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder = null;
+ YarnServerCommonServiceProtos.DistributedSchedulingAllocateResponseProto
+ proto = YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProto.getDefaultInstance();
+ YarnServerCommonServiceProtos.DistributedSchedulingAllocateResponseProto.
+ Builder builder = null;
boolean viaProto = false;
private AllocateResponse allocateResponse;
private List<NodeId> nodesForScheduling;
- public DistSchedAllocateResponsePBImpl() {
- builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
+ public DistributedSchedulingAllocateResponsePBImpl() {
+ builder = YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProto.newBuilder();
}
- public DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
+ public DistributedSchedulingAllocateResponsePBImpl(
+ YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProto proto) {
this.proto = proto;
viaProto = true;
}
- public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() {
+ public YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
@@ -63,7 +69,8 @@
private void maybeInitBuilder() {
if (viaProto || builder == null) {
- builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto);
+ builder = YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProto.newBuilder(proto);
}
viaProto = false;
}
@@ -79,19 +86,20 @@
private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling();
- Iterable<YarnProtos.NodeIdProto> iterable =
- getNodeIdProtoIterable(this.nodesForScheduling);
+ Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
+ this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable);
}
if (this.allocateResponse != null) {
builder.setAllocateResponse(
- ((AllocateResponsePBImpl)this.allocateResponse).getProto());
+ ((AllocateResponsePBImpl) this.allocateResponse).getProto());
}
}
+
@Override
public void setAllocateResponse(AllocateResponse response) {
maybeInitBuilder();
- if(allocateResponse == null) {
+ if (allocateResponse == null) {
builder.clearAllocateResponse();
}
this.allocateResponse = response;
@@ -103,14 +111,14 @@
return this.allocateResponse;
}
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
- viaProto ? proto : builder;
+ YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
if (!p.hasAllocateResponse()) {
return null;
}
- this.allocateResponse =
- new AllocateResponsePBImpl(p.getAllocateResponse());
+ this.allocateResponse = new AllocateResponsePBImpl(p.getAllocateResponse());
return this.allocateResponse;
}
@@ -138,8 +146,9 @@
}
private synchronized void initLocalNodesForSchedulingList() {
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
- viaProto ? proto : builder;
+ YarnServerCommonServiceProtos.
+ DistributedSchedulingAllocateResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>();
if (list != null) {
@@ -148,6 +157,7 @@
}
}
}
+
private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
final List<NodeId> nodeList) {
maybeInitBuilder();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
new file mode 100644
index 0000000..4aaf99c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of {@link RegisterDistributedSchedulingAMResponse}.
+ */
+public class RegisterDistributedSchedulingAMResponsePBImpl extends
+ RegisterDistributedSchedulingAMResponse {
+
+ YarnServerCommonServiceProtos.RegisterDistributedSchedulingAMResponseProto
+ proto =
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProto
+ .getDefaultInstance();
+ YarnServerCommonServiceProtos.RegisterDistributedSchedulingAMResponseProto.
+ Builder builder = null;
+ boolean viaProto = false;
+
+ private Resource maxContainerResource;
+ private Resource minContainerResource;
+ private Resource incrContainerResource;
+ private List<NodeId> nodesForScheduling;
+ private RegisterApplicationMasterResponse registerApplicationMasterResponse;
+
+ public RegisterDistributedSchedulingAMResponsePBImpl() {
+ builder = YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProto.newBuilder();
+ }
+
+ public RegisterDistributedSchedulingAMResponsePBImpl(
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProto
+ getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private synchronized void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private synchronized void mergeLocalToBuilder() {
+ if (this.nodesForScheduling != null) {
+ builder.clearNodesForScheduling();
+ Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
+ this.nodesForScheduling);
+ builder.addAllNodesForScheduling(iterable);
+ }
+ if (this.maxContainerResource != null) {
+ builder.setMaxContainerResource(ProtoUtils.convertToProtoFormat(
+ this.maxContainerResource));
+ }
+ if (this.minContainerResource != null) {
+ builder.setMinContainerResource(ProtoUtils.convertToProtoFormat(
+ this.minContainerResource));
+ }
+ if (this.incrContainerResource != null) {
+ builder.setIncrContainerResource(
+ ProtoUtils.convertToProtoFormat(this.incrContainerResource));
+ }
+ if (this.registerApplicationMasterResponse != null) {
+ builder.setRegisterResponse(
+ ((RegisterApplicationMasterResponsePBImpl)
+ this.registerApplicationMasterResponse).getProto());
+ }
+ }
+
+ @Override
+ public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
+ maybeInitBuilder();
+ if (registerApplicationMasterResponse == null) {
+ builder.clearRegisterResponse();
+ }
+ this.registerApplicationMasterResponse = resp;
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse getRegisterResponse() {
+ if (this.registerApplicationMasterResponse != null) {
+ return this.registerApplicationMasterResponse;
+ }
+
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasRegisterResponse()) {
+ return null;
+ }
+
+ this.registerApplicationMasterResponse =
+ new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
+ return this.registerApplicationMasterResponse;
+ }
+
+ @Override
+ public void setMaxContainerResource(Resource maxResource) {
+ maybeInitBuilder();
+ if (maxContainerResource == null) {
+ builder.clearMaxContainerResource();
+ }
+ this.maxContainerResource = maxResource;
+ }
+
+ @Override
+ public Resource getMaxContainerResource() {
+ if (this.maxContainerResource != null) {
+ return this.maxContainerResource;
+ }
+
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasMaxContainerResource()) {
+ return null;
+ }
+
+ this.maxContainerResource = ProtoUtils.convertFromProtoFormat(p
+ .getMaxContainerResource());
+ return this.maxContainerResource;
+ }
+
+ @Override
+ public void setMinContainerResource(Resource minResource) {
+ maybeInitBuilder();
+ if (minContainerResource == null) {
+ builder.clearMinContainerResource();
+ }
+ this.minContainerResource = minResource;
+ }
+
+ @Override
+ public Resource getMinContainerResource() {
+ if (this.minContainerResource != null) {
+ return this.minContainerResource;
+ }
+
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasMinContainerResource()) {
+ return null;
+ }
+
+ this.minContainerResource = ProtoUtils.convertFromProtoFormat(p
+ .getMinContainerResource());
+ return this.minContainerResource;
+ }
+
+ @Override
+ public void setIncrContainerResource(Resource incrResource) {
+ maybeInitBuilder();
+ if (incrContainerResource == null) {
+ builder.clearIncrContainerResource();
+ }
+ this.incrContainerResource = incrResource;
+ }
+
+ @Override
+ public Resource getIncrContainerResource() {
+ if (this.incrContainerResource != null) {
+ return this.incrContainerResource;
+ }
+
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasIncrContainerResource()) {
+ return null;
+ }
+
+ this.incrContainerResource = ProtoUtils.convertFromProtoFormat(p
+ .getIncrContainerResource());
+ return this.incrContainerResource;
+ }
+
+ @Override
+ public void setContainerTokenExpiryInterval(int interval) {
+ maybeInitBuilder();
+ builder.setContainerTokenExpiryInterval(interval);
+ }
+
+ @Override
+ public int getContainerTokenExpiryInterval() {
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasContainerTokenExpiryInterval()) {
+ return 0;
+ }
+ return p.getContainerTokenExpiryInterval();
+ }
+
+ @Override
+ public void setContainerIdStart(long containerIdStart) {
+ maybeInitBuilder();
+ builder.setContainerIdStart(containerIdStart);
+ }
+
+ @Override
+ public long getContainerIdStart() {
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasContainerIdStart()) {
+ return 0;
+ }
+ return p.getContainerIdStart();
+ }
+
+ @Override
+ public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+ maybeInitBuilder();
+ if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+ if (this.nodesForScheduling != null) {
+ this.nodesForScheduling.clear();
+ }
+ builder.clearNodesForScheduling();
+ return;
+ }
+ this.nodesForScheduling = new ArrayList<>();
+ this.nodesForScheduling.addAll(nodesForScheduling);
+ }
+
+ @Override
+ public List<NodeId> getNodesForScheduling() {
+ if (nodesForScheduling != null) {
+ return nodesForScheduling;
+ }
+ initLocalNodesForSchedulingList();
+ return nodesForScheduling;
+ }
+
+ private synchronized void initLocalNodesForSchedulingList() {
+ YarnServerCommonServiceProtos.
+ RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+ nodesForScheduling = new ArrayList<>();
+ if (list != null) {
+ for (YarnProtos.NodeIdProto t : list) {
+ nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+ }
+ }
+ }
+
+ private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+ final List<NodeId> nodeList) {
+ maybeInitBuilder();
+ return new Iterable<YarnProtos.NodeIdProto>() {
+ @Override
+ public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+ return new Iterator<YarnProtos.NodeIdProto>() {
+
+ Iterator<NodeId> iter = nodeList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public YarnProtos.NodeIdProto next() {
+ return ProtoUtils.convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduling_am_protocol.proto
similarity index 80%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduling_am_protocol.proto
index 818eb4a..274eaa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduling_am_protocol.proto
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+
/**
* These .proto interfaces are public and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
@@ -23,7 +24,7 @@
*/
option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "DistributedSchedulerProtocol";
+option java_outer_classname = "DistributedSchedulingAMProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
@@ -31,9 +32,8 @@
import "yarn_service_protos.proto";
import "yarn_server_common_service_protos.proto";
-
-service DistributedSchedulerProtocolService {
- rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
+service DistributedSchedulingAMProtocolService {
+ rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (RegisterDistributedSchedulingAMResponseProto);
+ rpc allocateForDistributedScheduling (DistributedSchedulingAllocateRequestProto) returns (DistributedSchedulingAllocateResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
- rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 3660252..55ac875 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -26,22 +26,22 @@
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
-message DistSchedRegisterResponseProto {
+message RegisterDistributedSchedulingAMResponseProto {
optional RegisterApplicationMasterResponseProto register_response = 1;
- optional ResourceProto max_alloc_capability = 2;
- optional ResourceProto min_alloc_capability = 3;
- optional ResourceProto incr_alloc_capability = 4;
+ optional ResourceProto max_container_resource = 2;
+ optional ResourceProto min_container_resource = 3;
+ optional ResourceProto incr_container_resource = 4;
optional int32 container_token_expiry_interval = 5;
optional int64 container_id_start = 6;
repeated NodeIdProto nodes_for_scheduling = 7;
}
-message DistSchedAllocateResponseProto {
+message DistributedSchedulingAllocateResponseProto {
optional AllocateResponseProto allocate_response = 1;
repeated NodeIdProto nodes_for_scheduling = 2;
}
-message DistSchedAllocateRequestProto {
+message DistributedSchedulingAllocateRequestProto {
optional AllocateRequestProto allocate_request = 1;
repeated ContainerProto allocated_containers = 2;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index ac360f4..511db16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -65,7 +65,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
@@ -467,10 +467,9 @@
interceptorClassNames.add(item.trim());
}
- // Make sure LocalScheduler is present at the beginning
- // of the chain..
+ // Make sure DistributedScheduler is present at the beginning of the chain.
if (this.nmContext.isDistributedSchedulingEnabled()) {
- interceptorClassNames.add(0, LocalScheduler.class.getName());
+ interceptorClassNames.add(0, DistributedScheduler.class.getName());
}
return interceptorClassNames;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 55c65f4..e6c9bbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,12 +21,11 @@
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import java.io.IOException;
@@ -118,8 +117,9 @@
* @throws IOException
*/
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
return (this.nextInterceptor != null) ?
this.nextInterceptor.allocateForDistributedScheduling(request) : null;
}
@@ -134,10 +134,10 @@
* @throws IOException
*/
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
return (this.nextInterceptor != null) ? this.nextInterceptor
.registerApplicationMasterForDistributedScheduling(request) : null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index debff76..75fe022 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -43,12 +43,11 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
-import org.apache.hadoop.yarn.server.api.protocolrecords
- .DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@
AbstractRequestInterceptor {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultRequestInterceptor.class);
- private DistributedSchedulerProtocol rmClient;
+ private DistributedSchedulingAMProtocol rmClient;
private UserGroupInformation user = null;
@Override
@@ -77,13 +76,13 @@
user.addToken(appContext.getAMRMToken());
final Configuration conf = this.getConf();
- rmClient =
- user.doAs(new PrivilegedExceptionAction<DistributedSchedulerProtocol>() {
+ rmClient = user.doAs(
+ new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
@Override
- public DistributedSchedulerProtocol run() throws Exception {
+ public DistributedSchedulingAMProtocol run() throws Exception {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf,
- DistributedSchedulerProtocol.class);
+ DistributedSchedulingAMProtocol.class);
}
});
} catch (IOException e) {
@@ -124,7 +123,7 @@
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
@@ -134,13 +133,14 @@
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
}
- DistSchedAllocateResponse allocateResponse =
+ DistributedSchedulingAllocateResponse allocateResponse =
rmClient.allocateForDistributedScheduling(request);
if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
@@ -180,10 +180,10 @@
@VisibleForTesting
public void setRMClient(final ApplicationMasterProtocol rmClient) {
- if (rmClient instanceof DistributedSchedulerProtocol) {
- this.rmClient = (DistributedSchedulerProtocol)rmClient;
+ if (rmClient instanceof DistributedSchedulingAMProtocol) {
+ this.rmClient = (DistributedSchedulingAMProtocol)rmClient;
} else {
- this.rmClient = new DistributedSchedulerProtocol() {
+ this.rmClient = new DistributedSchedulingAMProtocol() {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
(RegisterApplicationMasterRequest request) throws YarnException,
@@ -205,7 +205,7 @@
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
@@ -213,8 +213,9 @@
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request)
+ public DistributedSchedulingAllocateResponse
+ allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
throw new IOException("Not Supported !!");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
index 7a73563..5995af1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -19,14 +19,14 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
/**
* Defines the contract to be implemented by the request intercepter classes,
* that can be used to intercept and inspect messages sent from the application
* master to the resource manager.
*/
-public interface RequestInterceptor extends DistributedSchedulerProtocol,
+public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
Configurable {
/**
* This method is called for initializing the intercepter. This is guaranteed
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
similarity index 82%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index ec0e8a4..bfb12ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -23,17 +23,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
- .FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
- .FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,14 +42,12 @@
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy
- .AMRMProxyApplicationContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
-import org.apache.hadoop.yarn.server.nodemanager.security
- .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,19 +62,19 @@
import java.util.TreeMap;
/**
- * <p>The LocalScheduler runs on the NodeManager and is modelled as an
+ * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
* <code>AMRMProxy</code> request interceptor. It is responsible for the
- * following :</p>
+ * following:</p>
* <ul>
* <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
* response objects to extract instructions from the
- * <code>ClusterManager</code> running on the ResourceManager to aid in making
- * Scheduling scheduling decisions</li>
+ * <code>ClusterMonitor</code> running on the ResourceManager to aid in making
+ * distributed scheduling decisions.</li>
* <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
- * containers for the opportunistic resource outstandingOpReqs</li>
+ * containers for the outstanding OPPORTUNISTIC container requests.</li>
* </ul>
*/
-public final class LocalScheduler extends AbstractRequestInterceptor {
+public final class DistributedScheduler extends AbstractRequestInterceptor {
static class PartitionedResourceRequests {
private List<ResourceRequest> guaranteed = new ArrayList<>();
@@ -93,7 +87,7 @@
}
}
- static class DistSchedulerParams {
+ static class DistributedSchedulerParams {
Resource maxResource;
Resource minResource;
Resource incrementResource;
@@ -101,18 +95,20 @@
}
private static final Logger LOG = LoggerFactory
- .getLogger(LocalScheduler.class);
+ .getLogger(DistributedScheduler.class);
private final static RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
- // Currently just used to keep track of allocated Containers
- // Can be used for reporting stats later
+ // Currently just used to keep track of allocated containers.
+ // Can be used for reporting stats later.
private Set<ContainerId> containersAllocated = new HashSet<>();
- private DistSchedulerParams appParams = new DistSchedulerParams();
- private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
- new OpportunisticContainerAllocator.ContainerIdCounter();
+ private DistributedSchedulerParams appParams =
+ new DistributedSchedulerParams();
+ private final OpportunisticContainerAllocator.ContainerIdCounter
+ containerIdCounter =
+ new OpportunisticContainerAllocator.ContainerIdCounter();
private Map<String, NodeId> nodeList = new LinkedHashMap<>();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
@@ -123,7 +119,7 @@
// This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
// Resource Name (Host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
- // ResourceRequests (ask)
+ // ResourceRequest (ask).
final TreeMap<Priority, Map<Resource, ResourceRequest>>
outstandingOpReqs = new TreeMap<>();
@@ -158,8 +154,8 @@
* @param request
* registration request
* @return Allocate Response
- * @throws YarnException
- * @throws IOException
+ * @throws YarnException YarnException
+ * @throws IOException IOException
*/
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
@@ -177,14 +173,14 @@
* @param request
* allocation request
* @return Allocate Response
- * @throws YarnException
- * @throws IOException
+ * @throws YarnException YarnException
+ * @throws IOException IOException
*/
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
- DistSchedAllocateRequest distRequest =
- RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
+ DistributedSchedulingAllocateRequest distRequest = RECORD_FACTORY
+ .newRecordInstance(DistributedSchedulingAllocateRequest.class);
distRequest.setAllocateRequest(request);
return allocateForDistributedScheduling(distRequest).getAllocateResponse();
}
@@ -199,9 +195,6 @@
/**
* Check if we already have a NMToken. if Not, generate the Token and
* add it to the response
- * @param response
- * @param nmTokens
- * @param allocatedContainers
*/
private void updateResponseWithNMTokens(AllocateResponse response,
List<NMToken> nmTokens, List<Container> allocatedContainers) {
@@ -235,11 +228,11 @@
}
private void updateParameters(
- DistSchedRegisterResponse registerResponse) {
- appParams.minResource = registerResponse.getMinAllocatableCapabilty();
- appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
+ RegisterDistributedSchedulingAMResponse registerResponse) {
+ appParams.minResource = registerResponse.getMinContainerResource();
+ appParams.maxResource = registerResponse.getMaxContainerResource();
appParams.incrementResource =
- registerResponse.getIncrAllocatableCapabilty();
+ registerResponse.getIncrContainerResource();
if (appParams.incrementResource == null) {
appParams.incrementResource = appParams.minResource;
}
@@ -253,11 +246,12 @@
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
- * (Priority, ResourceName, Capability) and adds it the outstanding
+ * (Priority, ResourceName, Capability) and adds to the outstanding
* OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
* the current YARN constraint that only a single ResourceRequest can exist at
- * a give Priority and Capability
- * @param resourceAsks
+ * a give Priority and Capability.
+ *
+ * @param resourceAsks the list with the {@link ResourceRequest}s
*/
public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
for (ResourceRequest request : resourceAsks) {
@@ -297,11 +291,9 @@
/**
* This method matches a returned list of Container Allocations to any
- * outstanding OPPORTUNISTIC ResourceRequest
- * @param capability
- * @param allocatedContainers
+ * outstanding OPPORTUNISTIC ResourceRequest.
*/
- public void matchAllocationToOutstandingRequest(Resource capability,
+ private void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
containersAllocated.add(c.getId());
@@ -333,28 +325,29 @@
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
+ throws YarnException, IOException {
LOG.info("Forwarding registration request to the" +
"Distributed Scheduler Service on YARN RM");
- DistSchedRegisterResponse dsResp = getNextInterceptor()
+ RegisterDistributedSchedulingAMResponse dsResp = getNextInterceptor()
.registerApplicationMasterForDistributedScheduling(request);
updateParameters(dsResp);
return dsResp;
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
- PartitionedResourceRequests partitionedAsks = partitionAskList(
- request.getAllocateRequest().getAskList());
+ PartitionedResourceRequests partitionedAsks =
+ partitionAskList(request.getAllocateRequest().getAskList());
List<ContainerId> releasedContainers =
request.getAllocateRequest().getReleaseList();
@@ -393,11 +386,12 @@
allocatedContainers.addAll(e.getValue());
}
}
+
request.setAllocatedContainers(allocatedContainers);
// Send all the GUARANTEED Reqs to RM
request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
- DistSchedAllocateResponse dsResp =
+ DistributedSchedulingAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
// Update host to nodeId mapping
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index 22a6a24..ce5bda0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -29,7 +29,7 @@
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -37,15 +37,17 @@
import java.net.InetSocketAddress;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
/**
- * <p>The OpportunisticContainerAllocator allocates containers on a given list
- * of Nodes after it modifies the container sizes to within allowable limits
- * specified by the <code>ClusterManager</code> running on the RM. It tries to
- * distribute the containers as evenly as possible. It also uses the
- * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for
- * the allocated containers</p>
+ * <p>
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as possible.
+ * It also uses the <code>NMTokenSecretManagerInNM</code> to generate the
+ * required NM tokens for the allocated containers.
+ * </p>
*/
public class OpportunisticContainerAllocator {
@@ -78,15 +80,15 @@
this.webpagePort = webpagePort;
}
- public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
- ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
- Set<String> blacklist, ApplicationAttemptId appAttId,
- Map<String, NodeId> allNodes, String userName) throws YarnException {
+ public Map<Resource, List<Container>> allocate(
+ DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+ Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
+ ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
+ String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
- Set<String> nodesAllocated = new HashSet<>();
for (ResourceRequest anyAsk : resourceAsks) {
allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
- allNodes, userName, containers, nodesAllocated, anyAsk);
+ allNodes, userName, containers, anyAsk);
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
@@ -96,30 +98,30 @@
return containers;
}
- private void allocateOpportunisticContainers(DistSchedulerParams appParams,
- ContainerIdCounter idCounter, Set<String> blacklist,
- ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
- Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
- ResourceRequest anyAsk) throws YarnException {
+ private void allocateOpportunisticContainers(
+ DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+ Set<String> blacklist, ApplicationAttemptId id,
+ Map<String, NodeId> allNodes, String userName,
+ Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+ throws YarnException {
int toAllocate = anyAsk.getNumContainers()
- - (containers.isEmpty() ?
- 0 : containers.get(anyAsk.getCapability()).size());
+ - (containers.isEmpty() ? 0 :
+ containers.get(anyAsk.getCapability()).size());
- List<String> topKNodesLeft = new ArrayList<>();
- for (String s : allNodes.keySet()) {
- // Bias away from whatever we have already allocated and respect blacklist
- if (nodesAllocated.contains(s) || blacklist.contains(s)) {
+ List<NodeId> nodesForScheduling = new ArrayList<>();
+ for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+ // Do not use blacklisted nodes for scheduling.
+ if (blacklist.contains(nodeEntry.getKey())) {
continue;
}
- topKNodesLeft.add(s);
+ nodesForScheduling.add(nodeEntry.getValue());
}
int numAllocated = 0;
- int nextNodeToAllocate = 0;
+ int nextNodeToSchedule = 0;
for (int numCont = 0; numCont < toAllocate; numCont++) {
- String topNode = topKNodesLeft.get(nextNodeToAllocate);
- nextNodeToAllocate++;
- nextNodeToAllocate %= topKNodesLeft.size();
- NodeId nodeId = allNodes.get(topNode);
+ nextNodeToSchedule++;
+ nextNodeToSchedule %= nodesForScheduling.size();
+ NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
Container container = buildContainer(appParams, idCounter, anyAsk, id,
userName, nodeId);
List<Container> cList = containers.get(anyAsk.getCapability());
@@ -134,7 +136,7 @@
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
}
- private Container buildContainer(DistSchedulerParams appParams,
+ private Container buildContainer(DistributedSchedulerParams appParams,
ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
String userName, NodeId nodeId) throws YarnException {
ContainerId cId =
@@ -165,7 +167,7 @@
return container;
}
- private Resource normalizeCapability(DistSchedulerParams appParams,
+ private Resource normalizeCapability(DistributedSchedulerParams appParams,
ResourceRequest ask) {
return Resources.normalize(RESOURCE_CALCULATOR,
ask.getCapability(), appParams.minResource, appParams.maxResource,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
similarity index 75%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index 8de849b..b093b3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -19,34 +19,30 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
-import org.apache.hadoop.yarn.server.nodemanager.security
- .NMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.security
- .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@@ -63,27 +59,30 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-public class TestLocalScheduler {
+/**
+ * Test cases for {@link DistributedScheduler}.
+ */
+public class TestDistributedScheduler {
@Test
- public void testLocalScheduler() throws Exception {
+ public void testDistributedScheduler() throws Exception {
Configuration conf = new Configuration();
- LocalScheduler localScheduler = new LocalScheduler();
+ DistributedScheduler distributedScheduler = new DistributedScheduler();
- RequestInterceptor finalReqIntcptr = setup(conf, localScheduler);
+ RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
- registerAM(localScheduler, finalReqIntcptr, Arrays.asList(
+ registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
- final AtomicBoolean flipFlag = new AtomicBoolean(false);
+ final AtomicBoolean flipFlag = new AtomicBoolean(true);
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
- Mockito.any(DistSchedAllocateRequest.class)))
- .thenAnswer(new Answer<DistSchedAllocateResponse>() {
+ Mockito.any(DistributedSchedulingAllocateRequest.class)))
+ .thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() {
@Override
- public DistSchedAllocateResponse answer(InvocationOnMock
- invocationOnMock) throws Throwable {
+ public DistributedSchedulingAllocateResponse answer(
+ InvocationOnMock invocationOnMock) throws Throwable {
flipFlag.set(!flipFlag.get());
if (flipFlag.get()) {
return createAllocateResponse(Arrays.asList(
@@ -101,15 +100,15 @@
ResourceRequest opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
+
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 4 containers were allocated
AllocateResponse allocateResponse =
- localScheduler.allocate(allocateRequest);
+ distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
- // Verify equal distribution on hosts a and b
- // And None on c and d
+ // Verify equal distribution on hosts a and b, and none on c or d
Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
@@ -123,18 +122,18 @@
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
// Verify 6 containers were allocated
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
- // Verify New containers are equally distribution on hosts c and d
- // And None on a and b
+ // Verify new containers are equally distribution on hosts c and d,
+ // and none on a or b
allocs = mapAllocs(allocateResponse, 6);
Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
- // Ensure the LocalScheduler respects the list order..
+ // Ensure the DistributedScheduler respects the list order..
// The first request should be allocated to "d" since it is ranked higher
// The second request should be allocated to "c" since the ranking is
// flipped on every allocate response.
@@ -142,7 +141,7 @@
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
@@ -150,7 +149,7 @@
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());
@@ -158,22 +157,23 @@
opportunisticReq =
createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
- allocateResponse = localScheduler.allocate(allocateRequest);
+ allocateResponse = distributedScheduler.allocate(allocateRequest);
allocs = mapAllocs(allocateResponse, 1);
Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
}
- private void registerAM(LocalScheduler localScheduler, RequestInterceptor
- finalReqIntcptr, List<NodeId> nodeList) throws Exception {
- DistSchedRegisterResponse distSchedRegisterResponse =
- Records.newRecord(DistSchedRegisterResponse.class);
+ private void registerAM(DistributedScheduler distributedScheduler,
+ RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
+ throws Exception {
+ RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
+ Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
distSchedRegisterResponse.setRegisterResponse(
Records.newRecord(RegisterApplicationMasterResponse.class));
distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
distSchedRegisterResponse.setContainerIdStart(0);
- distSchedRegisterResponse.setMaxAllocatableCapabilty(
+ distSchedRegisterResponse.setMaxContainerResource(
Resource.newInstance(1024, 4));
- distSchedRegisterResponse.setMinAllocatableCapabilty(
+ distSchedRegisterResponse.setMinContainerResource(
Resource.newInstance(512, 2));
distSchedRegisterResponse.setNodesForScheduling(nodeList);
Mockito.when(
@@ -181,12 +181,12 @@
Mockito.any(RegisterApplicationMasterRequest.class)))
.thenReturn(distSchedRegisterResponse);
- localScheduler.registerApplicationMaster(
+ distributedScheduler.registerApplicationMaster(
Records.newRecord(RegisterApplicationMasterRequest.class));
}
- private RequestInterceptor setup(Configuration conf, LocalScheduler
- localScheduler) {
+ private RequestInterceptor setup(Configuration conf,
+ DistributedScheduler distributedScheduler) {
NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
Context context = Mockito.mock(Context.class);
@@ -215,12 +215,12 @@
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
nmTokenSecretManagerInNM.setMasterKey(mKey);
- localScheduler.initLocal(
+ distributedScheduler.initLocal(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
containerAllocator, nmTokenSecretManagerInNM, "test");
RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
- localScheduler.setNextInterceptor(finalReqIntcptr);
+ distributedScheduler.setNextInterceptor(finalReqIntcptr);
return finalReqIntcptr;
}
@@ -237,17 +237,18 @@
return opportunisticReq;
}
- private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
- DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
- (DistSchedAllocateResponse.class);
- distSchedAllocateResponse.setAllocateResponse(
- Records.newRecord(AllocateResponse.class));
+ private DistributedSchedulingAllocateResponse createAllocateResponse(
+ List<NodeId> nodes) {
+ DistributedSchedulingAllocateResponse distSchedAllocateResponse =
+ Records.newRecord(DistributedSchedulingAllocateResponse.class);
+ distSchedAllocateResponse
+ .setAllocateResponse(Records.newRecord(AllocateResponse.class));
distSchedAllocateResponse.setNodesForScheduling(nodes);
return distSchedAllocateResponse;
}
- private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
- allocateResponse, int expectedSize) throws Exception {
+ private Map<NodeId, List<ContainerId>> mapAllocs(
+ AllocateResponse allocateResponse, int expectedSize) throws Exception {
Assert.assertEquals(expectedSize,
allocateResponse.getAllocatedContainers().size());
Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
@@ -266,5 +267,4 @@
}
return allocs;
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
similarity index 81%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
index 5aabddc..843ac09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
@@ -27,15 +27,15 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -73,18 +73,18 @@
import java.util.concurrent.ConcurrentHashMap;
/**
- * The DistributedSchedulingService is started instead of the
- * ApplicationMasterService if DistributedScheduling is enabled for the YARN
+ * The DistributedSchedulingAMService is started instead of the
+ * ApplicationMasterService if distributed scheduling is enabled for the YARN
* cluster.
* It extends the functionality of the ApplicationMasterService by servicing
* clients (AMs and AMRMProxy request interceptors) that understand the
* DistributedSchedulingProtocol.
*/
-public class DistributedSchedulingService extends ApplicationMasterService
- implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
+public class DistributedSchedulingAMService extends ApplicationMasterService
+ implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
private static final Log LOG =
- LogFactory.getLog(DistributedSchedulingService.class);
+ LogFactory.getLog(DistributedSchedulingAMService.class);
private final NodeQueueLoadMonitor nodeMonitor;
@@ -94,12 +94,12 @@
new ConcurrentHashMap<>();
private final int k;
- public DistributedSchedulingService(RMContext rmContext,
- YarnScheduler scheduler) {
- super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+ public DistributedSchedulingAMService(RMContext rmContext,
+ YarnScheduler scheduler) {
+ super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
this.k = rmContext.getYarnConfiguration().getInt(
- YarnConfiguration.DIST_SCHEDULING_TOP_K,
- YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+ YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
+ YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
YarnConfiguration.
@@ -149,7 +149,7 @@
@Override
public Server getServer(YarnRPC rpc, Configuration serverConf,
InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
- Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+ Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
addr, serverConf, secretManager,
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
@@ -184,43 +184,45 @@
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
RegisterApplicationMasterResponse response =
registerApplicationMaster(request);
- DistSchedRegisterResponse dsResp = recordFactory
- .newRecordInstance(DistSchedRegisterResponse.class);
+ RegisterDistributedSchedulingAMResponse dsResp = recordFactory
+ .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
dsResp.setRegisterResponse(response);
- dsResp.setMinAllocatableCapabilty(
+ dsResp.setMinContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+ YarnConfiguration.
+ DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
)
);
- dsResp.setMaxAllocatableCapabilty(
+ dsResp.setMaxContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
)
);
- dsResp.setIncrAllocatableCapabilty(
+ dsResp.setIncrContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+ YarnConfiguration.
+ DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
)
);
dsResp.setContainerTokenExpiryInterval(
@@ -238,8 +240,9 @@
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
List<Container> distAllocContainers = request.getAllocatedContainers();
for (Container container : distAllocContainers) {
// Create RMContainer
@@ -255,8 +258,8 @@
RMContainerEventType.LAUNCHED));
}
AllocateResponse response = allocate(request.getAllocateRequest());
- DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
- (DistSchedAllocateResponse.class);
+ DistributedSchedulingAllocateResponse dsResp = recordFactory
+ .newRecordInstance(DistributedSchedulingAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
@@ -264,7 +267,7 @@
}
private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
- String rackName, NodeId nodeId) {
+ String rackName, NodeId nodeId) {
if (rackName != null) {
mapping.putIfAbsent(rackName, new HashSet<NodeId>());
Set<NodeId> nodeIds = mapping.get(rackName);
@@ -275,7 +278,7 @@
}
private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
- String rackName, NodeId nodeId) {
+ String rackName, NodeId nodeId) {
if (rackName != null) {
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
@@ -346,7 +349,7 @@
break;
// <-- IGNORED EVENTS : END -->
default:
- LOG.error("Unknown event arrived at DistributedSchedulingService: "
+ LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
+ event.toString());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 0c1df33..4509045 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1180,12 +1180,12 @@
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
- DistributedSchedulingService distributedSchedulingService = new
- DistributedSchedulingService(this.rmContext, scheduler);
+ DistributedSchedulingAMService distributedSchedulingService = new
+ DistributedSchedulingAMService(this.rmContext, scheduler);
EventDispatcher distSchedulerEventDispatcher =
new EventDispatcher(distributedSchedulingService,
- DistributedSchedulingService.class.getName());
- // Add an event dispoatcher for the DistributedSchedulingService
+ DistributedSchedulingAMService.class.getName());
+ // Add an event dispatcher for the DistributedSchedulingAMService
// to handle node updates/additions and removals.
// Since the SchedulerEvent is currently a super set of theses,
// we register interest for it..
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 3764664..c677345 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -93,8 +94,8 @@
this.queue = queue;
this.user = user;
this.activeUsersManager = activeUsersManager;
- this.containerIdCounter =
- new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+ this.containerIdCounter = new AtomicLong(
+ epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index dcdc934..7d1b3c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -822,7 +822,7 @@
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
- return new DistributedSchedulingService(getRMContext(), scheduler) {
+ return new DistributedSchedulingAMService(getRMContext(), scheduler) {
@Override
protected void serviceStart() {
// override to not start rpc handler
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
similarity index 75%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
index 4716bab..0213a94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
@@ -25,16 +25,11 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -44,17 +39,16 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -63,12 +57,10 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
- .DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
- .DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
- .AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.Assert;
import org.junit.Test;
@@ -78,9 +70,12 @@
import java.util.Arrays;
import java.util.List;
-public class TestDistributedSchedulingService {
+/**
+ * Test cases for {@link DistributedSchedulingAMService}.
+ */
+public class TestDistributedSchedulingAMService {
- // Test if the DistributedSchedulingService can handle both DSProtocol as
+ // Test if the DistributedSchedulingAMService can handle both DSProtocol as
// well as AMProtocol clients
@Test
public void testRPCWrapping() throws Exception {
@@ -116,7 +111,8 @@
Resource.newInstance(1, 2), 1, true, "exp",
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))));
- DistributedSchedulingService service = createService(factory, rmContext, c);
+ DistributedSchedulingAMService service =
+ createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@@ -126,7 +122,7 @@
ProtobufRpcEngine.class);
ApplicationMasterProtocolPB ampProxy =
RPC.getProxy(ApplicationMasterProtocolPB
- .class, 1, NetUtils.getConnectAddress(server), conf);
+ .class, 1, NetUtils.getConnectAddress(server), conf);
RegisterApplicationMasterResponse regResp =
new RegisterApplicationMasterResponsePBImpl(
ampProxy.registerApplicationMaster(null,
@@ -156,34 +152,34 @@
// Verify that the DistrubutedSchedulingService can handle the
- // DistributedSchedulerProtocol clients as well
- RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+ // DistributedSchedulingAMProtocol clients as well
+ RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
- DistributedSchedulerProtocolPB dsProxy =
- RPC.getProxy(DistributedSchedulerProtocolPB
+ DistributedSchedulingAMProtocolPB dsProxy =
+ RPC.getProxy(DistributedSchedulingAMProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
- DistSchedRegisterResponse dsRegResp =
- new DistSchedRegisterResponsePBImpl(
+ RegisterDistributedSchedulingAMResponse dsRegResp =
+ new RegisterDistributedSchedulingAMResponsePBImpl(
dsProxy.registerApplicationMasterForDistributedScheduling(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(RegisterApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
Assert.assertEquals(4,
- dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
+ dsRegResp.getMaxContainerResource().getVirtualCores());
Assert.assertEquals(1024,
- dsRegResp.getMinAllocatableCapabilty().getMemorySize());
+ dsRegResp.getMinContainerResource().getMemorySize());
Assert.assertEquals(2,
- dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
+ dsRegResp.getIncrContainerResource().getVirtualCores());
- DistSchedAllocateRequestPBImpl distAllReq =
- (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
- DistSchedAllocateRequest.class);
+ DistributedSchedulingAllocateRequestPBImpl distAllReq =
+ (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
+ DistributedSchedulingAllocateRequest.class);
distAllReq.setAllocateRequest(allReq);
distAllReq.setAllocatedContainers(Arrays.asList(c));
- DistSchedAllocateResponse dsAllocResp =
- new DistSchedAllocateResponsePBImpl(
+ DistributedSchedulingAllocateResponse dsAllocResp =
+ new DistributedSchedulingAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
distAllReq.getProto()));
Assert.assertEquals(
@@ -199,9 +195,9 @@
false, dsfinishResp.getIsUnregistered());
}
- private DistributedSchedulingService createService(final RecordFactory
+ private DistributedSchedulingAMService createService(final RecordFactory
factory, final RMContext rmContext, final Container c) {
- return new DistributedSchedulingService(rmContext, null) {
+ return new DistributedSchedulingAMService(rmContext, null) {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws
@@ -235,22 +231,24 @@
}
@Override
- public DistSchedRegisterResponse
+ public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request) throws
- YarnException, IOException {
- DistSchedRegisterResponse resp = factory.newRecordInstance(
- DistSchedRegisterResponse.class);
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ RegisterDistributedSchedulingAMResponse resp = factory
+ .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
resp.setContainerIdStart(54321L);
- resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
- resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
- resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
+ resp.setMaxContainerResource(Resource.newInstance(4096, 4));
+ resp.setMinContainerResource(Resource.newInstance(1024, 1));
+ resp.setIncrContainerResource(Resource.newInstance(2048, 2));
return resp;
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling(
- DistSchedAllocateRequest request) throws YarnException, IOException {
+ public DistributedSchedulingAllocateResponse
+ allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
List<ResourceRequest> askList =
request.getAllocateRequest().getAskList();
List<Container> allocatedContainers = request.getAllocatedContainers();
@@ -260,8 +258,8 @@
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());
- DistSchedAllocateResponse resp =
- factory.newRecordInstance(DistSchedAllocateResponse.class);
+ DistributedSchedulingAllocateResponse resp = factory
+ .newRecordInstance(DistributedSchedulingAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;