Add REST APIs for management mode (#1807)

This commit adds Java and REST APIs to set the cluster freeze mode and to get the management mode status.
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index d403571..05bd60e 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.api.topology.ClusterTopology;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConstraints;
@@ -37,8 +39,6 @@
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.api.status.ClusterManagementMode;
-import org.apache.helix.api.status.ClusterManagementModeRequest;
 
 /*
  * Helix cluster management
@@ -382,6 +382,15 @@
   void setClusterManagementMode(ClusterManagementModeRequest request);
 
   /**
+   * Gets the cluster management mode status {@link ClusterManagementMode}: which mode the
+   * cluster is in and whether the cluster has fully reached that mode.
+   *
+   * @param clusterName cluster name
+   * @return {@link ClusterManagementMode}, or {@code null} when no cluster status is available
+   */
+  ClusterManagementMode getClusterManagementMode(String clusterName);
+
+  /**
    * Reset a list of partitions in error state for an instance
    * The partitions are assume to be in error state and reset will bring them from error
    * to initial state. An error to initial state transition is required for reset.
diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
index d7a1637..cbd2019 100644
--- a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
+++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
@@ -55,6 +55,12 @@
     private final Type mode;
     private final Status status;
 
+    // No-arg constructor used only for JSON deserialization; both fields start as null
+    private ClusterManagementMode() {
+        this.mode = null;
+        this.status = null;
+    }
+
     public ClusterManagementMode(Type mode, Status status) {
         this.mode = mode;
         this.status = status;
diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
index dd2fe58..f3d9b07 100644
--- a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
+++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
@@ -19,11 +19,14 @@
  * under the License.
  */
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
 import com.google.common.base.Preconditions;
 
 /**
  * Represents a request to set the cluster management mode {@link ClusterManagementMode}
  */
+@JsonDeserialize(builder = ClusterManagementModeRequest.Builder.class)
 public class ClusterManagementModeRequest {
   private final ClusterManagementMode.Type _mode;
   private final String _clusterName;
@@ -57,6 +60,7 @@
     return new Builder();
   }
 
+  @JsonPOJOBuilder(buildMethodName = "buildFromJson")
   public static final class Builder {
     private ClusterManagementMode.Type mode;
     private String clusterName;
@@ -94,6 +98,12 @@
       return new ClusterManagementModeRequest(this);
     }
 
+    // Build method for the JSON deserializer (see @JsonPOJOBuilder); only the mode is checked
+    private ClusterManagementModeRequest buildFromJson() {
+      Preconditions.checkNotNull(mode, "Mode not set");
+      return new ClusterManagementModeRequest(this);
+    }
+
     private void validate() {
       Preconditions.checkNotNull(mode, "Mode not set");
       Preconditions.checkNotNull(clusterName, "Cluster name not set");
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6cc1e5f..00bdfcc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -83,6 +83,7 @@
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
 import org.apache.helix.controller.stages.ManagementModeStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 2545139..73c699e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -54,6 +54,7 @@
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.exceptions.HelixConflictException;
 import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.api.topology.ClusterTopology;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
@@ -62,6 +63,7 @@
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterStatus;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
@@ -81,7 +83,6 @@
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.HelixUtil;
@@ -521,6 +522,15 @@
     }
   }
 
+  @Override
+  public ClusterManagementMode getClusterManagementMode(String clusterName) {
+    HelixDataAccessor zkAccessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
+    ClusterStatus clusterStatus = zkAccessor.getProperty(zkAccessor.keyBuilder().clusterStatus());
+    return clusterStatus != null ? new ClusterManagementMode(clusterStatus.getManagementMode(),
+        clusterStatus.getManagementModeStatus()) : null; // null when no ClusterStatus is found
+  }
+
   private void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) {
     String hostname = NetworkUtil.getLocalhostName();
     logger.info(
@@ -533,9 +543,6 @@
     if (baseDataAccessor.exists(accessor.keyBuilder().pause().getPath(), AccessOption.PERSISTENT)) {
       throw new HelixConflictException(clusterName + " pause signal already exists");
     }
-    if (baseDataAccessor.exists(accessor.keyBuilder().maintenance().getPath(), AccessOption.PERSISTENT)) {
-      throw new HelixConflictException(clusterName + " maintenance signal already exists");
-    }
 
     // check whether cancellation is enabled
     ClusterConfig config = accessor.getProperty(accessor.keyBuilder().clusterConfig());
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 2bacc74..f7a1034 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -30,6 +30,7 @@
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
+import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.topology.ClusterTopology;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
@@ -331,6 +332,11 @@
 
   }
 
+  @Override
+  public ClusterManagementMode getClusterManagementMode(String clusterName) {
+    return null; // Mock implementation: no management mode is tracked, always returns null
+  }
+
   @Override public void resetPartition(String clusterName, String instanceName, String resourceName,
       List<String> partitionNames) {
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index 78010b9..9efcdc7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -22,8 +22,10 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -36,17 +38,22 @@
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.api.exceptions.HelixConflictException;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
@@ -66,6 +73,7 @@
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -300,6 +308,113 @@
   @ResponseMetered(name = HttpConstants.READ_REQUEST)
   @Timed(name = HttpConstants.READ_REQUEST)
   @GET
+  @Path("{clusterId}/management-mode")
+  public Response getClusterManagementMode(@PathParam("clusterId") String clusterId,
+      @QueryParam("showDetails") boolean showDetails) {
+    // A null mode from HelixAdmin is answered through notFound(...) instead of a JSON body.
+    ClusterManagementMode managementMode = getHelixAdmin().getClusterManagementMode(clusterId);
+    if (null == managementMode) {
+      return notFound("Cluster " + clusterId + " is not in management mode");
+    }
+    Map<String, Object> payload = new HashMap<>();
+    payload.put("cluster", clusterId);
+    payload.put("mode", managementMode.getMode());
+    payload.put("status", managementMode.getStatus());
+    if (showDetails) {
+      // Optional section: participants that are still in progress towards the management mode.
+      payload.put("details", getManagementModeDetails(clusterId, managementMode));
+    }
+
+    return JSONRepresentation(payload);
+  }
+
+  private Map<String, Object> getManagementModeDetails(String clusterId,
+      ClusterManagementMode mode) {
+    // Builds the "details" section: cluster status plus live instances still in progress.
+    Map<String, Object> details = new HashMap<>();
+    Map<String, Object> participantDetails = new HashMap<>();
+    ClusterManagementMode.Status status = mode.getStatus();
+    details.put("cluster", ImmutableMap.of("cluster", clusterId, "status", status.name()));
+    boolean hasPendingST = false;
+    Set<String> liveInstancesInProgress = new HashSet<>();
+
+    if (ClusterManagementMode.Status.IN_PROGRESS.equals(status)) {
+      HelixDataAccessor accessor = getDataAccssor(clusterId);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
+      BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+      if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())) {
+        // Entering cluster freeze mode: check live instance freeze status and pending messages
+        for (LiveInstance liveInstance : liveInstances) {
+          String instanceName = liveInstance.getInstanceName();
+          if (!LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstance.getStatus())) {
+            liveInstancesInProgress.add(instanceName);
+          }
+          // A null Stat (e.g. the messages node is missing) is treated as no pending messages.
+          Stat stat = baseAccessor
+              .getStat(keyBuilder.messages(instanceName).getPath(), AccessOption.PERSISTENT);
+          if (stat != null && stat.getNumChildren() > 0) {
+            hasPendingST = true;
+            liveInstancesInProgress.add(instanceName);
+          }
+        }
+      } else if (ClusterManagementMode.Type.NORMAL.equals(mode.getMode())) {
+        // Exiting freeze mode: instances still marked PAUSED have not finished unfreezing
+        for (LiveInstance liveInstance : liveInstances) {
+          if (LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstance.getStatus())) {
+            liveInstancesInProgress.add(liveInstance.getInstanceName());
+          }
+        }
+      }
+    }
+
+    participantDetails.put("status", status.name());
+    participantDetails.put("liveInstancesInProgress", liveInstancesInProgress);
+    if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())) {
+      // The pending state transition flag is only reported for cluster freeze mode
+      participantDetails.put("hasPendingStateTransition", hasPendingST);
+    }
+
+    details.put(ClusterProperties.liveInstances.name(), participantDetails);
+    return details;
+  }
+
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  @Path("{clusterId}/management-mode")
+  public Response updateClusterManagementMode(@PathParam("clusterId") String clusterId,
+      @DefaultValue("{}") String content) {
+    final ClusterManagementModeRequest parsed;
+    try {
+      parsed = OBJECT_MAPPER.readerFor(ClusterManagementModeRequest.class).readValue(content);
+    } catch (JsonProcessingException parseError) {
+      LOG.warn("Failed to parse json string: {}", content, parseError);
+      return badRequest("Invalid payload json body: " + content);
+    }
+
+    // The cluster name always comes from the URL path, so rebuild the request with it.
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(clusterId)
+        .withMode(parsed.getMode())
+        .withCancelPendingST(parsed.isCancelPendingST())
+        .withReason(parsed.getReason())
+        .build();
+
+    try {
+      getHelixAdmin().setClusterManagementMode(request);
+    } catch (HelixConflictException conflict) {
+      return Response.status(Response.Status.CONFLICT).entity(conflict.getMessage()).build();
+    } catch (HelixException failure) {
+      return serverError(failure.getMessage());
+    }
+
+    return JSONRepresentation(ImmutableMap.of("acknowledged", true));
+  }
+
+  @ResponseMetered(name = HttpConstants.READ_REQUEST)
+  @Timed(name = HttpConstants.READ_REQUEST)
+  @GET
   @Path("{clusterId}/configs")
   public Response getClusterConfig(@PathParam("clusterId") String clusterId) {
     ConfigAccessor accessor = getConfigAccessor();
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index cca5284..8d2de70 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -36,16 +36,20 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
 import com.sun.research.ws.wadl.HTTPMethods;
+import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.cloud.azure.AzureConstants;
 import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CustomizedStateConfig;
@@ -54,6 +58,7 @@
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.rest.common.HelixRestNamespace;
 import org.apache.helix.rest.server.auditlog.AuditLog;
@@ -1285,6 +1290,71 @@
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test
+  public void testClusterFreezeMode() throws Exception {
+    String cluster = _clusters.iterator().next();
+    HelixDataAccessor dataAccessor =
+        new ZKHelixDataAccessor(cluster, new ZkBaseDataAccessor<>(_gZkClient));
+    // Pause signal should not exist yet
+    Assert.assertNull(dataAccessor.getProperty(dataAccessor.keyBuilder().pause()));
+
+    String endpoint = "clusters/" + cluster + "/management-mode";
+
+    // Set cluster pause mode
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+        .withClusterName(cluster)
+        .build();
+    String payload = OBJECT_MAPPER.writeValueAsString(request);
+    post(endpoint, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+
+    PauseSignal pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause());
+    Assert.assertNotNull(pauseSignal);
+    Assert.assertTrue(pauseSignal.isClusterPause());
+    Assert.assertFalse(pauseSignal.getCancelPendingST());
+
+    // Wait until cluster status is persisted; fail here if it never shows up
+    Assert.assertTrue(TestHelper.verify(() -> dataAccessor.getBaseDataAccessor()
+            .exists(dataAccessor.keyBuilder().clusterStatus().getPath(), AccessOption.PERSISTENT),
+        TestHelper.WAIT_DURATION));
+
+    // Verify get cluster status
+    String body = get(endpoint, null, Response.Status.OK.getStatusCode(), true);
+    ClusterManagementMode mode =
+        OBJECT_MAPPER.readerFor(ClusterManagementMode.class).readValue(body);
+    Assert.assertEquals(mode.getMode(), ClusterManagementMode.Type.CLUSTER_PAUSE);
+    // Depending on timing, it could be IN_PROGRESS or COMPLETED.
+    // This only verifies that the REST response format is correct.
+    Assert.assertTrue(ClusterManagementMode.Status.IN_PROGRESS.equals(mode.getStatus())
+        || ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus()));
+
+    body = get(endpoint, ImmutableMap.of("showDetails", "true"), Response.Status.OK.getStatusCode(),
+        true);
+    Map<String, Object> responseMap = OBJECT_MAPPER.readerFor(Map.class).readValue(body);
+    Map<String, Object> detailsMap = (Map<String, Object>) responseMap.get("details");
+
+    Assert.assertEquals(responseMap.get("cluster"), cluster);
+    Assert.assertEquals(responseMap.get("mode"), mode.getMode().name());
+    Assert.assertEquals(responseMap.get("status"), mode.getStatus().name());
+    Assert.assertTrue(responseMap.containsKey("details"));
+    Assert.assertTrue(detailsMap.containsKey("cluster"));
+    Assert.assertTrue(detailsMap.containsKey("liveInstances"));
+
+    // Set normal mode
+    request = ClusterManagementModeRequest.newBuilder()
+        .withMode(ClusterManagementMode.Type.NORMAL)
+        .withClusterName(cluster)
+        .build();
+    payload = OBJECT_MAPPER.writeValueAsString(request);
+    post(endpoint, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+
+    // Pause signal is deleted
+    pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause());
+    Assert.assertNull(pauseSignal);
+  }
+
   private ClusterConfig getClusterConfigFromRest(String cluster) throws IOException {
     String body = get("clusters/" + cluster + "/configs", null, Response.Status.OK.getStatusCode(), true);