package org.apache.helix.rest.server.resources.helix;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 */

import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
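
/**
 * REST endpoint that computes a hypothetical partition assignment for a cluster under a set of
 * proposed instance changes (add/remove/swap) and optional result filters, without mutating any
 * actual cluster state. Exposed as {@code POST /clusters/{clusterId}/partitionAssignment}.
 *
 * <p>An illustrative request body (field names match the JSON bindings below; the instance and
 * resource names are hypothetical):
 * <pre>
 * {
 *   "InstanceChange": {
 *     "AddInstances": ["host3_12918"],
 *     "RemoveInstances": ["host1_12918"],
 *     "SwapInstances": {"host2_12918": "host4_12918"}
 *   },
 *   "Options": {
 *     "InstanceFilter": ["host3_12918", "host4_12918"],
 *     "ResourceFilter": ["resource1"]
 *   }
 * }
 * </pre>
 */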
@Path("/clusters/{clusterId}/partitionAssignment")
public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
  private static final Logger LOG =
      LoggerFactory.getLogger(ResourceAssignmentOptimizerAccessor.class);

  private static class InputFields {
List<String> newInstances = new ArrayList<>();
List<String> instancesToRemove = new ArrayList<>();
Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
Set<String> instanceFilter = new HashSet<>();
Set<String> resourceFilter = new HashSet<>();
}
// TODO: We could add a data cache here to avoid read latency.
private static class ClusterState {
List<InstanceConfig> instanceConfigs = new ArrayList<>();
ClusterConfig clusterConfig;
List<String> resources = new ArrayList<>();
List<String> instances; // cluster LiveInstance + addInstances - instancesToRemove.
}
// Result format: Map of resource -> partition -> instance -> state.
private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
public AssignmentResult() {
super();
}
}
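
  // JSON bindings for the request body; see the class Javadoc for an example payload.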
private static class InputJsonContent {
@JsonProperty("InstanceChange")
InstanceChangeMap instanceChangeMap;
@JsonProperty("Options")
OptionsMap optionsMap;
}
private static class InstanceChangeMap {
@JsonProperty("AddInstances")
List<String> addInstances;
@JsonProperty("RemoveInstances")
List<String> removeInstances;
@JsonProperty("SwapInstances")
Map<String, String> swapInstances;
}
private static class OptionsMap {
@JsonProperty("InstanceFilter")
Set<String> instanceFilter;
@JsonProperty("ResourceFilter")
Set<String> resourceFilter;
}
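
  /**
   * Computes a potential partition assignment for {@code clusterId} based on the posted instance
   * changes and filters, and returns it as JSON in the format
   * resource -> partition -> instance -> state. Invalid input yields a 400 response.
   */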
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
public Response computePotentialAssignment(@PathParam("clusterId") String clusterId,
String content) {
InputFields inputFields;
ClusterState clusterState;
AssignmentResult result;
try {
      // 1. Parse the content string into the input fields. If it is not parseable, a 400
      // response with the reason is returned below.
inputFields = readInput(content);
// 2. Read cluster status from ZK.
clusterState = readClusterStateAndValidateInput(clusterId, inputFields);
// 3. Call rebalancer tools for each resource.
result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
// 4. Serialize result to JSON and return.
return JSONRepresentation(result);
} catch (InvalidParameterException ex) {
return badRequest(ex.getMessage());
} catch (JsonProcessingException e) {
      return badRequest(
          "Invalid input: content cannot be parsed into a KV map. " + e.getMessage());
} catch (OutOfMemoryError e) {
      LOG.error("OutOfMemoryError while calling partitionAssignment", e);
return badRequest(
"Response size is too large to serialize. Please query by resources or instance filter");
} catch (Exception e) {
      LOG.error("Failed to compute partition assignment", e);
return badRequest("Failed to compute partition assignment: " + e);
}
}
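
  /**
   * Parses the request body into {@link InputFields}. A missing "InstanceChange" or "Options"
   * section is treated as empty rather than as an error.
   */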
private InputFields readInput(String content)
throws InvalidParameterException, JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
InputJsonContent inputJsonContent = objectMapper.readValue(content, InputJsonContent.class);
InputFields inputFields = new InputFields();
if (inputJsonContent.instanceChangeMap != null) {
Optional.ofNullable(inputJsonContent.instanceChangeMap.addInstances)
.ifPresent(inputFields.newInstances::addAll);
Optional.ofNullable(inputJsonContent.instanceChangeMap.removeInstances)
.ifPresent(inputFields.instancesToRemove::addAll);
Optional.ofNullable(inputJsonContent.instanceChangeMap.swapInstances)
.ifPresent(inputFields.nodeSwap::putAll);
}
if (inputJsonContent.optionsMap != null) {
Optional.ofNullable(inputJsonContent.optionsMap.resourceFilter)
.ifPresent(inputFields.resourceFilter::addAll);
Optional.ofNullable(inputJsonContent.optionsMap.instanceFilter)
.ifPresent(inputFields.instanceFilter::addAll);
}
return inputFields;
}
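
  /**
   * Reads the cluster's resources, live instances, and configs from ZooKeeper, then validates the
   * input: every instance named in SwapInstances or RemoveInstances must exist among the live
   * instances (plus any instances added in the same request).
   */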
private ClusterState readClusterStateAndValidateInput(String clusterId, InputFields inputFields)
throws InvalidParameterException {
ClusterState clusterState = new ClusterState();
ConfigAccessor cfgAccessor = getConfigAccessor();
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
clusterState.resources = dataAccessor.getChildNames(dataAccessor.keyBuilder().idealStates());
// Add existing live instances and new instances from user input to instances list.
clusterState.instances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
clusterState.instances.addAll(inputFields.newInstances);
    // Validate that the instances to be removed and the old instances in SwapInstances exist in
    // the cluster.
if (!inputFields.nodeSwap.isEmpty() || !inputFields.instancesToRemove.isEmpty()) {
Set<String> liveInstanceSet = new HashSet<>(clusterState.instances);
for (Map.Entry<String, String> nodeSwapPair : inputFields.nodeSwap.entrySet()) {
if (!liveInstanceSet.contains(nodeSwapPair.getKey())) {
throw new InvalidParameterException("Invalid input: instance [" + nodeSwapPair.getKey()
+ "] in SwapInstances does not exist in cluster.");
}
}
for (String instanceToRemove : inputFields.instancesToRemove) {
if (!liveInstanceSet.contains(instanceToRemove)) {
throw new InvalidParameterException("Invalid input: instance [" + instanceToRemove
+ "] in RemoveInstances does not exist in cluster.");
}
}
if (!inputFields.instancesToRemove.isEmpty()) {
clusterState.instances.removeIf(inputFields.instancesToRemove::contains);
}
}
    // Read instance and cluster configs.
    // This throws an exception if a newly added instance has no InstanceConfig.
for (String instance : clusterState.instances) {
InstanceConfig config = cfgAccessor.getInstanceConfig(clusterId, instance);
clusterState.instanceConfigs.add(config);
}
clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId);
return clusterState;
}
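
  /**
   * Computes the potential assignment per resource. WAGED resources are collected and computed
   * together in one batch at the end; FULL_AUTO resources go through
   * {@link HelixUtil#getIdealAssignmentForFullAuto}, and SEMI_AUTO resources through
   * {@link HelixUtil#computeIdealMapping} per partition. Resources in other rebalance modes are
   * skipped.
   */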
private AssignmentResult computeOptimalAssignmentForResources(InputFields inputFields,
ClusterState clusterState, String clusterId) throws Exception {
AssignmentResult result = new AssignmentResult();
// Iterate through resources, read resource level info and get potential assignment.
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
List<IdealState> wagedResourceIdealState = new ArrayList<>();
for (String resource : clusterState.resources) {
IdealState idealState =
dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates(resource));
// Compute all Waged resources in a batch later.
      if (WagedRebalancer.class.getName().equals(idealState.getRebalancerClassName())) {
        wagedResourceIdealState.add(idealState);
        continue;
      }
      // For non-WAGED resources, skip resources that are not in the resource filter.
if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) {
continue;
}
// Use getIdealAssignmentForFullAuto for FULL_AUTO resource.
Map<String, Map<String, String>> partitionAssignments;
if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
String rebalanceStrategy = idealState.getRebalanceStrategy();
if (rebalanceStrategy == null || rebalanceStrategy
.equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
rebalanceStrategy = AutoRebalanceStrategy.class.getName();
}
partitionAssignments = new TreeMap<>(HelixUtil
.getIdealAssignmentForFullAuto(clusterState.clusterConfig, clusterState.instanceConfigs,
clusterState.instances, idealState, new ArrayList<>(idealState.getPartitionSet()),
rebalanceStrategy));
instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
} else if (idealState.getRebalanceMode() == IdealState.RebalanceMode.SEMI_AUTO) {
// Use computeIdealMapping for SEMI_AUTO resource.
Map<String, List<String>> preferenceLists = idealState.getPreferenceLists();
partitionAssignments = new TreeMap<>();
HashSet<String> liveInstances = new HashSet<>(clusterState.instances);
        List<String> disabledInstances = clusterState.instanceConfigs.stream()
            .filter(config -> !config.getInstanceEnabled())
            .map(InstanceConfig::getInstanceName).collect(Collectors.toList());
        liveInstances.removeAll(disabledInstances);
StateModelDefinition stateModelDef = dataAccessor
.getProperty(dataAccessor.keyBuilder().stateModelDef(idealState.getStateModelDefRef()));
        for (Map.Entry<String, List<String>> preferenceEntry : preferenceLists.entrySet()) {
          String partitionName = preferenceEntry.getKey();
          List<String> preferenceList = preferenceEntry.getValue();
          if (!preferenceList.isEmpty() && preferenceList.get(0)
              .equalsIgnoreCase(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.name())) {
            partitionAssignments.put(partitionName, HelixUtil
                .computeIdealMapping(clusterState.instances, stateModelDef, liveInstances));
          } else {
            partitionAssignments.put(partitionName,
                HelixUtil.computeIdealMapping(preferenceList, stateModelDef, liveInstances));
          }
        }
instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
}
}
if (!wagedResourceIdealState.isEmpty()) {
computeWagedAssignmentResult(wagedResourceIdealState, inputFields, clusterState, clusterId,
result);
}
return result;
}
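
  /**
   * Batch-computes the target assignment for all WAGED resources via
   * {@link HelixUtil#getTargetAssignmentForWagedFullAuto}, then flattens each
   * {@link ResourceAssignment} into the plain map format of the result, applying the resource
   * filter and instance swap/filter on the way.
   */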
private void computeWagedAssignmentResult(List<IdealState> wagedResourceIdealState,
InputFields inputFields, ClusterState clusterState, String clusterId,
AssignmentResult result) {
// Use getTargetAssignmentForWagedFullAuto for Waged resources.
ConfigAccessor cfgAccessor = getConfigAccessor();
List<ResourceConfig> wagedResourceConfigs = new ArrayList<>();
for (IdealState idealState : wagedResourceIdealState) {
wagedResourceConfigs
.add(cfgAccessor.getResourceConfig(clusterId, idealState.getResourceName()));
}
    Map<String, ResourceAssignment> wagedAssignmentResult =
        HelixUtil.getTargetAssignmentForWagedFullAuto(getZkBucketDataAccessor(),
new ZkBaseDataAccessor<>(getRealmAwareZkClient()), clusterState.clusterConfig,
clusterState.instanceConfigs, clusterState.instances, wagedResourceIdealState,
wagedResourceConfigs);
// Convert ResourceAssignment to plain map.
for (Map.Entry<String, ResourceAssignment> wagedAssignment : wagedAssignmentResult.entrySet()) {
String resource = wagedAssignment.getKey();
if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) {
continue;
}
Map<String, Map<String, String>> partitionAssignments = new TreeMap<>();
wagedAssignment.getValue().getMappedPartitions().forEach(partition -> partitionAssignments
.put(partition.getPartitionName(), wagedAssignment.getValue().getReplicaMap(partition)));
instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
}
}
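
  /**
   * Post-processes one resource's assignment before adding it to the result: replica states of a
   * swapped-out instance are re-attributed to its replacement, entries for instances outside the
   * InstanceFilter are dropped, and partitions left with no instances are removed entirely.
   */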
private void instanceSwapAndFilter(InputFields inputFields,
Map<String, Map<String, String>> partitionAssignments, String resource,
AssignmentResult result) {
if (!inputFields.nodeSwap.isEmpty() || !inputFields.instanceFilter.isEmpty()) {
for (Iterator<Map.Entry<String, Map<String, String>>> partitionAssignmentIt =
partitionAssignments.entrySet().iterator(); partitionAssignmentIt.hasNext(); ) {
Map.Entry<String, Map<String, String>> partitionAssignment = partitionAssignmentIt.next();
Map<String, String> instanceStates = partitionAssignment.getValue();
Map<String, String> tempInstanceState = new HashMap<>();
        // Stage the swapped instances: map each replacement instance to the state of the
        // instance it replaces.
instanceStates.entrySet().stream()
.filter(entry -> inputFields.nodeSwap.containsKey(entry.getKey())).forEach(
entry -> tempInstanceState
.put(inputFields.nodeSwap.get(entry.getKey()), entry.getValue()));
instanceStates.putAll(tempInstanceState);
        // Keep only the instances in instanceFilter (when non-empty) and drop the swapped-out
        // originals.
instanceStates.entrySet().removeIf(e ->
(!inputFields.instanceFilter.isEmpty() && !inputFields.instanceFilter
.contains(e.getKey())) || inputFields.nodeSwap.containsKey(e.getKey()));
if (instanceStates.isEmpty()) {
partitionAssignmentIt.remove();
}
}
}
result.put(resource, partitionAssignments);
}
}