Add REST APIs for management mode (#1807)
This commit adds Java and REST APIs to set the cluster freeze mode and to get the management mode status.
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index d403571..05bd60e 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConstraints;
@@ -37,8 +39,6 @@
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.api.status.ClusterManagementMode;
-import org.apache.helix.api.status.ClusterManagementModeRequest;
/*
* Helix cluster management
@@ -382,6 +382,15 @@
void setClusterManagementMode(ClusterManagementModeRequest request);
/**
+ * Gets the cluster management status {@link ClusterManagementMode}: which mode the cluster is
+ * in and whether the cluster has fully reached that mode.
+ *
+ * @param clusterName cluster name
+ * @return {@link ClusterManagementMode}; may be {@code null} (for example, the ZooKeeper-backed
+ *         admin returns null when no cluster status is found)
+ */
+ ClusterManagementMode getClusterManagementMode(String clusterName);
+
+ /**
* Reset a list of partitions in error state for an instance
* The partitions are assume to be in error state and reset will bring them from error
* to initial state. An error to initial state transition is required for reset.
diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
index d7a1637..cbd2019 100644
--- a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
+++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
@@ -55,6 +55,12 @@
private final Type mode;
private final Status status;
+  /**
+   * No-arg constructor that exists only for JSON deserialization; it leaves both
+   * {@code mode} and {@code status} as {@code null}.
+   */
+  private ClusterManagementMode() {
+    this.mode = null;
+    this.status = null;
+  }
+
public ClusterManagementMode(Type mode, Status status) {
this.mode = mode;
this.status = status;
diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
index dd2fe58..f3d9b07 100644
--- a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
+++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java
@@ -19,11 +19,14 @@
* under the License.
*/
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.base.Preconditions;
/**
* Represents a request to set the cluster management mode {@link ClusterManagementMode}
*/
+@JsonDeserialize(builder = ClusterManagementModeRequest.Builder.class)
public class ClusterManagementModeRequest {
private final ClusterManagementMode.Type _mode;
private final String _clusterName;
@@ -57,6 +60,7 @@
return new Builder();
}
+ @JsonPOJOBuilder(buildMethodName = "buildFromJson")
public static final class Builder {
private ClusterManagementMode.Type mode;
private String clusterName;
@@ -94,6 +98,12 @@
return new ClusterManagementModeRequest(this);
}
+    /**
+     * Build method used by the JSON deserializer. Only the mode is required on this path;
+     * the cluster name is not checked here.
+     *
+     * @return the request built from the current builder state
+     */
+    private ClusterManagementModeRequest buildFromJson() {
+      Preconditions.checkNotNull(mode, "Mode not set");
+      return new ClusterManagementModeRequest(this);
+    }
+
private void validate() {
Preconditions.checkNotNull(mode, "Mode not set");
Preconditions.checkNotNull(clusterName, "Cluster name not set");
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6cc1e5f..00bdfcc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -83,6 +83,7 @@
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
import org.apache.helix.controller.stages.ManagementModeStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 2545139..73c699e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -54,6 +54,7 @@
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.exceptions.HelixConflictException;
import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
@@ -62,6 +63,7 @@
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterStatus;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.CurrentState;
@@ -81,7 +83,6 @@
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.HelixUtil;
@@ -521,6 +522,15 @@
}
}
+  /**
+   * Reads the cluster status and converts it into a {@link ClusterManagementMode}.
+   *
+   * @param clusterName cluster name
+   * @return the management mode and its status, or {@code null} if no cluster status is found
+   */
+  @Override
+  public ClusterManagementMode getClusterManagementMode(String clusterName) {
+    HelixDataAccessor dataAccessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
+    ClusterStatus clusterStatus =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterStatus());
+    if (clusterStatus == null) {
+      // Nothing has been recorded for this cluster
+      return null;
+    }
+    return new ClusterManagementMode(clusterStatus.getManagementMode(),
+        clusterStatus.getManagementModeStatus());
+  }
+
private void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) {
String hostname = NetworkUtil.getLocalhostName();
logger.info(
@@ -533,9 +543,6 @@
if (baseDataAccessor.exists(accessor.keyBuilder().pause().getPath(), AccessOption.PERSISTENT)) {
throw new HelixConflictException(clusterName + " pause signal already exists");
}
- if (baseDataAccessor.exists(accessor.keyBuilder().maintenance().getPath(), AccessOption.PERSISTENT)) {
- throw new HelixConflictException(clusterName + " maintenance signal already exists");
- }
// check whether cancellation is enabled
ClusterConfig config = accessor.getProperty(accessor.keyBuilder().clusterConfig());
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 2bacc74..f7a1034 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -30,6 +30,7 @@
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
+import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
@@ -331,6 +332,11 @@
}
+ @Override
+ public ClusterManagementMode getClusterManagementMode(String clusterName) {
+ return null; // Mock: always returns null, regardless of the cluster name.
+ }
+
@Override public void resetPartition(String clusterName, String instanceName, String resourceName,
List<String> partitionNames) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index 78010b9..9efcdc7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -22,8 +22,10 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -36,17 +38,22 @@
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.api.exceptions.HelixConflictException;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
@@ -66,6 +73,7 @@
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,6 +308,113 @@
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
+  @Path("{clusterId}/management-mode")
+  public Response getClusterManagementMode(@PathParam("clusterId") String clusterId,
+      @QueryParam("showDetails") boolean showDetails) {
+    // Responds with 404 when the admin reports no management mode for the cluster.
+    ClusterManagementMode managementMode = getHelixAdmin().getClusterManagementMode(clusterId);
+    if (managementMode == null) {
+      return notFound("Cluster " + clusterId + " is not in management mode");
+    }
+
+    Map<String, Object> payload = new HashMap<>();
+    payload.put("cluster", clusterId);
+    payload.put("mode", managementMode.getMode());
+    payload.put("status", managementMode.getStatus());
+    if (showDetails) {
+      // Details list the participants that are still moving to the management mode.
+      payload.put("details", getManagementModeDetails(clusterId, managementMode));
+    }
+
+    return JSONRepresentation(payload);
+  }
+
+  /**
+   * Builds the "details" section of the management mode response.
+   *
+   * <p>The result always has a "cluster" entry (cluster id and status name) and a
+   * live-instances entry holding the status name and the set of live instances that are still
+   * in progress. The set is only populated while the status is IN_PROGRESS. For
+   * CLUSTER_PAUSE mode the live-instances entry also carries "hasPendingStateTransition".
+   *
+   * @param clusterId cluster name
+   * @param mode management mode and status previously read for the cluster
+   * @return map of detail entries, ready to be serialized to JSON
+   */
+  private Map<String, Object> getManagementModeDetails(String clusterId,
+      ClusterManagementMode mode) {
+    Map<String, Object> details = new HashMap<>();
+    Map<String, Object> participantDetails = new HashMap<>();
+    ClusterManagementMode.Status status = mode.getStatus();
+    details.put("cluster", ImmutableMap.of("cluster", clusterId, "status", status.name()));
+
+    boolean hasPendingST = false;
+    Set<String> liveInstancesInProgress = new HashSet<>();
+
+    if (ClusterManagementMode.Status.IN_PROGRESS.equals(status)) {
+      HelixDataAccessor accessor = getDataAccssor(clusterId);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
+      BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+      if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())) {
+        // Entering cluster freeze mode, check live instance freeze status and pending ST
+        for (LiveInstance liveInstance : liveInstances) {
+          String instanceName = liveInstance.getInstanceName();
+          if (!LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstance.getStatus())) {
+            liveInstancesInProgress.add(instanceName);
+          }
+          Stat stat = baseAccessor
+              .getStat(keyBuilder.messages(instanceName).getPath(), AccessOption.PERSISTENT);
+          // The messages node may be missing (e.g. the instance went away after it was
+          // listed). A missing node has no pending messages, so skip it instead of failing
+          // the whole request with a NullPointerException.
+          if (stat != null && stat.getNumChildren() > 0) {
+            hasPendingST = true;
+            liveInstancesInProgress.add(instanceName);
+          }
+        }
+      } else if (ClusterManagementMode.Type.NORMAL.equals(mode.getMode())) {
+        // Exiting freeze mode, check live instance unfreeze status
+        for (LiveInstance liveInstance : liveInstances) {
+          if (LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstance.getStatus())) {
+            liveInstancesInProgress.add(liveInstance.getInstanceName());
+          }
+        }
+      }
+    }
+
+    participantDetails.put("status", status.name());
+    participantDetails.put("liveInstancesInProgress", liveInstancesInProgress);
+    if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())) {
+      // Add pending ST result for cluster freeze mode
+      participantDetails.put("hasPendingStateTransition", hasPendingST);
+    }
+
+    details.put(ClusterProperties.liveInstances.name(), participantDetails);
+    return details;
+  }
+
+  /**
+   * Sets the management mode of a cluster from a JSON request body.
+   *
+   * <p>Responds with 400 when the body cannot be parsed, 409 when the admin reports a
+   * conflict, and a server error for any other {@link HelixException}.
+   *
+   * @param clusterId cluster name taken from the URL path
+   * @param content JSON form of a {@link ClusterManagementModeRequest}
+   * @return an "acknowledged" JSON response on success
+   */
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  @Path("{clusterId}/management-mode")
+  public Response updateClusterManagementMode(@PathParam("clusterId") String clusterId,
+      @DefaultValue("{}") String content) {
+    ClusterManagementModeRequest parsed;
+    try {
+      parsed = OBJECT_MAPPER.readerFor(ClusterManagementModeRequest.class).readValue(content);
+    } catch (JsonProcessingException e) {
+      LOG.warn("Failed to parse json string: {}", content, e);
+      return badRequest("Invalid payload json body: " + content);
+    }
+
+    // The cluster name always comes from the URL path, so rebuild the request with it.
+    ClusterManagementModeRequest.Builder builder = ClusterManagementModeRequest.newBuilder();
+    builder.withClusterName(clusterId);
+    builder.withMode(parsed.getMode());
+    builder.withCancelPendingST(parsed.isCancelPendingST());
+    builder.withReason(parsed.getReason());
+    ClusterManagementModeRequest request = builder.build();
+
+    try {
+      getHelixAdmin().setClusterManagementMode(request);
+    } catch (HelixConflictException e) {
+      return Response.status(Response.Status.CONFLICT).entity(e.getMessage()).build();
+    } catch (HelixException e) {
+      return serverError(e.getMessage());
+    }
+
+    return JSONRepresentation(ImmutableMap.of("acknowledged", true));
+  }
+
+ @ResponseMetered(name = HttpConstants.READ_REQUEST)
+ @Timed(name = HttpConstants.READ_REQUEST)
+ @GET
@Path("{clusterId}/configs")
public Response getClusterConfig(@PathParam("clusterId") String clusterId) {
ConfigAccessor accessor = getConfigAccessor();
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index cca5284..8d2de70 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -36,16 +36,20 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.sun.research.ws.wadl.HTTPMethods;
+import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.cloud.azure.AzureConstants;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CustomizedStateConfig;
@@ -54,6 +58,7 @@
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.rest.common.HelixRestNamespace;
import org.apache.helix.rest.server.auditlog.AuditLog;
@@ -1285,6 +1290,71 @@
System.out.println("End test :" + TestHelper.getTestMethodName());
}
+  /**
+   * Exercises the management-mode REST endpoints: puts a cluster into CLUSTER_PAUSE via POST,
+   * reads the mode back via GET (with and without details), then returns the cluster to
+   * NORMAL and checks that the pause signal is gone.
+   */
+  @Test
+  public void testClusterFreezeMode() throws Exception {
+    String cluster = _clusters.iterator().next();
+    HelixDataAccessor dataAccessor =
+        new ZKHelixDataAccessor(cluster, new ZkBaseDataAccessor<>(_gZkClient));
+    // The pause signal must not exist before the test starts
+    Assert.assertNull(dataAccessor.getProperty(dataAccessor.keyBuilder().pause()));
+
+    String endpoint = "clusters/" + cluster + "/management-mode";
+
+    // Set cluster pause mode
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+        .withClusterName(cluster)
+        .build();
+    String payload = OBJECT_MAPPER.writeValueAsString(request);
+    post(endpoint, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+
+    PauseSignal pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause());
+    Assert.assertNotNull(pauseSignal);
+    Assert.assertTrue(pauseSignal.isClusterPause());
+    Assert.assertFalse(pauseSignal.getCancelPendingST());
+
+    // Wait until cluster status is persisted. Assert on the result so that a timeout fails
+    // here instead of surfacing later as an unrelated HTTP status mismatch.
+    Assert.assertTrue(TestHelper.verify(() -> dataAccessor.getBaseDataAccessor()
+            .exists(dataAccessor.keyBuilder().clusterStatus().getPath(), AccessOption.PERSISTENT),
+        TestHelper.WAIT_DURATION));
+
+    // Verify get cluster status
+    String body = get(endpoint, null, Response.Status.OK.getStatusCode(), true);
+    ClusterManagementMode mode =
+        OBJECT_MAPPER.readerFor(ClusterManagementMode.class).readValue(body);
+    Assert.assertEquals(mode.getMode(), ClusterManagementMode.Type.CLUSTER_PAUSE);
+    // Depending on timing, it could be IN_PROGRESS or COMPLETED.
+    // It's just to verify the rest response format is correct
+    Assert.assertTrue(ClusterManagementMode.Status.IN_PROGRESS.equals(mode.getStatus())
+        || ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus()));
+
+    body = get(endpoint, ImmutableMap.of("showDetails", "true"), Response.Status.OK.getStatusCode(),
+        true);
+    Map<String, Object> responseMap = OBJECT_MAPPER.readerFor(Map.class).readValue(body);
+    Map<String, Object> detailsMap = (Map<String, Object>) responseMap.get("details");
+
+    Assert.assertEquals(responseMap.get("cluster"), cluster);
+    Assert.assertEquals(responseMap.get("mode"), mode.getMode().name());
+    Assert.assertEquals(responseMap.get("status"), mode.getStatus().name());
+    Assert.assertTrue(responseMap.containsKey("details"));
+    Assert.assertTrue(detailsMap.containsKey("cluster"));
+    Assert.assertTrue(detailsMap.containsKey("liveInstances"));
+
+    // Set normal mode
+    request = ClusterManagementModeRequest.newBuilder()
+        .withMode(ClusterManagementMode.Type.NORMAL)
+        .withClusterName(cluster)
+        .build();
+    payload = OBJECT_MAPPER.writeValueAsString(request);
+    post(endpoint, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+
+    // Pause signal is deleted
+    pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause());
+    Assert.assertNull(pauseSignal);
+  }
+
private ClusterConfig getClusterConfigFromRest(String cluster) throws IOException {
String body = get("clusters/" + cluster + "/configs", null, Response.Status.OK.getStatusCode(), true);