Implement DefaultCloudEventCallbackImpl (#1995)
Implement a default callback implementation for Helix cloud event listeners.
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
index d9aa3b2..f988902 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -19,13 +19,20 @@
* under the License.
*/
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.util.InstanceValidationUtil;
/**
* A default callback implementation class to be used in {@link HelixCloudEventListener}
*/
public class DefaultCloudEventCallbackImpl {
+ private final String _reason =
+ "Cloud event callback %s in class %s triggered in listener HelixManager %s, at time %s .";
+ protected final String _className = this.getClass().getSimpleName();
/**
* Disable the instance
@@ -33,8 +40,14 @@
* @param eventInfo Detailed information about the event
*/
public void disableInstance(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ if (InstanceValidationUtil
+ .isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) {
+ manager.getClusterManagmentTool()
+ .enableInstance(manager.getClusterName(), manager.getInstanceName(), false,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, String
+ .format(_reason, "disableInstance", _className, manager,
+ System.currentTimeMillis()));
+ }
}
/**
@@ -43,27 +56,48 @@
* @param eventInfo Detailed information about the event
*/
public void enableInstance(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ String instanceName = manager.getInstanceName();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ if (InstanceValidationUtil.getInstanceHelixDisabledType(accessor, instanceName)
+ .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())) {
+ manager.getClusterManagmentTool()
+ .enableInstance(manager.getClusterName(), instanceName, true);
+ }
}
/**
- *
+ * Put cluster into maintenance mode if the cluster is not currently in maintenance mode
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ if (!manager.getClusterManagmentTool().isInMaintenanceMode(manager.getClusterName())) {
+ manager.getClusterManagmentTool()
+ .manuallyEnableMaintenanceMode(manager.getClusterName(), true, String
+ .format(_reason, "enterMaintenanceMode", _className, manager,
+ System.currentTimeMillis()), null);
+ }
}
/**
- *
+ * Exit maintenance mode for the cluster, if there is no more live instances disabled for cloud event
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ List<String> instances =
+ manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
+ // Check if there is any disabled live instance that was disabled due to cloud event,
+ // if none left, exit maintenance mode
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ if (instances.stream().noneMatch(instance ->
+ InstanceValidationUtil.getInstanceHelixDisabledType(accessor, instance)
+ .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())
+ && InstanceValidationUtil.isAlive(accessor, instance))) {
+ manager.getClusterManagmentTool()
+ .manuallyEnableMaintenanceMode(manager.getClusterName(), false, String
+ .format(_reason, "exitMaintenanceMode", _className, manager,
+ System.currentTimeMillis()), null);
+ }
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
new file mode 100644
index 0000000..bb19ef5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
@@ -0,0 +1,114 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDefaultCloudEventCallbackImpl extends ZkStandAloneCMTestBase {
+ private final DefaultCloudEventCallbackImpl _impl =
+ DefaultCloudEventCallbackImpl.class.newInstance();
+ private MockParticipantManager _instanceManager;
+ private HelixAdmin _admin;
+
+ public TestDefaultCloudEventCallbackImpl() throws IllegalAccessException, InstantiationException {
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _instanceManager = _participants[0];
+ _admin = _instanceManager.getClusterManagmentTool();
+ }
+
+ @Test
+ public void testDisableInstance() {
+ Assert.assertTrue(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+ _impl.disableInstance(_instanceManager, null);
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+ Assert.assertEquals(_manager.getConfigAccessor()
+ .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
+ .getInstanceDisabledType(), InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
+
+ // Should not disable instance if it is already disabled due to other reasons
+ // And disabled type should remain unchanged
+ _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+ _impl.disableInstance(_instanceManager, null);
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+ Assert.assertEquals(InstanceValidationUtil
+ .getInstanceHelixDisabledType(_manager.getHelixDataAccessor(),
+ _instanceManager.getInstanceName()),
+ InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+
+ _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
+ }
+
+ @Test(dependsOnMethods = "testDisableInstance")
+ public void testEnableInstance() {
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+ // Should enable instance if the instance is disabled due to cloud event
+ _impl.enableInstance(_instanceManager, null);
+ Assert.assertTrue(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+
+ // Should not enable instance if it is not disabled due to cloud event
+ _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+ _impl.enableInstance(_instanceManager, null);
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+ _admin.enableInstance(_instanceManager.getClusterName(), _instanceManager.getInstanceName(),
+ true);
+ }
+
+ @Test
+ public void testEnterMaintenanceMode() {
+ Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ _impl.enterMaintenanceMode(_instanceManager, null);
+ Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ }
+
+ @Test(dependsOnMethods = "testEnterMaintenanceMode")
+ public void testExitMaintenanceMode() {
+ Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ // Should not exit maintenance mode if there is remaining live instance that is disabled due to cloud event
+ _admin.enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
+ _impl.exitMaintenanceMode(_instanceManager, null);
+ Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+
+ // Should exit maintenance mode if there is no remaining live instance that is disabled due to cloud event
+ _admin.enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false,
+ InstanceConstants.InstanceDisabledType.USER_OPERATION, null);
+ _impl.exitMaintenanceMode(_instanceManager, null);
+ Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ }
+}