SAMZA-2604: Datamodel change to capture physical container id for AM HA (#1445)

Feature:
Main feature is Cluster based Job coordinator (aka AM) high availability. The feature ensures that the new AM can establish connection with already running containers to avoid restarting all running containers when AM dies. This PR enables capturing of the physical execution environment container id (ex: yarn container id "container_123_123") mapping to Samza logical processor id (ex: "0"). In future PRs, this mapping will be used by the new AM.

Changes:
Introduce new Coordinator Stream Message and manager to read/write this message
Container upon launch will write to c-stream, its logical and physical id
Job Coordinator(AM) upon launch will read the mapping of all containers from c-stream.

Tests:
added unit test for new manager
working on tests for other classes as they have no coverage for relevant code.

API changes:
New c-stream message

Usage instructions: None

Upgrade instructions: Backwards compatible. N/A
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 63ee3c7..b98c727 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -34,6 +34,7 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
@@ -45,6 +46,7 @@
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -206,6 +208,12 @@
     this.localityManager =
         new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
 
+    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+      ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+          new NamespaceAwareCoordinatorStreamStore(metadataStore, SetExecutionEnvContainerIdMapping.TYPE));
+
+      state.processorToExecutionId.putAll(executionContainerIdManager.readExecutionEnvironmentContainerIdMapping());
+    }
     // build metastore for container placement messages
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore);
 
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 1bc1669..a2ad540 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -241,7 +241,6 @@
 
     state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
     state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
-
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
@@ -334,7 +333,6 @@
       return;
     }
     state.runningProcessors.remove(processorId);
-
     int exitStatus = resourceStatus.getExitCode();
     switch (exitStatus) {
       case SamzaResourceStatus.SUCCESS:
@@ -413,7 +411,6 @@
           processorId, containerId, containerHost);
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
-
       if (state.neededProcessors.decrementAndGet() == 0) {
         state.jobHealthy.set(true);
       }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 784f0b4..930d366 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -106,6 +106,17 @@
   public final ConcurrentMap<String, SamzaResource> runningProcessors = new ConcurrentHashMap<>(0);
 
   /**
+   * Map of Samza processor Id (aka logical id) to execution environment container id (aka physical id ex: yarn container id).
+   * This map will be used during the start up phase of new AM in AM-HA.
+   *
+   * This map is populated at startup of ClusterBasedJobCoordinator.
+   * It initially holds the processorId to execution id mapping (if any) present in the coordinator stream.
+   * This could correspond to processors currently running or from previous attempt or previous deploy.
+   * TODO: SAMZA-2607 : remove this map and all its usages.
+   */
+  public final ConcurrentMap<String, String> processorToExecutionId = new ConcurrentHashMap<>(0);
+
+  /**
    *  Map of the failed Samza processor ID to resource status of the last attempted of the container.
    *  This map is only used when {@link org.apache.samza.config.ClusterManagerConfig#CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES}
    *  is set to false, this map tracks the containers which have exhausted all retires for restart and JobCoordinator is
diff --git a/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
new file mode 100644
index 0000000..557ef77
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used for persisting and reading the execution environment container id information into the metadata store.
+ * Processor id (logical Samza processor id) to execution environment container id (ex: yarn container id) is written.
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager based upon the provided {@link MetadataStore} that is instantiated.
+   * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before reading/writing into metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write the processor container id mapping.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id can not be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id can not be null.");
+    LOG.info("Processor {} has executionEnvContainerId as {}", processorId, executionEnvContainerId);
+    metadataStore.put(processorId, valueSerde.toBytes(executionEnvContainerId));
+    metadataStore.flush();
+  }
+
+  public Map<String, String> readExecutionEnvironmentContainerIdMapping() {
+    Map<String, String> executionEnvironmentContainerIdMapping = new HashMap<>();
+    metadataStore.all().forEach((processorId, valueBytes) -> {
+      if (valueBytes != null) {
+        String executionEnvContainerId = valueSerde.fromBytes(valueBytes);
+        executionEnvironmentContainerIdMapping.put(processorId, executionEnvContainerId);
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      for (Map.Entry<String, String> entry : executionEnvironmentContainerIdMapping.entrySet()) {
+        LOG.debug("Processor {} has executionEnvContainerId as {}", entry.getKey(), entry.getValue());
+      }
+    }
+    return executionEnvironmentContainerIdMapping;
+  }
+
+  public void close() {
+    metadataStore.close();
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index 9b862bd..86983f1 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -23,6 +23,7 @@
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
@@ -51,7 +52,10 @@
   public String fromBytes(byte[] bytes) {
     Map<String, Object> values = messageSerde.fromBytes(bytes);
     CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[]{}, values);
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping executionContainerIdMapping = new SetExecutionEnvContainerIdMapping(message);
+      return executionContainerIdMapping.getExecutionEnvironmentContainerId();
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new SetContainerHostMapping(message);
       return hostMapping.getHostLocality();
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
@@ -76,7 +80,11 @@
 
   @Override
   public byte[] toBytes(String value) {
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping
+          executionEnvContainerIdMapping = new SetExecutionEnvContainerIdMapping(SOURCE, "", value);
+      return messageSerde.toBytes(executionEnvContainerIdMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new SetContainerHostMapping(SOURCE, "", value, "", "");
       return messageSerde.toBytes(hostMapping.getMessageMap());
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
new file mode 100644
index 0000000..508b1df
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetExecutionEnvContainerIdMapping is used internally by the Samza framework to
+ * persist the processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ProcessorId
+ *     Type: set-execution-env-container-id-mapping
+ *     Source: "SamzaContainer-$ProcessorId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionEnvContainerIdMapping extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-execution-env-container-id-mapping";
+  public static final String EXEC_ENV_ID_KEY = "execution-env-container-id";
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information.
+   * @param message which holds the processor id to execution environment id mapping information.
+   */
+  public SetExecutionEnvContainerIdMapping(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information.
+   * @param source the source of the message
+   * @param key the key which is used to persist the message
+   * @param executionEnvContainerId the execution environment container id
+   */
+  public SetExecutionEnvContainerIdMapping(String source, String key, String executionEnvContainerId) {
+    super(source);
+    setType(TYPE);
+    setKey(key);
+    putMessageValue(EXEC_ENV_ID_KEY, executionEnvContainerId);
+  }
+
+  public String getExecutionEnvironmentContainerId() {
+    return getMessageValue(EXEC_ENV_ID_KEY);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index ec477c9..459ad89 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -31,6 +31,7 @@
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
@@ -40,6 +41,7 @@
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -98,7 +100,7 @@
       String jobName,
       String jobId,
       String containerId,
-      Optional<String> execEnvContainerId,
+      Optional<String> execEnvContainerIdOptional,
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
@@ -118,7 +120,7 @@
       Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
 
       // Creating diagnostics manager and reporter, and wiring it respectively
-      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional, config);
       Option<DiagnosticsManager> diagnosticsManager = Option.empty();
       if (diagnosticsManagerReporterPair.isPresent()) {
         diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
@@ -153,6 +155,14 @@
         heartbeatMonitor.start();
       }
 
+      if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+        execEnvContainerIdOptional.ifPresent(execEnvContainerId -> {
+          ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE));
+          executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId);
+        });
+      }
+
       container.run();
       if (heartbeatMonitor != null) {
         heartbeatMonitor.stop();
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
new file mode 100644
index 0000000..2ecd88c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestExecutionContainerIdManager {
+
+  private static final Config
+      CONFIG = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka"));
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil;
+  private MetadataStore store;
+  private ExecutionContainerIdManager executionContainerIdManager;
+
+  @Before
+  public void setup() {
+    coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    store = Mockito.spy(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
+        SetExecutionEnvContainerIdMapping.TYPE));
+    executionContainerIdManager = new ExecutionContainerIdManager(store);
+
+  }
+
+  @After
+  public void tearDown() {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+  }
+
+  @Test
+  public void testExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = "0";
+
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+    Map<String, String> localMap = executionContainerIdManager.readExecutionEnvironmentContainerIdMapping();
+
+    Map<String, String> expectedMap = ImmutableMap.of(processorId, physicalId);
+    assertEquals(expectedMap, localMap);
+
+    executionContainerIdManager.close();
+
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer producer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer consumer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
+    assertTrue(producer.isStopped());
+    assertTrue(consumer.isStopped());
+
+    ArgumentCaptor<byte[]> argument1 = ArgumentCaptor.forClass(byte[].class);
+    Mockito.verify(store).put(Mockito.eq(processorId), argument1.capture());
+    CoordinatorStreamValueSerde valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+    assertEquals(physicalId, valueSerde.fromBytes(argument1.getValue()));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testInvalidKeyExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = null;
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+  }
+  @Test(expected = NullPointerException.class)
+  public void testInvalidValueExecutionContainerIdManager() {
+    String physicalId = null;
+    String processorId = "0";
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+  }
+}
+