Implement DefaultCloudEventCallbackImpl (#1995)

Implement a default callback implementation for Helix cloud event listeners.
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
index d9aa3b2..f988902 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -19,13 +19,20 @@
  * under the License.
  */
 
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.util.InstanceValidationUtil;
 
 /**
  * A default callback implementation class to be used in {@link HelixCloudEventListener}
  */
 public class DefaultCloudEventCallbackImpl {
+  private final String _reason =
+      "Cloud event callback %s in class %s triggered in listener HelixManager %s, at time %s .";
+  protected final String _className = this.getClass().getSimpleName();
 
   /**
    * Disable the instance
@@ -33,8 +40,14 @@
    * @param eventInfo Detailed information about the event
    */
   public void disableInstance(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    if (InstanceValidationUtil
+        .isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) {
+      manager.getClusterManagmentTool()
+          .enableInstance(manager.getClusterName(), manager.getInstanceName(), false,
+              InstanceConstants.InstanceDisabledType.CLOUD_EVENT, String
+                  .format(_reason, "disableInstance", _className, manager,
+                      System.currentTimeMillis()));
+    }
   }
 
   /**
@@ -43,27 +56,48 @@
    * @param eventInfo Detailed information about the event
    */
   public void enableInstance(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    String instanceName = manager.getInstanceName();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    if (InstanceValidationUtil.getInstanceHelixDisabledType(accessor, instanceName)
+        .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())) {
+      manager.getClusterManagmentTool()
+          .enableInstance(manager.getClusterName(), instanceName, true);
+    }
   }
 
   /**
-   *
+   * Put cluster into maintenance mode if the cluster is not currently in maintenance mode
    * @param manager The helix manager associated with the listener
    * @param eventInfo Detailed information about the event
    */
   public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    if (!manager.getClusterManagmentTool().isInMaintenanceMode(manager.getClusterName())) {
+      manager.getClusterManagmentTool()
+          .manuallyEnableMaintenanceMode(manager.getClusterName(), true, String
+              .format(_reason, "enterMaintenanceMode", _className, manager,
+                  System.currentTimeMillis()), null);
+    }
   }
 
   /**
-   *
+   * Exit maintenance mode for the cluster, if there is no more live instances disabled for cloud event
    * @param manager The helix manager associated with the listener
    * @param eventInfo Detailed information about the event
    */
   public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    List<String> instances =
+        manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
+    // Check if there is any disabled live instance that was disabled due to cloud event,
+    // if none left, exit maintenance mode
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    if (instances.stream().noneMatch(instance ->
+        InstanceValidationUtil.getInstanceHelixDisabledType(accessor, instance)
+            .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())
+            && InstanceValidationUtil.isAlive(accessor, instance))) {
+      manager.getClusterManagmentTool()
+          .manuallyEnableMaintenanceMode(manager.getClusterName(), false, String
+              .format(_reason, "exitMaintenanceMode", _className, manager,
+                  System.currentTimeMillis()), null);
+    }
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
new file mode 100644
index 0000000..bb19ef5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
@@ -0,0 +1,114 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDefaultCloudEventCallbackImpl extends ZkStandAloneCMTestBase {
+  private final DefaultCloudEventCallbackImpl _impl =
+      DefaultCloudEventCallbackImpl.class.newInstance();
+  private MockParticipantManager _instanceManager;
+  private HelixAdmin _admin;
+
+  public TestDefaultCloudEventCallbackImpl() throws IllegalAccessException, InstantiationException {
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _instanceManager = _participants[0];
+    _admin = _instanceManager.getClusterManagmentTool();
+  }
+
+  @Test
+  public void testDisableInstance() {
+    Assert.assertTrue(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    _impl.disableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    Assert.assertEquals(_manager.getConfigAccessor()
+        .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
+        .getInstanceDisabledType(), InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
+
+    // Should not disable instance if it is already disabled due to other reasons
+    // And disabled type should remain unchanged
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+    _impl.disableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    Assert.assertEquals(InstanceValidationUtil
+            .getInstanceHelixDisabledType(_manager.getHelixDataAccessor(),
+                _instanceManager.getInstanceName()),
+        InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false,
+        InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
+  }
+
+  @Test(dependsOnMethods = "testDisableInstance")
+  public void testEnableInstance() {
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    // Should enable instance if the instance is disabled due to cloud event
+    _impl.enableInstance(_instanceManager, null);
+    Assert.assertTrue(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+
+    // Should not enable instance if it is not disabled due to cloud event
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+    _impl.enableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    _admin.enableInstance(_instanceManager.getClusterName(), _instanceManager.getInstanceName(),
+        true);
+  }
+
+  @Test
+  public void testEnterMaintenanceMode() {
+    Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+    _impl.enterMaintenanceMode(_instanceManager, null);
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+  }
+
+  @Test(dependsOnMethods = "testEnterMaintenanceMode")
+  public void testExitMaintenanceMode() {
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+    // Should not exit maintenance mode if there is remaining live instance that is disabled due to cloud event
+    _admin.enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false,
+        InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
+    _impl.exitMaintenanceMode(_instanceManager, null);
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+
+    // Should exit maintenance mode if there is no remaining live instance that is disabled due to cloud event
+    _admin.enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false,
+        InstanceConstants.InstanceDisabledType.USER_OPERATION, null);
+    _impl.exitMaintenanceMode(_instanceManager, null);
+    Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+  }
+}