Support register user defined CloudEventHandler when participant connect. (#2121)

This change will load user defined CloudEventHandler dynamically when ZkHelixManager.connect(). User need to implement a EventController and pass the class name when initiating ZkHelixManager.
diff --git a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
index 326f87d..db255b1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
@@ -24,7 +24,9 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+
 import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.event.CloudEventHandler;
 import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
 import org.apache.helix.model.CloudConfig;
 import org.slf4j.Logger;
@@ -159,6 +161,14 @@
     return _customizedCloudProperties;
   }
 
+  public String getCloudEventHandlerClassName() {
+    String defaultHandler = CloudEventHandler.class.getName();
+    return getCloudEventCallbackProperty() == null ? defaultHandler
+        : getCloudEventCallbackProperty().getUserArgs().getOrDefault(
+            CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+            defaultHandler);
+  }
+
   public void setCloudEnabled(boolean isCloudEnabled) {
     _isCloudEnabled = isCloudEnabled;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/AbstractEventHandler.java b/helix-core/src/main/java/org/apache/helix/cloud/event/AbstractEventHandler.java
new file mode 100644
index 0000000..0c8822d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/AbstractEventHandler.java
@@ -0,0 +1,29 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This class is the the interface for singleton eventHandler.
+ * User may implement their own eventHandler or use the default CloudEventHandler
+ */
+public interface AbstractEventHandler {
+  void registerCloudEventListener(CloudEventListener listener);
+  void unregisterCloudEventListener(CloudEventListener listener);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
index 5dcbe16..86688a1 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
@@ -33,7 +33,7 @@
  * 3. PostEventHandlerCallback -> only one allowed
  * to enable an easy management of event listeners and callbacks.
  */
-public class CloudEventHandler {
+public class CloudEventHandler implements AbstractEventHandler {
   private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandler.class.getName());
   private List<CloudEventListener> _unorderedEventListenerList = new ArrayList<>();
   private Optional<CloudEventListener> _preEventHandlerCallback = Optional.empty();
@@ -44,6 +44,7 @@
    * If no listener type is specified, register as an unordered listener.
    * @param listener
    */
+  @Override
   public void registerCloudEventListener(CloudEventListener listener) {
     if (listener != null) {
       switch (listener.getListenerType()) {
@@ -65,6 +66,7 @@
    * Unregister an event listener to the event handler.
    * @param listener
    */
+  @Override
   public void unregisterCloudEventListener(CloudEventListener listener) {
     _unorderedEventListenerList.remove(listener);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java
index 1b1a53c..8cab4f4 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java
@@ -19,27 +19,39 @@
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
 /**
- * This class is the factory for singleton class {@link CloudEventHandler}
+ * This class is the factory for singleton class {@link AbstractEventHandler}
  */
 public class CloudEventHandlerFactory {
-  private static CloudEventHandler INSTANCE = null;
+  private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandlerFactory.class);
+  private static Map<String, AbstractEventHandler> INSTANCE_MAP = new HashMap();
 
   private CloudEventHandlerFactory() {
   }
 
   /**
-   * Get a CloudEventHandler instance
+   * Get an instance of AbstractEventHandler implementation.
    * @return
    */
-  public static CloudEventHandler getInstance() {
-    if (INSTANCE == null) {
-      synchronized (CloudEventHandlerFactory.class) {
-        if (INSTANCE == null) {
-          INSTANCE = new CloudEventHandler();
-        }
+  public static AbstractEventHandler getInstance(String eventHandlerClassName)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    synchronized (CloudEventHandlerFactory.class) {
+      AbstractEventHandler instance = INSTANCE_MAP.get(eventHandlerClassName);
+      if (instance == null) {
+        LOG.info("Initiating an object of {}", eventHandlerClassName);
+        instance = (AbstractEventHandler) (HelixUtil
+            .loadClass(AbstractEventHandler.class, eventHandlerClassName)).newInstance();
+        INSTANCE_MAP.put(eventHandlerClassName, instance);
       }
+      return instance;
     }
-    return INSTANCE;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
index 1888eb6..6c27d2d 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
@@ -56,6 +56,7 @@
    */
   public static class UserArgsInputKey {
     public static final String CALLBACK_IMPL_CLASS_NAME = "callbackImplClassName";
+    public static final String CLOUD_EVENT_HANDLER_CLASS_NAME = "cloudEventHandlerClassName";
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 63ec08f..cbb2877 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -832,7 +832,10 @@
       if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
         _cloudEventListener =
             new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
-        CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+        CloudEventHandlerFactory.getInstance(
+            _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+            .registerCloudEventListener(_cloudEventListener);
+        LOG.info("Using handler: " + helixCloudProperty.getCloudEventHandlerClassName());
       }
     }
   }
@@ -881,7 +884,13 @@
       _helixPropertyStore = null;
 
       if (_cloudEventListener != null) {
-        CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
+        try {
+          CloudEventHandlerFactory.getInstance(
+              _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+              .unregisterCloudEventListener(_cloudEventListener);
+        } catch (Exception e) {
+          LOG.error("Failed to unregister cloudEventListener.", e);
+        }
         _cloudEventListener = null;
       }
 
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/HelixTestCloudEventHandler.java b/helix-core/src/test/java/org/apache/helix/cloud/event/HelixTestCloudEventHandler.java
new file mode 100644
index 0000000..1355623
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/HelixTestCloudEventHandler.java
@@ -0,0 +1,42 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class HelixTestCloudEventHandler extends CloudEventHandler {
+  private static final int TIMEOUT = 900;  // second to timeout
+  private static ExecutorService executorService = Executors.newSingleThreadExecutor();
+  public static boolean anyListenerIsRegisterFlag = false;
+
+  @Override
+  public void registerCloudEventListener(CloudEventListener listener) {
+    super.registerCloudEventListener(listener);
+    anyListenerIsRegisterFlag = true;
+  }
+
+  @Override
+  public void unregisterCloudEventListener(CloudEventListener listener) {
+    super.unregisterCloudEventListener(listener);
+    anyListenerIsRegisterFlag = false;
+  }
+
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
index 772a9b9..de46257 100644
--- a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
@@ -75,21 +75,33 @@
             .build());
   }
 
-  public void connect() throws IllegalAccessException, InstantiationException {
+  @Override
+  public void connect()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
     if (_helixManagerProperty != null) {
       HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
       if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
         _cloudEventListener =
             new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
-        CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+        System.out.println("Using handler: " + helixCloudProperty.getCloudEventHandlerClassName());
+        CloudEventHandlerFactory.getInstance(
+            _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+            .registerCloudEventListener(_cloudEventListener);
       }
     }
   }
 
+  @Override
   public void disconnect() {
     if (_cloudEventListener != null) {
-      CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
-      _cloudEventListener = null;
+      try {
+        CloudEventHandlerFactory.getInstance(
+            _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+            .unregisterCloudEventListener(_cloudEventListener);
+      } catch (Exception e) {
+        System.out.println("Failed to unregister cloudEventListener." );
+        e.printStackTrace();
+      }
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
index 7207cbc..1f345bc 100644
--- a/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
@@ -20,6 +20,8 @@
  */
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.HelixManager;
@@ -76,16 +78,19 @@
   @Test
   public void testOptionalHelixOperation() throws Exception {
     // Cloud event callback property
-    CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
-        .singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
-            MockCloudEventCallbackImpl.class.getCanonicalName()));
+    Map<String, String> paramMap = new HashMap<>();
+    paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+        MockCloudEventCallbackImpl.class.getCanonicalName());
+    paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+        HelixTestCloudEventHandler.class.getCanonicalName());
+    CloudEventCallbackProperty property = new CloudEventCallbackProperty(paramMap);
     property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
     _cloudProperty.setCloudEventCallbackProperty(property);
 
     _helixManager.connect();
 
     // Manually trigger event
-    CloudEventHandlerFactory.getInstance()
+    ((CloudEventHandler)CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
         .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
     Assert.assertTrue(
         callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -101,7 +106,7 @@
     MockCloudEventCallbackImpl.triggeredOperation.clear();
 
     // Manually trigger event
-    CloudEventHandlerFactory.getInstance()
+    ((CloudEventHandler)CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
         .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
     Assert.assertTrue(
         callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -115,7 +120,7 @@
     MockCloudEventCallbackImpl.triggeredOperation.clear();
 
     // Manually trigger event
-    CloudEventHandlerFactory.getInstance()
+    ((CloudEventHandler) CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
         .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
     Assert.assertFalse(
         callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -160,7 +165,7 @@
         });
 
     // Manually trigger event
-    CloudEventHandlerFactory.getInstance()
+    ((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
         .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
     Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
     Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
@@ -169,7 +174,7 @@
 
     MockCloudEventCallbackImpl.triggeredOperation.clear();
 
-    CloudEventHandlerFactory.getInstance()
+    ((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
         .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
     Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
     Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
@@ -186,9 +191,44 @@
     _cloudProperty.setCloudEventCallbackProperty(property);
 
     _helixManager.connect();
+  }
+
+  @Test
+  public void testRegisterAndUnregister() throws Exception {
+    // Cloud event callback property
+    Map<String, String> paramMap = new HashMap<>();
+    paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+        MockCloudEventCallbackImpl.class.getCanonicalName());
+    paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+        HelixTestCloudEventHandler.class.getCanonicalName());
+    CloudEventCallbackProperty property = new CloudEventCallbackProperty(paramMap);
+    property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
+    _cloudProperty.setCloudEventCallbackProperty(property);
+
+    _helixManager.connect();
+
+    Assert.assertTrue(HelixTestCloudEventHandler.anyListenerIsRegisterFlag);
+
+    _helixManager.disconnect();
+    Assert.assertFalse(HelixTestCloudEventHandler.anyListenerIsRegisterFlag);
+  }
+
+  @Test
+  public void testUsingInvalidHandlerClassName() throws Exception {
+    // Cloud event callback property
+    CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
+        .singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+            "org.apache.helix.cloud.InvalidClassName"));
+    _cloudProperty.setCloudEventCallbackProperty(property);
+
+    try {
+    _helixManager.connect();}
+    catch (Exception ex){
+      Assert.assertEquals(ex.getClass(), java.lang.ClassNotFoundException.class);
+    }
 
     // Manually trigger event
-    CloudEventHandlerFactory.getInstance()
+    ((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
         .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
   }