Support register user defined CloudEventHandler when participant connect. (#2121)
This change will load user defined CloudEventHandler dynamically when ZkHelixManager.connect(). User need to implement a EventController and pass the class name when initiating ZkHelixManager.
diff --git a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
index 326f87d..db255b1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
@@ -24,7 +24,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+
import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.event.CloudEventHandler;
import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
import org.apache.helix.model.CloudConfig;
import org.slf4j.Logger;
@@ -159,6 +161,14 @@
return _customizedCloudProperties;
}
+ public String getCloudEventHandlerClassName() {
+ String defaultHandler = CloudEventHandler.class.getName();
+ return getCloudEventCallbackProperty() == null ? defaultHandler
+ : getCloudEventCallbackProperty().getUserArgs().getOrDefault(
+ CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+ defaultHandler);
+ }
+
public void setCloudEnabled(boolean isCloudEnabled) {
_isCloudEnabled = isCloudEnabled;
}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/AbstractEventHandler.java b/helix-core/src/main/java/org/apache/helix/cloud/event/AbstractEventHandler.java
new file mode 100644
index 0000000..0c8822d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/AbstractEventHandler.java
@@ -0,0 +1,29 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This class is the the interface for singleton eventHandler.
+ * User may implement their own eventHandler or use the default CloudEventHandler
+ */
+public interface AbstractEventHandler {
+ void registerCloudEventListener(CloudEventListener listener);
+ void unregisterCloudEventListener(CloudEventListener listener);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
index 5dcbe16..86688a1 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
@@ -33,7 +33,7 @@
* 3. PostEventHandlerCallback -> only one allowed
* to enable an easy management of event listeners and callbacks.
*/
-public class CloudEventHandler {
+public class CloudEventHandler implements AbstractEventHandler {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandler.class.getName());
private List<CloudEventListener> _unorderedEventListenerList = new ArrayList<>();
private Optional<CloudEventListener> _preEventHandlerCallback = Optional.empty();
@@ -44,6 +44,7 @@
* If no listener type is specified, register as an unordered listener.
* @param listener
*/
+ @Override
public void registerCloudEventListener(CloudEventListener listener) {
if (listener != null) {
switch (listener.getListenerType()) {
@@ -65,6 +66,7 @@
* Unregister an event listener to the event handler.
* @param listener
*/
+ @Override
public void unregisterCloudEventListener(CloudEventListener listener) {
_unorderedEventListenerList.remove(listener);
}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java
index 1b1a53c..8cab4f4 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandlerFactory.java
@@ -19,27 +19,39 @@
* under the License.
*/
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
/**
- * This class is the factory for singleton class {@link CloudEventHandler}
+ * This class is the factory for singleton class {@link AbstractEventHandler}
*/
public class CloudEventHandlerFactory {
- private static CloudEventHandler INSTANCE = null;
+ private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandlerFactory.class);
+ private static Map<String, AbstractEventHandler> INSTANCE_MAP = new HashMap();
private CloudEventHandlerFactory() {
}
/**
- * Get a CloudEventHandler instance
+ * Get an instance of AbstractEventHandler implementation.
* @return
*/
- public static CloudEventHandler getInstance() {
- if (INSTANCE == null) {
- synchronized (CloudEventHandlerFactory.class) {
- if (INSTANCE == null) {
- INSTANCE = new CloudEventHandler();
- }
+ public static AbstractEventHandler getInstance(String eventHandlerClassName)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ synchronized (CloudEventHandlerFactory.class) {
+ AbstractEventHandler instance = INSTANCE_MAP.get(eventHandlerClassName);
+ if (instance == null) {
+ LOG.info("Initiating an object of {}", eventHandlerClassName);
+ instance = (AbstractEventHandler) (HelixUtil
+ .loadClass(AbstractEventHandler.class, eventHandlerClassName)).newInstance();
+ INSTANCE_MAP.put(eventHandlerClassName, instance);
}
+ return instance;
}
- return INSTANCE;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
index 1888eb6..6c27d2d 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
@@ -56,6 +56,7 @@
*/
public static class UserArgsInputKey {
public static final String CALLBACK_IMPL_CLASS_NAME = "callbackImplClassName";
+ public static final String CLOUD_EVENT_HANDLER_CLASS_NAME = "cloudEventHandlerClassName";
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 63ec08f..cbb2877 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -832,7 +832,10 @@
if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
_cloudEventListener =
new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
- CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+ CloudEventHandlerFactory.getInstance(
+ _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+ .registerCloudEventListener(_cloudEventListener);
+ LOG.info("Using handler: " + helixCloudProperty.getCloudEventHandlerClassName());
}
}
}
@@ -881,7 +884,13 @@
_helixPropertyStore = null;
if (_cloudEventListener != null) {
- CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
+ try {
+ CloudEventHandlerFactory.getInstance(
+ _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+ .unregisterCloudEventListener(_cloudEventListener);
+ } catch (Exception e) {
+ LOG.error("Failed to unregister cloudEventListener.", e);
+ }
_cloudEventListener = null;
}
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/HelixTestCloudEventHandler.java b/helix-core/src/test/java/org/apache/helix/cloud/event/HelixTestCloudEventHandler.java
new file mode 100644
index 0000000..1355623
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/HelixTestCloudEventHandler.java
@@ -0,0 +1,42 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class HelixTestCloudEventHandler extends CloudEventHandler {
+ private static final int TIMEOUT = 900; // second to timeout
+ private static ExecutorService executorService = Executors.newSingleThreadExecutor();
+ public static boolean anyListenerIsRegisterFlag = false;
+
+ @Override
+ public void registerCloudEventListener(CloudEventListener listener) {
+ super.registerCloudEventListener(listener);
+ anyListenerIsRegisterFlag = true;
+ }
+
+ @Override
+ public void unregisterCloudEventListener(CloudEventListener listener) {
+ super.unregisterCloudEventListener(listener);
+ anyListenerIsRegisterFlag = false;
+ }
+
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
index 772a9b9..de46257 100644
--- a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
@@ -75,21 +75,33 @@
.build());
}
- public void connect() throws IllegalAccessException, InstantiationException {
+ @Override
+ public void connect()
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException {
if (_helixManagerProperty != null) {
HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
_cloudEventListener =
new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
- CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+ System.out.println("Using handler: " + helixCloudProperty.getCloudEventHandlerClassName());
+ CloudEventHandlerFactory.getInstance(
+ _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+ .registerCloudEventListener(_cloudEventListener);
}
}
}
+ @Override
public void disconnect() {
if (_cloudEventListener != null) {
- CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
- _cloudEventListener = null;
+ try {
+ CloudEventHandlerFactory.getInstance(
+ _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
+ .unregisterCloudEventListener(_cloudEventListener);
+ } catch (Exception e) {
+ System.out.println("Failed to unregister cloudEventListener." );
+ e.printStackTrace();
+ }
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
index 7207cbc..1f345bc 100644
--- a/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
@@ -20,6 +20,8 @@
*/
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixManager;
@@ -76,16 +78,19 @@
@Test
public void testOptionalHelixOperation() throws Exception {
// Cloud event callback property
- CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
- .singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
- MockCloudEventCallbackImpl.class.getCanonicalName()));
+ Map<String, String> paramMap = new HashMap<>();
+ paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+ MockCloudEventCallbackImpl.class.getCanonicalName());
+ paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+ HelixTestCloudEventHandler.class.getCanonicalName());
+ CloudEventCallbackProperty property = new CloudEventCallbackProperty(paramMap);
property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
_cloudProperty.setCloudEventCallbackProperty(property);
_helixManager.connect();
// Manually trigger event
- CloudEventHandlerFactory.getInstance()
+ ((CloudEventHandler)CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
Assert.assertTrue(
callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -101,7 +106,7 @@
MockCloudEventCallbackImpl.triggeredOperation.clear();
// Manually trigger event
- CloudEventHandlerFactory.getInstance()
+ ((CloudEventHandler)CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
Assert.assertTrue(
callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -115,7 +120,7 @@
MockCloudEventCallbackImpl.triggeredOperation.clear();
// Manually trigger event
- CloudEventHandlerFactory.getInstance()
+ ((CloudEventHandler) CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
Assert.assertFalse(
callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -160,7 +165,7 @@
});
// Manually trigger event
- CloudEventHandlerFactory.getInstance()
+ ((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
@@ -169,7 +174,7 @@
MockCloudEventCallbackImpl.triggeredOperation.clear();
- CloudEventHandlerFactory.getInstance()
+ ((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
@@ -186,9 +191,44 @@
_cloudProperty.setCloudEventCallbackProperty(property);
_helixManager.connect();
+ }
+
+ @Test
+ public void testRegisterAndUnregister() throws Exception {
+ // Cloud event callback property
+ Map<String, String> paramMap = new HashMap<>();
+ paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+ MockCloudEventCallbackImpl.class.getCanonicalName());
+ paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+ HelixTestCloudEventHandler.class.getCanonicalName());
+ CloudEventCallbackProperty property = new CloudEventCallbackProperty(paramMap);
+ property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
+ _cloudProperty.setCloudEventCallbackProperty(property);
+
+ _helixManager.connect();
+
+ Assert.assertTrue(HelixTestCloudEventHandler.anyListenerIsRegisterFlag);
+
+ _helixManager.disconnect();
+ Assert.assertFalse(HelixTestCloudEventHandler.anyListenerIsRegisterFlag);
+ }
+
+ @Test
+ public void testUsingInvalidHandlerClassName() throws Exception {
+ // Cloud event callback property
+ CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
+ .singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
+ "org.apache.helix.cloud.InvalidClassName"));
+ _cloudProperty.setCloudEventCallbackProperty(property);
+
+ try {
+ _helixManager.connect();}
+ catch (Exception ex){
+ Assert.assertEquals(ex.getClass(), java.lang.ClassNotFoundException.class);
+ }
// Manually trigger event
- CloudEventHandlerFactory.getInstance()
+ ((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
}