| package org.apache.helix.util; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.PropertyType; |
| import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; |
| import org.apache.helix.controller.rebalancer.AbstractRebalancer; |
| import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; |
| import org.apache.helix.model.BuiltInStateModelDefinitions; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Joiner; |
| |
| public final class HelixUtil { |
| static private Logger LOG = LoggerFactory.getLogger(HelixUtil.class); |
| |
| private HelixUtil() { |
| } |
| |
| public static String getInstanceNameFromPath(String path) { |
| // path structure |
| // /<cluster_name>/instances/<instance_name>/[currentStates/messages] |
| if (path.contains("/" + PropertyType.INSTANCES + "/")) { |
| String[] split = path.split("\\/"); |
| if (split.length > 3) { |
| return split[3]; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * get the parent-path of given path |
| * return "/" string if path = "/xxx", null if path = "/" |
| * @param path |
| * @return |
| */ |
| public static String getZkParentPath(String path) { |
| if (path.equals("/")) { |
| return null; |
| } |
| |
| int idx = path.lastIndexOf('/'); |
| return idx == 0 ? "/" : path.substring(0, idx); |
| } |
| |
| /** |
| * get the last part of the zk-path |
| * @param path |
| * @return |
| */ |
| public static String getZkName(String path) { |
| return path.substring(path.lastIndexOf('/') + 1); |
| } |
| |
| public static String serializeByComma(List<String> objects) { |
| return Joiner.on(",").join(objects); |
| } |
| |
| public static List<String> deserializeByComma(String object) { |
| if (object.length() == 0) { |
| return Collections.EMPTY_LIST; |
| } |
| return Arrays.asList(object.split(",")); |
| } |
| |
| /** |
| * parse a csv-formated key-value pairs |
| * @param keyValuePairs : csv-formatted key-value pairs. e.g. k1=v1,k2=v2,... |
| * @return |
| */ |
| public static Map<String, String> parseCsvFormatedKeyValuePairs(String keyValuePairs) { |
| String[] pairs = keyValuePairs.split("[\\s,]"); |
| Map<String, String> keyValueMap = new TreeMap<String, String>(); |
| for (String pair : pairs) { |
| int idx = pair.indexOf('='); |
| if (idx == -1) { |
| LOG.error("Invalid key-value pair: " + pair + ". Igonore it."); |
| continue; |
| } |
| |
| String key = pair.substring(0, idx); |
| String value = pair.substring(idx + 1); |
| keyValueMap.put(key, value); |
| } |
| return keyValueMap; |
| } |
| |
| /** |
| * Attempts to load the class and delegates to TCCL if class is not found. |
| * Note: The approach is used as a last resort for environments like OSGi. |
| * @param className |
| * @return |
| * @throws ClassNotFoundException |
| */ |
| public static <T> Class<?> loadClass(Class<T> clazz, String className) |
| throws ClassNotFoundException { |
| try { |
| return clazz.getClassLoader().loadClass(className); |
| } catch (ClassNotFoundException ex) { |
| if (Thread.currentThread().getContextClassLoader() != null) { |
| return Thread.currentThread().getContextClassLoader().loadClass(className); |
| } else { |
| throw ex; |
| } |
| } |
| } |
| |
| /** |
| * This method provides the ideal state mapping with corresponding rebalance strategy |
| * @param clusterConfig The cluster config |
| * @param instanceConfigs List of instance configs |
| * @param liveInstances List of live instance names |
| * @param idealState The ideal state of current resource. If input is null, will be |
| * treated as newly created resource. |
| * @param partitions The list of partition names |
| * @param strategyClassName The rebalance strategy. e.g. AutoRebalanceStrategy |
| * @return A map of ideal state assignment as partition -> instance -> state |
| */ |
| public static Map<String, Map<String, String>> getIdealAssignmentForFullAuto( |
| ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances, |
| IdealState idealState, List<String> partitions, String strategyClassName) |
| throws ClassNotFoundException, IllegalAccessException, InstantiationException { |
| List<String> allNodes = new ArrayList<>(); |
| Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); |
| for (InstanceConfig instanceConfig : instanceConfigs) { |
| allNodes.add(instanceConfig.getInstanceName()); |
| instanceConfigMap.put(instanceConfig.getInstanceName(), instanceConfig); |
| } |
| ResourceControllerDataProvider cache = new ResourceControllerDataProvider(); |
| cache.setClusterConfig(clusterConfig); |
| cache.setInstanceConfigMap(instanceConfigMap); |
| |
| StateModelDefinition stateModelDefinition = |
| BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef()) |
| .getStateModelDefinition(); |
| |
| RebalanceStrategy strategy = |
| RebalanceStrategy.class.cast(loadClass(HelixUtil.class, strategyClassName).newInstance()); |
| |
| strategy.init(idealState.getResourceName(), partitions, stateModelDefinition |
| .getStateCountMap(liveInstances.size(), Integer.parseInt(idealState.getReplicas())), |
| idealState.getMaxPartitionsPerInstance()); |
| |
| Map<String, List<String>> preferenceLists = strategy |
| .computePartitionAssignment(allNodes, liveInstances, |
| new HashMap<String, Map<String, String>>(), cache).getListFields(); |
| |
| Map<String, Map<String, String>> idealStateMapping = new HashMap<>(); |
| Set<String> liveInstanceSet = new HashSet<>(liveInstances); |
| for (String partitionName : preferenceLists.keySet()) { |
| idealStateMapping.put(partitionName, |
| computeIdealMapping(preferenceLists.get(partitionName), stateModelDefinition, |
| liveInstanceSet)); |
| } |
| return idealStateMapping; |
| } |
| |
| /** |
| * compute the ideal mapping for resource in Full-Auto and Semi-Auto based on its preference list |
| */ |
| public static Map<String, String> computeIdealMapping(List<String> preferenceList, |
| StateModelDefinition stateModelDef, Set<String> liveAndEnabled) { |
| Map<String, String> idealStateMap = new HashMap<String, String>(); |
| |
| if (preferenceList == null) { |
| return idealStateMap; |
| } |
| |
| List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); |
| Set<String> assigned = new HashSet<String>(); |
| |
| for (String state : statesPriorityList) { |
| int stateCount = AbstractRebalancer.getStateCount(state, stateModelDef, liveAndEnabled.size(), |
| preferenceList.size()); |
| for (String instance : preferenceList) { |
| if (stateCount <= 0) { |
| break; |
| } |
| if (!assigned.contains(instance)) { |
| idealStateMap.put(instance, state); |
| assigned.add(instance); |
| stateCount--; |
| } |
| } |
| } |
| |
| return idealStateMap; |
| } |
| |
| /** |
| * Remove the given message from ZK using the given accessor. This function will |
| * not throw exception |
| * @param accessor HelixDataAccessor |
| * @param msg message to remove |
| * @param instanceName name of the instance on which the message sits |
| * @return true if success else false |
| */ |
| public static boolean removeMessageFromZK(HelixDataAccessor accessor, Message msg, |
| String instanceName) { |
| try { |
| return accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName)); |
| } catch (Exception e) { |
| LOG.error("Caught exception while removing message {}.", msg, e); |
| } |
| return false; |
| } |
| |
| /** |
| * Get the value of system property |
| * @param propertyKey |
| * @param propertyDefaultValue |
| * @return |
| */ |
| public static int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) { |
| String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue); |
| |
| try { |
| int value = Integer.parseInt(valueString); |
| if (value > 0) { |
| return value; |
| } |
| } catch (NumberFormatException e) { |
| LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString |
| + ", using default value: " + propertyDefaultValue); |
| } |
| |
| return propertyDefaultValue; |
| } |
| |
| /** |
| * Get the value of system property |
| * @param propertyKey |
| * @param propertyDefaultValue |
| * @return |
| */ |
| public static long getSystemPropertyAsLong(String propertyKey, long propertyDefaultValue) { |
| String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue); |
| |
| try { |
| long value = Long.parseLong(valueString); |
| if (value > 0) { |
| return value; |
| } |
| } catch (NumberFormatException e) { |
| LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString |
| + ", using default value: " + propertyDefaultValue); |
| } |
| |
| return propertyDefaultValue; |
| } |
| |
| } |