SAMZA-2510: Incorrect shutdown status due to race between runloop and process callback thread (#1344)

diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index a509a27..6974f35 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -155,12 +155,7 @@
 
       long prevNs = clock.nanoTime();
 
-      while (!shutdownNow) {
-        if (throwable != null) {
-          log.error("Caught throwable and stopping run loop", throwable);
-          throw new SamzaException(throwable);
-        }
-
+      while (!shutdownNow && throwable == null) {
         long startNs = clock.nanoTime();
 
         IncomingMessageEnvelope envelope = chooseEnvelope();
@@ -185,6 +180,17 @@
           containerMetrics.utilization().set(((double) activeNs) / totalNs);
         }
       }
+
+      /*
+       * The current semantics of an external shutdown request (RunLoop.shutdown()) are loosely defined: the run
+       * loop doesn't wait for in-flight messages to complete and triggers shutdown as soon as it notices the request.
+       * Hence, it is possible that the exception may or may not be propagated, depending on the order of execution
+       * between the process callback thread and the run loop thread.
+       */
+      if (throwable != null) {
+        log.error("Caught throwable and stopping run loop", throwable);
+        throw new SamzaException(throwable);
+      }
     } finally {
       workerTimer.shutdown();
       callbackExecutor.shutdown();
@@ -648,8 +654,10 @@
     @Override
     public void onFailure(TaskCallback callback, Throwable t) {
       try {
-        state.doneProcess();
+        // Set the exception ahead of marking the message as processed to make sure the exception
+        // is visible to the run loop thread promptly. Refer to SAMZA-2510 for more details.
         abort(t);
+        state.doneProcess();
         // update pending count, but not offset
         TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
         log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t);
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 4556679..9d65a57 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.container;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,6 +30,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.context.ContainerContext;
@@ -780,4 +782,28 @@
 
     commitLatch.await();
   }
+
+  @Test(expected = SamzaException.class) // passes only if the test body throws SamzaException
+  public void testExceptionIsPropagated() {
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
+    TestTask task0 = new TestTask(false, false, false, null); // NOTE(review): flags presumably make the task fail its callback - confirm against TestTask
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+
+    Map<TaskName, TaskInstance> tasks = ImmutableMap.of(taskName0, t0); // single-task run loop
+
+    int maxMessagesInFlight = 2;
+    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+        callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+        () -> 0L, false);
+
+    when(consumerMultiplexer.choose(false)) // consecutive stubbing: one return value per call, in order
+        .thenReturn(envelope0)
+        .thenReturn(ssp0EndOfStream)
+        .thenReturn(null); // the last stubbed value repeats for every later call
+
+    runLoop.run(); // must throw for the test to pass (see the expected exception on @Test)
+  }
 }