blob: 22e03145b37e639043f1b2d609ec47d3d45bedbd [file] [log] [blame]
package org.apache.helix.rest.server.resources.helix;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementService;
import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.server.filters.ClusterAuth;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.rest.server.resources.exceptions.HelixHealthException;
import org.apache.helix.rest.server.service.ClusterService;
import org.apache.helix.rest.server.service.ClusterServiceImpl;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ClusterAuth
@Path("/clusters/{clusterId}/instances")
public class InstancesAccessor extends AbstractHelixResource {
private final static Logger _logger = LoggerFactory.getLogger(InstancesAccessor.class);
// This type does not belongs to real HealthCheck failed reason. Also if we add this type
// to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl
// loops all the types to do corresponding checks.
private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST";
public enum InstancesProperties {
instances,
online,
disabled,
selection_base,
zone_order,
customized_values,
instance_stoppable_parallel,
instance_not_stoppable_with_reasons
}
public enum InstanceHealthSelectionBase {
instance_based,
zone_based
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
public Response getAllInstances(@PathParam("clusterId") String clusterId,
@DefaultValue("getAllInstances") @QueryParam("command") String command) {
// Get the command. If not provided, the default would be "getAllInstances"
Command cmd;
try {
cmd = Command.valueOf(command);
} catch (Exception e) {
return badRequest("Invalid command : " + command);
}
HelixDataAccessor accessor = getDataAccssor(clusterId);
List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs());
if (instances == null) {
return notFound();
}
switch (cmd) {
case getAllInstances:
ObjectNode root = JsonNodeFactory.instance.objectNode();
root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
ArrayNode instancesNode =
root.putArray(InstancesAccessor.InstancesProperties.instances.name());
instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
ArrayNode onlineNode = root.putArray(InstancesAccessor.InstancesProperties.online.name());
ArrayNode disabledNode = root.putArray(InstancesAccessor.InstancesProperties.disabled.name());
List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
for (String instanceName : instances) {
InstanceConfig instanceConfig =
accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
if (instanceConfig != null) {
if (!InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig)) {
disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
}
if (liveInstances.contains(instanceName)) {
onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
}
}
}
return JSONRepresentation(root);
case validateWeight:
// Validate all instances for WAGED rebalance
HelixAdmin admin = getHelixAdmin();
Map<String, Boolean> validationResultMap;
try {
validationResultMap = admin.validateInstancesForWagedRebalance(clusterId, instances);
} catch (HelixException e) {
return badRequest(e.getMessage());
}
return JSONRepresentation(validationResultMap);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
}
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
public Response instancesOperations(
@PathParam("clusterId") String clusterId,
@QueryParam("command") String command,
@QueryParam("continueOnFailures") boolean continueOnFailures,
@QueryParam("skipZKRead") boolean skipZKRead,
String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
} catch (Exception e) {
return badRequest("Invalid command : " + command);
}
HelixAdmin admin = getHelixAdmin();
try {
JsonNode node = null;
if (content.length() != 0) {
node = OBJECT_MAPPER.readTree(content);
}
if (node == null) {
return badRequest("Invalid input for content : " + content);
}
List<String> enableInstances = OBJECT_MAPPER
.readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
switch (cmd) {
case enable:
admin.enableInstance(clusterId, enableInstances, true);
break;
case disable:
admin.enableInstance(clusterId, enableInstances, false);
break;
case stoppable:
return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
}
} catch (HelixHealthException e) {
_logger
.error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
return serverError(e);
} catch (Exception e) {
_logger.error("Failed in updating instances : " + content, e);
return badRequest(e.getMessage());
}
return OK();
}
private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
boolean continueOnFailures) throws IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
InstancesAccessor.InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue());
List<String> instances = OBJECT_MAPPER
.readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
List<String> orderOfZone = null;
String customizedInput = null;
if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) {
customizedInput = node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString();
}
if (node.get(InstancesAccessor.InstancesProperties.zone_order.name()) != null) {
orderOfZone = OBJECT_MAPPER
.readValue(node.get(InstancesAccessor.InstancesProperties.zone_order.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
}
// Prepare output result
ObjectNode result = JsonNodeFactory.instance.objectNode();
ArrayNode stoppableInstances =
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
MaintenanceManagementService maintenanceService =
new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
getConfigAccessor(), skipZKRead, continueOnFailures, getNamespace());
ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
switch (selectionBase) {
case zone_based:
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, orderOfZone, clusterTopology.toZoneMapping());
Map<String, StoppableCheck> instancesStoppableChecks = maintenanceService.batchGetInstancesStoppableChecks(
clusterId, zoneBasedInstance, customizedInput);
for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
if (!stoppableCheck.isStoppable()) {
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
for (String failedReason : stoppableCheck.getFailedChecks()) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
}
} else {
stoppableInstances.add(instance);
}
}
// Adding following logic to check whether instances exist or not. An instance exist could be
// checking following scenario:
// 1. Instance got dropped. (InstanceConfig is gone.)
// 2. Instance name has typo.
// If we dont add this check, the instance, which does not exist, will be disappeared from
// result since Helix skips instances for instances not in the selected zone. User may get
// confused with the output.
Set<String> nonSelectedInstances = new HashSet<>(instances);
nonSelectedInstances.removeAll(clusterTopology.getAllInstances());
for (String nonSelectedInstance : nonSelectedInstances) {
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance);
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
}
break;
case instance_based:
default:
throw new UnsupportedOperationException("instance_based selection is not supported yet!");
}
return JSONRepresentation(result);
} catch (HelixException e) {
_logger
.error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
throw new HelixHealthException(e);
} catch (Exception e) {
_logger.error(String.format(
"Failed to get parallel stoppable instances for cluster %s with a HelixException!",
clusterId), e);
throw e;
}
}
/**
* Get instances belongs to the first zone. If the zone is already empty, Helix will iterate zones
* by order until find the zone contains instances.
*
* The order of zones can directly come from user input. If user did not specify it, Helix will order
* zones with alphabetical order.
*
* @param instances
* @param orderedZones
* @return
*/
private List<String> getZoneBasedInstances(List<String> instances, List<String> orderedZones,
Map<String, Set<String>> zoneMapping) {
if (orderedZones == null) {
orderedZones = new ArrayList<>(zoneMapping.keySet());
}
Collections.sort(orderedZones);
if (orderedZones.isEmpty()) {
return orderedZones;
}
Set<String> instanceSet = null;
for (String zone : orderedZones) {
instanceSet = new TreeSet<>(instances);
Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
instanceSet.retainAll(currentZoneInstanceSet);
if (instanceSet.size() > 0) {
return new ArrayList<>(instanceSet);
}
}
return Collections.EMPTY_LIST;
}
}