SAMZA-2600: Extract constants for string literals used in AM and container (#1439)
* Extract constants for string literals used in AM and container
* Rename samza.autosizing.server.url to yarn.am.tracking.url
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
index e6208ba..8a90549 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -25,6 +25,7 @@
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.util.HttpUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@@ -56,7 +57,7 @@
public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
this.heartbeatEndpoint =
- String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId);
+ String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, coordinatorUrl, executionEnvContainerId);
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index d7a648b..22268a8 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -27,4 +27,13 @@
public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData";
public static final String RUNID_LOCK_ID = "runId";
public static final int LOCK_TIMEOUT_MS = 300000;
+
+ // Yarn coordination constants for heartbeat
+ public static final String YARN_CONTAINER_HEARTBEAT_SERVELET = "containerHeartbeat";
+ public static final String YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID = "executionContainerId";
+ public static final String YARN_COORDINATOR_URL = "yarn.am.tracking.url";
+ private static final String YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT = "%s" + YARN_CONTAINER_HEARTBEAT_SERVELET;
+ private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT = YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
+ public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT = YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
+ YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
}
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
index 3ed3928..b683d35 100644
--- a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
@@ -27,6 +27,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
import org.apache.samza.job.yarn.YarnAppState;
import org.apache.samza.job.yarn.YarnContainer;
@@ -47,7 +48,6 @@
*/
public class YarnContainerHeartbeatServlet extends HttpServlet {
- private static final String YARN_CONTAINER_ID = "executionContainerId";
private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class);
private static final String APPLICATION_JSON = "application/json";
private static final String GROUP = SamzaAppMasterMetrics.class.getName();
@@ -67,7 +67,7 @@
throws ServletException, IOException {
ContainerId yarnContainerId;
PrintWriter printWriter = resp.getWriter();
- String containerIdParam = req.getParameter(YARN_CONTAINER_ID);
+ String containerIdParam = req.getParameter(CoordinationConstants.YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID);
ContainerHeartbeatResponse response;
resp.setContentType(APPLICATION_JSON);
boolean alive = false;
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index f436f79..0f512ad 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.samza.clustermanager.SamzaApplicationState
import org.apache.samza.config.Config
+import org.apache.samza.coordinator.CoordinationConstants
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
import org.apache.samza.coordinator.stream.messages.SetConfig
@@ -40,7 +41,7 @@
class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, yarnConfiguration: YarnConfiguration) extends Logging {
var rpcApp: HttpServer = null
var webApp: HttpServer = null
- val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+ val SERVER_URL_OPT: String = CoordinationConstants.YARN_COORDINATOR_URL;
var securityManager: Option[SamzaAppMasterSecurityManager] = None
def onInit() {
@@ -56,7 +57,8 @@
webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
webApp.start
- samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry))
+ samzaAppState.jobModelManager.server.addServlet("/" + CoordinationConstants.YARN_CONTAINER_HEARTBEAT_SERVELET,
+ new YarnContainerHeartbeatServlet(state, registry))
samzaAppState.jobModelManager.start
state.rpcUrl = rpcApp.getUrl
state.trackingUrl = webApp.getUrl
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
index 8987834..0901d85 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -24,6 +24,7 @@
import junit.framework.Assert;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.job.yarn.YarnAppState;
import org.apache.samza.job.yarn.YarnContainer;
@@ -76,7 +77,7 @@
String validContainerId = "container_1350670447861_0003_01_000002";
when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
yarnAppState.runningProcessors.put(validContainerId, container);
- URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + validContainerId);
+ URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, webApp.getUrl().toString(), validContainerId));
String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
Assert.assertTrue(heartbeat.isAlive());
@@ -89,7 +90,8 @@
String invalidContainerId = "container_1350670447861_0003_01_000002";
when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
yarnAppState.runningProcessors.put(validContainerId, container);
- URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + invalidContainerId);
+ URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT,
+ webApp.getUrl().toString(), invalidContainerId));
String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
Assert.assertFalse(heartbeat.isAlive());