Gracefully handle interruptions in the Helix logic. (#1779)

This PR aims to address the problem that the interrupt signal is swallowed by the catch logic so the thread is not really interrupted. This may cause leakage or race conditions.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
index f36725b..18c278b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
@@ -38,19 +38,19 @@
       throw new StageException("No async worker found for " + taskType);
     }
 
-    worker.queueEvent(taskType, new Runnable() {
-      @Override
-      public void run() {
-        long startTimestamp = System.currentTimeMillis();
-        logger.info("START AsyncProcess: {}", taskType);
-        try {
-          execute(event);
-        } catch (Exception e) {
-          logger.error("Failed to process {} asynchronously", taskType, e);
-        }
-        long endTimestamp = System.currentTimeMillis();
-        logger.info("END AsyncProcess: {}, took {} ms", taskType, endTimestamp - startTimestamp);
+    worker.queueEvent(taskType, () -> {
+      long startTimestamp = System.currentTimeMillis();
+      logger.info("START AsyncProcess: {}", taskType);
+      try {
+        execute(event);
+      } catch (InterruptedException e) {
+        logger.warn("Process {} has been interrupted", taskType, e);
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        logger.error("Failed to process {} asynchronously", taskType, e);
       }
+      long endTimestamp = System.currentTimeMillis();
+      logger.info("END AsyncProcess: {}, took {} ms", taskType, endTimestamp - startTimestamp);
     });
     logger.info("Submitted asynchronous {} task to worker", taskType);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
index 90719d1..1bf3721 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java
@@ -126,4 +126,4 @@
     CallbackEventThreadPoolFactory.unregisterEventProcessor(_manager.hashCode());
     _threadPoolExecutor = null;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index b3df401..e165199 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -767,7 +767,9 @@
         }
       }
     } catch (InterruptedException e) {
-      LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
+      // The InterruptedException may come from lockInterruptibly().
+      // If it fails to get the lock, throw exception so none of the initialization will be done.
+      throw new HelixException("Current thread is interrupted when acquiring lock. ", e);
     } finally {
       _eventLock.unlock();
     }
@@ -815,7 +817,6 @@
     }
 
     LOG.debug("Stop ZkCacheEventThread...done");
-
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 794182d..2507990 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -20,6 +20,7 @@
  */
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.SystemPropertyKeys;
@@ -134,7 +135,7 @@
   }
 
   /**
-   * Waits uninterruptibly until the task has started.
+   * Waits until the task has started.
    */
   public void waitTillStarted() {
     synchronized (_startedSync) {
@@ -142,7 +143,7 @@
         try {
           _startedSync.wait();
         } catch (InterruptedException e) {
-          LOG.warn(
+          throw new HelixException(
               String.format("Interrupted while waiting for task %s to start.", _taskPartition), e);
         }
       }
@@ -150,7 +151,7 @@
   }
 
   /**
-   * Waits uninterruptibly until the task has finished, either normally or due to an
+   * Waits until the task has finished, either normally or due to an
    * error/cancellation..
    */
   public TaskResult waitTillDone() {
@@ -159,7 +160,7 @@
         try {
           _doneSync.wait();
         } catch (InterruptedException e) {
-          LOG.warn(
+          throw new HelixException(
               String.format("Interrupted while waiting for task %s to complete.", _taskPartition),
               e);
         }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestAsyncBaseStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestAsyncBaseStage.java
new file mode 100644
index 0000000..e5c0efe
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestAsyncBaseStage.java
@@ -0,0 +1,82 @@
+package org.apache.helix.controller.stages;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAsyncBaseStage {
+  private static AsyncWorkerType DEFAULT_WORKER_TYPE = AsyncWorkerType.ExternalViewComputeWorker;
+  @Test
+  public void testAsyncStageCleanup() throws Exception {
+    BlockingAsyncStage blockingAsyncStage = new BlockingAsyncStage();
+
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> asyncFIFOWorkerPool =
+        new HashMap<>();
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>("ClusterName", DEFAULT_WORKER_TYPE.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    asyncFIFOWorkerPool.put(DEFAULT_WORKER_TYPE, worker);
+
+    ClusterEvent event = new ClusterEvent("ClusterName", ClusterEventType.OnDemandRebalance);
+    event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), asyncFIFOWorkerPool);
+
+    // Test normal execute case
+    blockingAsyncStage.process(event);
+    Assert.assertTrue(TestHelper.verify(() -> blockingAsyncStage._isStarted, 500));
+    Assert.assertFalse(blockingAsyncStage._isFinished);
+    blockingAsyncStage.proceed();
+    Assert.assertTrue(TestHelper.verify(() -> blockingAsyncStage._isFinished, 500));
+    blockingAsyncStage.reset();
+
+    // Test interruption case
+    blockingAsyncStage.process(event);
+    TestHelper.verify(() -> blockingAsyncStage._isStarted, 500);
+    Assert.assertFalse(blockingAsyncStage._isFinished);
+    worker.shutdown();
+    Assert.assertFalse(TestHelper.verify(() -> blockingAsyncStage._isFinished, 1000));
+    Assert.assertFalse(worker.isAlive());
+    blockingAsyncStage.reset();
+  }
+
+  private class BlockingAsyncStage extends AbstractAsyncBaseStage {
+    public boolean _isFinished = false;
+    public boolean _isStarted = false;
+
+    private CountDownLatch _countDownLatch = new CountDownLatch(1);
+
+    public void reset() {
+      _isFinished = false;
+      _isStarted = false;
+      _countDownLatch = new CountDownLatch(1);
+    }
+
+    public void proceed() {
+      _countDownLatch.countDown();
+    }
+
+    @Override
+    public AsyncWorkerType getAsyncWorkerType() {
+      return DEFAULT_WORKER_TYPE;
+    }
+
+    @Override
+    public void execute(ClusterEvent event) throws Exception {
+      _isStarted = true;
+      _countDownLatch.await();
+      _isFinished = true;
+    }
+  }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 7f662cf..506d234 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -244,6 +244,7 @@
         }
       } catch (InterruptedException e) {
         LOG.error("Interrupted waiting for success", e);
+        return false;
       }
       return true;
     }