SAMZA-2601: avoid infinite loop when resource not allocated with host affinity disabled (#1441)

Problem: If YARN does not immediately allocate resources for a container and host affinity is disabled for the job, then the container allocator will spam the logs with messages about the request, even if the request is considered expired. This is because the allocator loops over the pending requests to check if they have been allocated, but there is no delay between loop iterations, so it might just keep checking the same pending request over and over as fast as possible. A metric is also incremented in this flow, so that metric is extremely high, which is probably not the intention.

Changes: Avoid infinite loop by breaking the loop if host affinity is disabled

Tests: new unit test

API changes: N/A

Usage instructions: N/A

Upgrade instructions: None
diff --git a/KEYS b/KEYS
index 3cc4efc..9730772 100644
--- a/KEYS
+++ b/KEYS
@@ -918,3 +918,40 @@
 VcSdqZkA/iKbC7Fv6xS7fTmhnLmAJ40h
 =Ja6V
 -----END PGP PUBLIC KEY BLOCK-----
+pub   2048R/D2103453 2020-11-19 [expires: 2023-11-19]
+uid                  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sig 3        D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sub   2048R/4C01BF41 2020-11-19 [expires: 2023-11-19]
+sig          D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2.0.22 (GNU/Linux)
+
+mQENBF+2rJsBCACpNvGmT730xGXb0/lELUhzBu10BhdXfkr6FupYU6IxNMSZc/W4
+LKjB0ptF/OfodjdOhA2Gbv0kZt4mRBgmwe+vIJkFAXOZccq+G708ZPBsjifUDESN
+oG9/juCJSSrYGcclWiPg0Fe3bBWm5KPWSkYG1OnHTWnqT7a5n6comgZmNori5tmA
+aPN/RnEywEY7vnqyvcJnfPKysvvaoTUbfoUaiGIVGwprYnQ3huvq+o81etWAOVvZ
+qry+6m6fA5l35bmr29xlbiK7lYCShie7u03U9eRiaLGevFSziViq/cp7OlcAXVN4
+hGtuXh+NVMgaxsy7HIs4H+x9oc9IO+WpdNS5ABEBAAG0M0JvcmlzIFNoa29sbmlr
+IChMaW5rZWRpbiBTYW16YSkgPGJvcnlhc0BhcGFjaGUub3JnPokBPwQTAQIAKQUC
+X7asmwIbAwUJBaOagAcLCQgHAwIBBhUIAgkKCwQWAgMBAh4BAheAAAoJED6OsXvS
+EDRTz4IH/iWHlHFlvasnLCgVU+29NMrx+jFRXMG5pDzd/bNocrlD+E42ocpOU1vo
+bGwvey0I4Hi8DCGKR2p14IAs2tzaKYZR0kel1QesciI+ngyE/zWEI3xKv8OXiS8A
+XdQC1Eq8Q+BJhun3A8zuQU095r9/sygTFBWlOtQ1Wn2qJ5oC6005qDDbyDAbEsxp
+4m+VaM7faHGjpNOSekHCJW2wue55I+IEF/eH8BsRbw6DGlIFhspsl17wJQAcnmRu
+wakPKZGtEyjgwVYdmUpne7GW87n18JdT/6ywfTqw17/ly0zBJhl2M0JTq0d0+Z9P
+NFwt1pWDDJOtmMClMrj3PcAk5DVVPFG5AQ0EX7asmwEIALMe7kvm6j5bgrj2fZVg
+663bqvTSdn90WQ+lQgpURYXMz4VAFNKlaqDGOX2L94vdI4pIEvPfedQ0C9wZQFz3
+NujaRL4gsxx9CHolKzTs68DE7pyiikzpZFuSIKfv0bukSmzqgvz83VKH33jAZiqd
+HFzRh6ad0iMhDpF4g//E6SLnqJdrf335JAB1P/v9LH579XWfbGWegIkXOwG8QTPB
+LSslAKah8JFBILQfeL4MmbJnwb/BQqLIWVyK5hjzon+mxKEWHQNLaxM4R517JCFQ
+DNzBYyoqO517XXEbCdUQ/ZXCRH2+gCFEWj9kA2b6Frqf4B1PD8BUWBjBYDv1RNNd
+kfsAEQEAAYkBJQQYAQIADwUCX7asmwIbDAUJBaOagAAKCRA+jrF70hA0U/trB/9G
+gqyT6JSlmBY+8ep9NW+bDaoWQZY0f4kAR2JyehUM8kLrLqB7D3moah7XrR6nOoWt
+e56yp81+3hGLTcHK2WKapTgYssPrm/nv7NySMUw04UvRMrn+wV5U+7MUtdBxO23R
+hSRuqb40wPBzo97Lb7LOEqLF0OHNMiFSV1qLuhyDGg87zEkfJM+o52Y/CT9nh60Z
+5OXa1fCQhpPC29d7OsBJklE3EtwokCyeeqxJFpcDBNTqM+5GLWbRZp6YQfnNdoPB
+XgSlVbifAXWaMZr8kFpNF6H+zm6tZ3rclihqgWhUqrAAmT11qtMacbK8dRVw3im5
+oZPFXmvDQy/353sy1WRs
+=MLmR
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/build.gradle b/build.gradle
index e30776a..2931169 100644
--- a/build.gradle
+++ b/build.gradle
@@ -82,6 +82,7 @@
     'docs/_site/**',
     'docs/sitemap.xml',
     'docs/learn/documentation/*/api/javadocs/**',
+    'docs/learn/documentation/*/rest/javadocs/**',
     'docs/Gemfile.lock',
     'gradle/wrapper/**',
     'gradlew',
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
index e6208ba..8a90549 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -25,6 +25,7 @@
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.stream.Collectors;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.util.HttpUtil;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -56,7 +57,7 @@
 
   public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
     this.heartbeatEndpoint =
-        String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId);
+        String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, coordinatorUrl, executionEnvContainerId);
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index d7a648b..22268a8 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -27,4 +27,13 @@
   public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData";
   public static final String RUNID_LOCK_ID = "runId";
   public static final int LOCK_TIMEOUT_MS = 300000;
+
+  // Yarn coordination constants for heartbeat
+  public static final String YARN_CONTAINER_HEARTBEAT_SERVELET = "containerHeartbeat";
+  public static final String YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID = "executionContainerId";
+  public static final String YARN_COORDINATOR_URL = "yarn.am.tracking.url";
+  private static final String YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT = "%s" + YARN_CONTAINER_HEARTBEAT_SERVELET;
+  private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT = YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
+  public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT = YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
+      YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
 }
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
index 3ed3928..b683d35 100644
--- a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
@@ -27,6 +27,7 @@
 import javax.servlet.http.HttpServletResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
 import org.apache.samza.job.yarn.YarnAppState;
 import org.apache.samza.job.yarn.YarnContainer;
@@ -47,7 +48,6 @@
  */
 public class YarnContainerHeartbeatServlet extends HttpServlet {
 
-  private static final String YARN_CONTAINER_ID = "executionContainerId";
   private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class);
   private static final String APPLICATION_JSON = "application/json";
   private static final String GROUP = SamzaAppMasterMetrics.class.getName();
@@ -67,7 +67,7 @@
       throws ServletException, IOException {
     ContainerId yarnContainerId;
     PrintWriter printWriter = resp.getWriter();
-    String containerIdParam = req.getParameter(YARN_CONTAINER_ID);
+    String containerIdParam = req.getParameter(CoordinationConstants.YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID);
     ContainerHeartbeatResponse response;
     resp.setContentType(APPLICATION_JSON);
     boolean alive = false;
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index f436f79..0f512ad 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -23,6 +23,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.Config
+import org.apache.samza.coordinator.CoordinationConstants
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
@@ -40,7 +41,7 @@
 class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, yarnConfiguration: YarnConfiguration) extends  Logging {
   var rpcApp: HttpServer = null
   var webApp: HttpServer = null
-  val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+  val SERVER_URL_OPT: String = CoordinationConstants.YARN_COORDINATOR_URL;
   var securityManager: Option[SamzaAppMasterSecurityManager] = None
 
   def onInit() {
@@ -56,7 +57,8 @@
     webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
     webApp.start
 
-    samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry))
+    samzaAppState.jobModelManager.server.addServlet("/" + CoordinationConstants.YARN_CONTAINER_HEARTBEAT_SERVELET,
+      new YarnContainerHeartbeatServlet(state, registry))
     samzaAppState.jobModelManager.start
     state.rpcUrl = rpcApp.getUrl
     state.trackingUrl = webApp.getUrl
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
index 8987834..0901d85 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -24,6 +24,7 @@
 import junit.framework.Assert;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.yarn.YarnAppState;
 import org.apache.samza.job.yarn.YarnContainer;
@@ -76,7 +77,7 @@
     String validContainerId = "container_1350670447861_0003_01_000002";
     when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
     yarnAppState.runningProcessors.put(validContainerId, container);
-    URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + validContainerId);
+    URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, webApp.getUrl().toString(), validContainerId));
     String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertTrue(heartbeat.isAlive());
@@ -89,7 +90,8 @@
     String invalidContainerId = "container_1350670447861_0003_01_000002";
     when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
     yarnAppState.runningProcessors.put(validContainerId, container);
-    URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + invalidContainerId);
+    URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT,
+        webApp.getUrl().toString(), invalidContainerId));
     String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertFalse(heartbeat.isAlive());