Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)

Add TASK_THREADPOOL_RESET_TIMEOUT as system property

Allow users to specify reset timeout with system property.
diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 4fe651c..75e8c03 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -34,6 +34,9 @@
   // Task Driver
   public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";
 
+  // Task executor threadpool reset timeout in ms
+  public static final String TASK_THREADPOOL_RESET_TIMEOUT = "helixTask.threadpool.resetTimeout";
+
   // ZKHelixManager
   public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";
 
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index c5b0938..321ff78 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,10 +35,12 @@
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.messaging.handling.AsyncCallbackService;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.TaskExecutor;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -45,6 +48,7 @@
 import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
 import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +58,7 @@
   private final HelixTaskExecutor _taskExecutor;
   // TODO:rename to factory, this is not a service
   private final AsyncCallbackService _asyncCallbackService;
+  private final int _taskThreadpoolResetTimeout;
 
   private static Logger _logger = LoggerFactory.getLogger(DefaultMessagingService.class);
   ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
@@ -72,8 +77,12 @@
         new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
         new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
     _asyncCallbackService = new AsyncCallbackService();
-    _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
-        _asyncCallbackService);
+
+    _taskThreadpoolResetTimeout = HelixUtil
+        .getSystemPropertyAsInt(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT,
+            TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+    _taskExecutor.registerMessageHandlerFactory(_asyncCallbackService, TaskExecutor.DEFAULT_PARALLEL_TASKS,
+        _taskThreadpoolResetTimeout);
   }
 
   @Override
@@ -335,6 +344,11 @@
     return _taskExecutor;
   }
 
+  @VisibleForTesting
+  int getTaskThreadpoolResetTimeout() {
+    return _taskThreadpoolResetTimeout;
+  }
+
   @Override
   // TODO if the manager is not Participant or Controller, no reply, so should fail immediately
   public int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback asyncCallback,
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 52467d8..80b70bb 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -144,8 +144,6 @@
   private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
   private static final String SESSION_SYNC = "SESSION-SYNC";
 
-  private static final int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200; // 200 ms
-
   /**
    * Map of MsgType->MsgHandlerFactoryRegistryItem
    */
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
index e9fa424..56dc004 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
@@ -25,6 +25,7 @@
 
 public interface TaskExecutor {
   int DEFAULT_PARALLEL_TASKS = 40;
+  int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200;
 
   /**
    * Register MultiType message handler factory that the executor can handle.
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
index 221152d..bf98d53 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -34,10 +34,12 @@
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
+import org.apache.helix.messaging.handling.TaskExecutor;
 import org.apache.helix.mock.MockManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
@@ -291,4 +293,15 @@
     Assert.assertTrue(
         svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.CONTROLLER_MSG.name()));
   }
+
+  @Test
+  public void testTaskThreadpoolResetTimeoutProperty() {
+    HelixManager manager = new MockManager();
+    System.setProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT, "300");
+    MockDefaultMessagingService svc = new MockDefaultMessagingService(manager);
+    Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), 300);
+    System.clearProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT);
+    svc = new MockDefaultMessagingService(new MockManager());
+    Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+  }
 }