SAMZA-2611: [AM-HA] heartbeat reestablish causes container's heartbeat thread to die (#1452)

Symptom: When the new AM takes a long time to start up, an already running container's heartbeat thread silently dies and does not make any heartbeat requests to the new AM.

Cause: The AM URL (yarn.am.tracking.url) key-value is removed from the coordinator stream while the new AM is starting up, because this config is present in the old config (i.e. the coordinator stream) but not in the config generated by the new AM. As a result, a running container that is constantly fetching the value for this key gets null and throws an NPE.

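Changes: When AM-HA is enabled, do not remove this config. Additionally, if re-establishing the connection with the new AM throws an exception, the container is now force-exited.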
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 7e20f26..0eba766 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
@@ -92,17 +93,19 @@
       if (!response.isAlive()) {
         if (isApplicationMasterHighAvailabilityEnabled) {
           LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl);
-          if (checkAndEstablishConnectionWithNewAM()) {
+          try {
+            if (checkAndEstablishConnectionWithNewAM()) {
+              return;
+            }
+          } catch (Exception e) {
+            // On exception in re-establish connection with new AM, force exit.
+            LOG.error("Exception trying to connect with new AM", e);
+            forceExit("failure in establishing cconnection with new AM", 0);
             return;
           }
         }
-        scheduler.schedule(() -> {
-          // On timeout of container shutting down, force exit.
-          LOG.error("Graceful shutdown timeout expired. Force exiting.");
-          ThreadUtil.logThreadDump("Thread dump at heartbeat monitor shutdown timeout.");
-          System.exit(1);
-        }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS);
-        onContainerExpired.run();
+        // On timeout of container shutting down, force exit.
+        forceExit("Graceful shutdown timeout expired. Force exiting.", SHUTDOWN_TIMOUT_MS);
       }
     }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS);
     started = true;
@@ -135,7 +138,7 @@
         }
       } catch (InterruptedException e) {
         LOG.warn("Interrupted during sleep.");
-        Thread.currentThread().interrupt();
+        throw new SamzaException(e);
       }
       attempt++;
     }
@@ -147,6 +150,15 @@
     return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
   }
 
+  private void forceExit(String message, int timeout) { // runs the expiry callback now and schedules a forced JVM exit after `timeout` ms
+    scheduler.schedule(() -> { // queue the hard-exit task to fire after the delay
+      LOG.error(message);
+      ThreadUtil.logThreadDump("Thread dump at heartbeat monitor: due to " + message);
+      System.exit(1); // terminate the JVM with a non-zero status
+    }, timeout, TimeUnit.MILLISECONDS);
+    onContainerExpired.run(); // invoked immediately on the calling thread, after the exit task has been scheduled
+  }
+
   private static class HeartbeatThreadFactory implements ThreadFactory {
     private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index b9469cb..0462663 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -24,6 +24,7 @@
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
+import org.apache.samza.coordinator.CoordinationConstants
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
 import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
@@ -198,6 +199,11 @@
         keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey))
       }
 
+      if (jobConfig.getApplicationMasterHighAvailabilityEnabled) {
+        // if AM HA is enabled then retain AM url as running containers are fetching it from c-stream until new AM publishes new AM url.
+        keysToRemove = keysToRemove.filter(configKey => !(configKey.equals(CoordinationConstants.YARN_COORDINATOR_URL)))
+      }
+
       info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
       keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
     }
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index ff297d1..5c0bf16 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -174,6 +174,30 @@
     this.containerHeartbeatMonitor.stop();
     verify(this.scheduler).shutdown();
   }
+
+  @Test
+  public void testConnectToNewAMSerdeException() throws InterruptedException { // a failure while looking up the new AM URL must schedule a zero-delay forced exit
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new NullPointerException("serde failed")); // make the AM URL lookup throw
+
+    this.containerHeartbeatMonitor.start();
+    // block (up to 2 seconds) until the mocked scheduler has run the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // the forced-exit task should have been scheduled with a zero delay, and the expiry callback invoked
+    verify(this.scheduler).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
   /**
    * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
    * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.