SAMZA-2510: Incorrect shutdown status due to race between runloop and process callback thread (#1344)
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index a509a27..6974f35 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -155,12 +155,7 @@
long prevNs = clock.nanoTime();
- while (!shutdownNow) {
- if (throwable != null) {
- log.error("Caught throwable and stopping run loop", throwable);
- throw new SamzaException(throwable);
- }
-
+ while (!shutdownNow && throwable == null) {
long startNs = clock.nanoTime();
IncomingMessageEnvelope envelope = chooseEnvelope();
@@ -185,6 +180,17 @@
containerMetrics.utilization().set(((double) activeNs) / totalNs);
}
}
+
+ /*
+ * The current semantics of external shutdown request (RunLoop.shutdown()) is loosely defined and run loop doesn't
+ * wait for inflight messages to complete and triggers shutdown as soon as it notices the shutdown request.
+ * Hence, it is possible that the exception may or may not be propagated based on the order of execution
+ * between process callback and run loop thread.
+ */
+ if (throwable != null) {
+ log.error("Caught throwable and stopping run loop", throwable);
+ throw new SamzaException(throwable);
+ }
} finally {
workerTimer.shutdown();
callbackExecutor.shutdown();
@@ -648,8 +654,10 @@
@Override
public void onFailure(TaskCallback callback, Throwable t) {
try {
- state.doneProcess();
+ // set the exception code ahead of marking the message as processed to make sure the exception
+ // is visible to the run loop thread promptly. Refer to SAMZA-2510 for more details.
abort(t);
+ state.doneProcess();
// update pending count, but not offset
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t);
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 4556679..9d65a57 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -19,6 +19,7 @@
package org.apache.samza.container;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -29,6 +30,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.context.ContainerContext;
@@ -780,4 +782,28 @@
commitLatch.await();
}
+
+ @Test(expected = SamzaException.class)
+ public void testExceptionIsPropagated() {
+ SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+ when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+ OffsetManager offsetManager = mock(OffsetManager.class);
+
+ TestTask task0 = new TestTask(false, false, false, null);
+ TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+
+ Map<TaskName, TaskInstance> tasks = ImmutableMap.of(taskName0, t0);
+
+ int maxMessagesInFlight = 2;
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
+
+ when(consumerMultiplexer.choose(false))
+ .thenReturn(envelope0)
+ .thenReturn(ssp0EndOfStream)
+ .thenReturn(null);
+
+ runLoop.run();
+ }
}