SAMZA-2611: [AM-HA] heartbeat reestablish causes container's heartbeat thread to die (#1452)
Symptom: When new AM takes a long time to to start up, already running container's heartbeat thread silently dies and does not make any heartbeat requests to the new AM.
Cause: AM url (yarn.am.tracking.url) key-value is removed from Coordinator stream when new AM is starting up - as this config is present in old config (aka coordinator stream) but not in the new AM generated config. This causes the running container to fetch a null when its constantly fetching value for this key and thus throws NPE.
Changes: When AMHA is enabled, do not remove this config
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 7e20f26..0eba766 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetConfig;
@@ -92,17 +93,19 @@
if (!response.isAlive()) {
if (isApplicationMasterHighAvailabilityEnabled) {
LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl);
- if (checkAndEstablishConnectionWithNewAM()) {
+ try {
+ if (checkAndEstablishConnectionWithNewAM()) {
+ return;
+ }
+ } catch (Exception e) {
+ // On exception in re-establish connection with new AM, force exit.
+ LOG.error("Exception trying to connect with new AM", e);
+ forceExit("failure in establishing cconnection with new AM", 0);
return;
}
}
- scheduler.schedule(() -> {
- // On timeout of container shutting down, force exit.
- LOG.error("Graceful shutdown timeout expired. Force exiting.");
- ThreadUtil.logThreadDump("Thread dump at heartbeat monitor shutdown timeout.");
- System.exit(1);
- }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS);
- onContainerExpired.run();
+ // On timeout of container shutting down, force exit.
+ forceExit("Graceful shutdown timeout expired. Force exiting.", SHUTDOWN_TIMOUT_MS);
}
}, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS);
started = true;
@@ -135,7 +138,7 @@
}
} catch (InterruptedException e) {
LOG.warn("Interrupted during sleep.");
- Thread.currentThread().interrupt();
+ throw new SamzaException(e);
}
attempt++;
}
@@ -147,6 +150,15 @@
return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
}
+ private void forceExit(String message, int timeout) {
+ scheduler.schedule(() -> {
+ LOG.error(message);
+ ThreadUtil.logThreadDump("Thread dump at heartbeat monitor: due to " + message);
+ System.exit(1);
+ }, timeout, TimeUnit.MILLISECONDS);
+ onContainerExpired.run();
+ }
+
private static class HeartbeatThreadFactory implements ThreadFactory {
private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index b9469cb..0462663 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -24,6 +24,7 @@
import org.apache.samza.SamzaException
import org.apache.samza.config._
+import org.apache.samza.coordinator.CoordinationConstants
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
@@ -198,6 +199,11 @@
keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey))
}
+ if (jobConfig.getApplicationMasterHighAvailabilityEnabled) {
+ // if AM HA is enabled then retain AM url as running containers are fetching it from c-stream until new AM publishes new AM url.
+ keysToRemove = keysToRemove.filter(configKey => !(configKey.equals(CoordinationConstants.YARN_COORDINATOR_URL)))
+ }
+
info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
}
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index ff297d1..5c0bf16 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -174,6 +174,30 @@
this.containerHeartbeatMonitor.stop();
verify(this.scheduler).shutdown();
}
+
+ @Test
+ public void testConnectToNewAMSerdeException() throws InterruptedException {
+ String containerExecutionId = "0";
+ String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+ this.containerHeartbeatMonitor =
+ spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+ containerExecutionId, coordinatorStreamStore, true, 5, 10));
+ CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+ when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+ when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+ when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new NullPointerException("serde failed"));
+
+ this.containerHeartbeatMonitor.start();
+ // wait for the executor to finish the heartbeat check task
+ boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+ assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+ // shutdown task should have been submitted
+ verify(this.scheduler).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS));
+ verify(this.onExpired).run();
+
+ this.containerHeartbeatMonitor.stop();
+ verify(this.scheduler).shutdown();
+ }
/**
* Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
* {@code schedulerFixedRateExecutionLatch} when the task is finished executing.