blob: cc4988569fcbd4f397b2e44cc6a27572b053aee6 [file] [log] [blame]
package org.apache.helix.rest.server.resources.helix;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.server.filters.ClusterAuth;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ClusterAuth
@Path("/clusters/{clusterId}/resources")
public class ResourceAccessor extends AbstractHelixResource {
private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class);
public enum ResourceProperties {
idealState,
idealStates,
externalView,
externalViews,
resourceConfig,
}
public enum HealthStatus {
HEALTHY,
PARTIAL_HEALTHY,
UNHEALTHY
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
public Response getResources(@PathParam("clusterId") String clusterId) {
ObjectNode root = JsonNodeFactory.instance.objectNode();
root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
RealmAwareZkClient zkClient = getRealmAwareZkClient();
ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name());
ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name());
List<String> idealStates = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
List<String> externalViews = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
if (idealStates != null) {
idealStatesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(idealStates));
} else {
return notFound();
}
if (externalViews != null) {
externalViewsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(externalViews));
}
return JSONRepresentation(root);
}
/**
* Returns health profile of all resources in the cluster
* @param clusterId
* @return JSON result
*/
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@Path("health")
public Response getResourceHealth(@PathParam("clusterId") String clusterId) {
RealmAwareZkClient zkClient = getRealmAwareZkClient();
List<String> resourcesInIdealState =
zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
List<String> resourcesInExternalView =
zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
Map<String, String> resourceHealthResult = new HashMap<>();
for (String resourceName : resourcesInIdealState) {
if (resourcesInExternalView.contains(resourceName)) {
Map<String, String> partitionHealth = computePartitionHealth(clusterId, resourceName);
if (partitionHealth.isEmpty()
|| partitionHealth.values().contains(HealthStatus.UNHEALTHY.name())) {
// No partitions for a resource or there exists one or more UNHEALTHY partitions in this
// resource, UNHEALTHY
resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name());
} else if (partitionHealth.values().contains(HealthStatus.PARTIAL_HEALTHY.name())) {
// No UNHEALTHY partition, but one or more partially healthy partitions, resource is
// partially healthy
resourceHealthResult.put(resourceName, HealthStatus.PARTIAL_HEALTHY.name());
} else {
// No UNHEALTHY or partially healthy partitions and non-empty, resource is healthy
resourceHealthResult.put(resourceName, HealthStatus.HEALTHY.name());
}
} else {
// If a resource is not in ExternalView, then it is UNHEALTHY
resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name());
}
}
return JSONRepresentation(resourceHealthResult);
}
/**
* Returns health profile of all partitions for the corresponding resource in the cluster
* @param clusterId
* @param resourceName
* @return JSON result
* @throws IOException
*/
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@Path("{resourceName}/health")
public Response getPartitionHealth(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName) {
return JSONRepresentation(computePartitionHealth(clusterId, resourceName));
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@Path("{resourceName}")
public Response getResource(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName,
@DefaultValue("getResource") @QueryParam("command") String command) {
// Get the command. If not provided, the default would be "getResource"
Command cmd;
try {
cmd = Command.valueOf(command);
} catch (Exception e) {
return badRequest("Invalid command : " + command);
}
ConfigAccessor accessor = getConfigAccessor();
HelixAdmin admin = getHelixAdmin();
switch (cmd) {
case getResource:
ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
Map<String, ZNRecord> resourceMap = new HashMap<>();
if (idealState != null) {
resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord());
} else {
return notFound();
}
resourceMap.put(ResourceProperties.resourceConfig.name(), null);
resourceMap.put(ResourceProperties.externalView.name(), null);
if (resourceConfig != null) {
resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
}
if (externalView != null) {
resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord());
}
return JSONRepresentation(resourceMap);
case validateWeight:
// Validate ResourceConfig for WAGED rebalance
Map<String, Boolean> validationResultMap;
try {
validationResultMap = admin.validateResourcesForWagedRebalance(clusterId,
Collections.singletonList(resourceName));
} catch (HelixException e) {
return badRequest(e.getMessage());
}
return JSONRepresentation(validationResultMap);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
}
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@PUT
@Path("{resourceName}")
public Response addResource(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName,
@DefaultValue("-1") @QueryParam("numPartitions") int numPartitions,
@DefaultValue("") @QueryParam("stateModelRef") String stateModelRef,
@DefaultValue("SEMI_AUTO") @QueryParam("rebalancerMode") String rebalancerMode,
@DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy,
@DefaultValue("0") @QueryParam("bucketSize") int bucketSize,
@DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance,
@DefaultValue("addResource") @QueryParam("command") String command, String content) {
// Get the command. If not provided, the default would be "addResource"
Command cmd;
try {
cmd = Command.valueOf(command);
} catch (Exception e) {
return badRequest("Invalid command : " + command);
}
HelixAdmin admin = getHelixAdmin();
try {
switch (cmd) {
case addResource:
if (content.length() != 0) {
ZNRecord record;
try {
record = toZNRecord(content);
} catch (IOException e) {
_logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
return badRequest("Input is not a valid ZNRecord!");
}
if (record.getSimpleFields() != null) {
admin.addResource(clusterId, resourceName, new IdealState(record));
}
} else {
admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode,
rebalanceStrategy, bucketSize, maxPartitionsPerInstance);
}
break;
case addWagedResource:
// Check if content is valid
if (content == null || content.length() == 0) {
_logger.error("Input is null or empty!");
return badRequest("Input is null or empty!");
}
Map<String, ZNRecord> input;
// Content must supply both IdealState and ResourceConfig
try {
TypeReference<Map<String, ZNRecord>> typeRef =
new TypeReference<Map<String, ZNRecord>>() {
};
input = ZNRECORD_READER.forType(typeRef).readValue(content);
} catch (IOException e) {
_logger.error("Failed to deserialize user's input {}, Exception: {}", content, e);
return badRequest("Input is not a valid map of String-ZNRecord pairs!");
}
// Check if the map contains both IdealState and ResourceConfig
ZNRecord idealStateRecord =
input.get(ResourceAccessor.ResourceProperties.idealState.name());
ZNRecord resourceConfigRecord =
input.get(ResourceAccessor.ResourceProperties.resourceConfig.name());
if (idealStateRecord == null || resourceConfigRecord == null) {
_logger.error("Input does not contain both IdealState and ResourceConfig!");
return badRequest("Input does not contain both IdealState and ResourceConfig!");
}
// Add using HelixAdmin API
try {
admin.addResourceWithWeight(clusterId, new IdealState(idealStateRecord),
new ResourceConfig(resourceConfigRecord));
} catch (HelixException e) {
String errMsg = String.format("Failed to add resource %s with weight in cluster %s!",
idealStateRecord.getId(), clusterId);
_logger.error(errMsg, e);
return badRequest(errMsg);
}
break;
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
}
} catch (Exception e) {
_logger.error("Error in adding a resource: " + resourceName, e);
return serverError(e);
}
return OK();
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
@Path("{resourceName}")
public Response updateResource(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName, @QueryParam("command") String command,
@DefaultValue("-1") @QueryParam("replicas") int replicas,
@DefaultValue("") @QueryParam("keyPrefix") String keyPrefix,
@DefaultValue("") @QueryParam("group") String group) {
Command cmd;
try {
cmd = Command.valueOf(command);
} catch (Exception e) {
return badRequest("Invalid command : " + command);
}
HelixAdmin admin = getHelixAdmin();
try {
switch (cmd) {
case enable:
admin.enableResource(clusterId, resourceName, true);
break;
case disable:
admin.enableResource(clusterId, resourceName, false);
break;
case rebalance:
if (replicas == -1) {
return badRequest("Number of replicas is needed for rebalancing!");
}
keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
break;
case enableWagedRebalance:
try {
admin.enableWagedRebalance(clusterId, Collections.singletonList(resourceName));
} catch (HelixException e) {
return badRequest(e.getMessage());
}
break;
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
}
} catch (Exception e) {
_logger.error("Failed in updating resource : " + resourceName, e);
return badRequest(e.getMessage());
}
return OK();
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@DELETE
@Path("{resourceName}")
public Response deleteResource(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName) {
HelixAdmin admin = getHelixAdmin();
try {
admin.dropResource(clusterId, resourceName);
} catch (Exception e) {
_logger.error("Error in deleting a resource: " + resourceName, e);
return serverError();
}
return OK();
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@Path("{resourceName}/configs")
public Response getResourceConfig(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName) {
ConfigAccessor accessor = getConfigAccessor();
ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
if (resourceConfig != null) {
return JSONRepresentation(resourceConfig.getRecord());
}
return notFound();
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
@Path("{resourceName}/configs")
public Response updateResourceConfig(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName, @QueryParam("command") String commandStr,
String content) {
Command command;
if (commandStr == null || commandStr.isEmpty()) {
command = Command.update; // Default behavior to keep it backward-compatible
} else {
try {
command = getCommand(commandStr);
} catch (HelixException ex) {
return badRequest(ex.getMessage());
}
}
ZNRecord record;
try {
record = toZNRecord(content);
} catch (IOException e) {
_logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
return badRequest("Input is not a valid ZNRecord!");
}
if (!resourceName.equals(record.getId())) {
return badRequest("ID does not match the resourceName name in input!");
}
ResourceConfig resourceConfig = new ResourceConfig(record);
ConfigAccessor configAccessor = getConfigAccessor();
try {
switch (command) {
case update:
configAccessor.updateResourceConfig(clusterId, resourceName, resourceConfig);
break;
case delete:
HelixConfigScope resourceScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
.forCluster(clusterId).forResource(resourceName).build();
configAccessor.remove(resourceScope, record);
break;
default:
return badRequest(String.format("Unsupported command: %s", command));
}
} catch (HelixException ex) {
return notFound(ex.getMessage());
} catch (Exception ex) {
_logger.error(String.format("Error in update resource config for resource: %s", resourceName),
ex);
return serverError(ex);
}
return OK();
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@Path("{resourceName}/idealState")
public Response getResourceIdealState(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName) {
HelixAdmin admin = getHelixAdmin();
IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
if (idealState != null) {
return JSONRepresentation(idealState.getRecord());
}
return notFound();
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
@Path("{resourceName}/idealState")
public Response updateResourceIdealState(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName, @QueryParam("command") String commandStr,
String content) {
Command command;
if (commandStr == null || commandStr.isEmpty()) {
command = Command.update; // Default behavior is update
} else {
try {
command = getCommand(commandStr);
} catch (HelixException ex) {
return badRequest(ex.getMessage());
}
}
ZNRecord record;
try {
record = toZNRecord(content);
} catch (IOException e) {
_logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
return badRequest("Input is not a valid ZNRecord!");
}
IdealState idealState = new IdealState(record);
HelixAdmin helixAdmin = getHelixAdmin();
try {
switch (command) {
case update:
helixAdmin.updateIdealState(clusterId, resourceName, idealState);
break;
case delete: {
helixAdmin.removeFromIdealState(clusterId, resourceName, idealState);
}
break;
default:
return badRequest(String.format("Unsupported command: %s", command));
}
} catch (HelixException ex) {
return notFound(ex.getMessage()); // HelixAdmin throws a HelixException if it doesn't
// exist already
} catch (Exception ex) {
_logger.error(String.format("Failed to update the IdealState for resource: %s", resourceName),
ex);
return serverError(ex);
}
return OK();
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@Path("{resourceName}/externalView")
public Response getResourceExternalView(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName) {
HelixAdmin admin = getHelixAdmin();
ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
if (externalView != null) {
return JSONRepresentation(externalView.getRecord());
}
return notFound();
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@Path("{resourceName}/{customizedStateType}/customizedView")
public Response getResourceCustomizedView(@PathParam("clusterId") String clusterId,
@PathParam("resourceName") String resourceName,
@PathParam("customizedStateType") String customizedStateType) {
HelixAdmin admin = getHelixAdmin();
CustomizedView customizedView =
admin.getResourceCustomizedView(clusterId, resourceName, customizedStateType);
if (customizedView != null) {
return JSONRepresentation(customizedView.getRecord());
}
return notFound();
}
private Map<String, String> computePartitionHealth(String clusterId, String resourceName) {
HelixAdmin admin = getHelixAdmin();
IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
StateModelDefinition stateModelDef =
admin.getStateModelDef(clusterId, idealState.getStateModelDefRef());
String initialState = stateModelDef.getInitialState();
List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
statesPriorityList = statesPriorityList.subList(0, statesPriorityList.indexOf(initialState)); // Trim
// stateList
// to
// initialState
// and
// above
int minActiveReplicas = idealState.getMinActiveReplicas();
// Start the logic that determines the health status of each partition
Map<String, String> partitionHealthResult = new HashMap<>();
Set<String> allPartitionNames = idealState.getPartitionSet();
if (!allPartitionNames.isEmpty()) {
for (String partitionName : allPartitionNames) {
int replicaCount =
idealState.getReplicaCount(idealState.getPreferenceList(partitionName).size());
// Simplify expectedStateCountMap by assuming that all instances are available to reduce
// computation load on this REST endpoint
LinkedHashMap<String, Integer> expectedStateCountMap =
stateModelDef.getStateCountMap(replicaCount, replicaCount);
// Extract all states into Collections from ExternalView
Map<String, String> stateMapInExternalView = externalView.getStateMap(partitionName);
Collection<String> allReplicaStatesInExternalView =
(stateMapInExternalView != null && !stateMapInExternalView.isEmpty())
? stateMapInExternalView.values()
: Collections.<String> emptyList();
int numActiveReplicasInExternalView = 0;
HealthStatus status = HealthStatus.HEALTHY;
// Go through all states that are "active" states (higher priority than InitialState)
for (int statePriorityIndex = 0; statePriorityIndex < statesPriorityList
.size(); statePriorityIndex++) {
String currentState = statesPriorityList.get(statePriorityIndex);
int currentStateCountInIdealState = expectedStateCountMap.get(currentState);
int currentStateCountInExternalView =
Collections.frequency(allReplicaStatesInExternalView, currentState);
numActiveReplicasInExternalView += currentStateCountInExternalView;
// Top state counts must match, if not, unhealthy
if (statePriorityIndex == 0
&& currentStateCountInExternalView != currentStateCountInIdealState) {
status = HealthStatus.UNHEALTHY;
break;
} else if (currentStateCountInExternalView < currentStateCountInIdealState) {
// For non-top states, if count in ExternalView is less than count in IdealState,
// partially healthy
status = HealthStatus.PARTIAL_HEALTHY;
}
}
if (numActiveReplicasInExternalView < minActiveReplicas) {
// If this partition does not satisfy the number of minimum active replicas, unhealthy
status = HealthStatus.UNHEALTHY;
}
partitionHealthResult.put(partitionName, status.name());
}
}
return partitionHealthResult;
}
}