| package org.apache.helix.manager.zk; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.DataInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.I0Itec.zkclient.DataUpdater; |
| import org.I0Itec.zkclient.exception.ZkNoNodeException; |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixConstants; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixDefinedState; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyKey.Builder; |
| import org.apache.helix.PropertyPathConfig; |
| import org.apache.helix.PropertyType; |
| import org.apache.helix.ZNRecord; |
| import org.apache.helix.alerts.AlertsHolder; |
| import org.apache.helix.alerts.StatsHolder; |
| import org.apache.helix.model.Alerts; |
| import org.apache.helix.model.ClusterConstraints; |
| import org.apache.helix.model.ClusterConstraints.ConstraintType; |
| import org.apache.helix.model.ConfigScope; |
| import org.apache.helix.model.ConstraintItem; |
| import org.apache.helix.model.CurrentState; |
| import org.apache.helix.model.ExternalView; |
| import org.apache.helix.model.HelixConfigScope; |
| import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.IdealState.IdealStateModeProperty; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.InstanceConfig.InstanceConfigProperty; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.Message.MessageState; |
| import org.apache.helix.model.Message.MessageType; |
| import org.apache.helix.model.PauseSignal; |
| import org.apache.helix.model.PersistentStats; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.tools.DefaultIdealStateCalculator; |
| import org.apache.helix.util.HelixUtil; |
| import org.apache.helix.util.RebalanceUtil; |
| import org.apache.log4j.Logger; |
| |
| |
| public class ZKHelixAdmin implements HelixAdmin |
| { |
| |
| private final ZkClient _zkClient; |
| private final ConfigAccessor _configAccessor; |
| |
| private static Logger logger = Logger.getLogger(ZKHelixAdmin.class); |
| |
/**
 * Builds an admin on top of a ZooKeeper client supplied by the caller.
 *
 * @param zkClient client stored for later use and also handed to the
 *                 {@link ConfigAccessor} created here
 */
public ZKHelixAdmin(ZkClient zkClient) {
  this._zkClient = zkClient;
  this._configAccessor = new ConfigAccessor(zkClient);
}
| |
| public ZKHelixAdmin(String zkAddress) |
| { |
| _zkClient = new ZkClient(zkAddress); |
| _zkClient.setZkSerializer(new ZNRecordSerializer()); |
| _zkClient.waitUntilConnected(30, TimeUnit.SECONDS); |
| _configAccessor = new ConfigAccessor(_zkClient); |
| } |
| |
| @Override |
| public void addInstance(String clusterName, InstanceConfig instanceConfig) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| String instanceConfigsPath = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.PARTICIPANT.toString()); |
| String nodeId = instanceConfig.getId(); |
| String instanceConfigPath = instanceConfigsPath + "/" + nodeId; |
| |
| if (_zkClient.exists(instanceConfigPath)) |
| { |
| throw new HelixException("Node " + nodeId + " already exists in cluster " |
| + clusterName); |
| } |
| |
| ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); |
| |
| _zkClient.createPersistent(HelixUtil.getMessagePath(clusterName, nodeId), true); |
| _zkClient.createPersistent(HelixUtil.getCurrentStateBasePath(clusterName, nodeId), |
| true); |
| _zkClient.createPersistent(HelixUtil.getErrorsPath(clusterName, nodeId), true); |
| _zkClient.createPersistent(HelixUtil.getStatusUpdatesPath(clusterName, nodeId), true); |
| } |
| |
| @Override |
| public void dropInstance(String clusterName, InstanceConfig instanceConfig) |
| { |
| // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName); |
| String instanceConfigsPath = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.PARTICIPANT.toString()); |
| String nodeId = instanceConfig.getId(); |
| String instanceConfigPath = instanceConfigsPath + "/" + nodeId; |
| String instancePath = HelixUtil.getInstancePath(clusterName, nodeId); |
| |
| if (!_zkClient.exists(instanceConfigPath)) |
| { |
| throw new HelixException("Node " + nodeId |
| + " does not exist in config for cluster " + clusterName); |
| } |
| |
| if (!_zkClient.exists(instancePath)) |
| { |
| throw new HelixException("Node " + nodeId |
| + " does not exist in instances for cluster " + clusterName); |
| } |
| |
| // delete config path |
| ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); |
| |
| // delete instance path |
| _zkClient.deleteRecursive(instancePath); |
| } |
| |
| @Override |
| public InstanceConfig getInstanceConfig(String clusterName, String instanceName) |
| { |
| // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName); |
| |
| // String instanceConfigPath = instanceConfigsPath + "/" + instanceName; |
| String instanceConfigPath = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.PARTICIPANT.toString(), |
| instanceName); |
| if (!_zkClient.exists(instanceConfigPath)) |
| { |
| throw new HelixException("instance" + instanceName + " does not exist in cluster " |
| + clusterName); |
| } |
| |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| return accessor.getProperty(keyBuilder.instanceConfig(instanceName)); |
| } |
| |
| @Override |
| public void enableInstance(final String clusterName, |
| final String instanceName, |
| final boolean enabled) |
| { |
| String path = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.PARTICIPANT.toString(), |
| instanceName); |
| |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| if (!baseAccessor.exists(path, 0)) |
| { |
| throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName |
| + ", instance config does not exist"); |
| } |
| |
| baseAccessor.update(path, new DataUpdater<ZNRecord>() |
| { |
| @Override |
| public ZNRecord update(ZNRecord currentData) |
| { |
| if (currentData == null) |
| { |
| throw new HelixException("Cluster: " + clusterName + ", instance: " |
| + instanceName + ", participant config is null"); |
| } |
| |
| InstanceConfig config = new InstanceConfig(currentData); |
| config.setInstanceEnabled(enabled); |
| return config.getRecord(); |
| } |
| }, AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public void enablePartition(final boolean enabled, |
| final String clusterName, |
| final String instanceName, |
| final String resourceName, |
| final List<String> partitionNames) |
| { |
| String path = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.PARTICIPANT.toString(), |
| instanceName); |
| |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| |
| // check instanceConfig exists |
| if (!baseAccessor.exists(path, 0)) |
| { |
| throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName |
| + ", instance config does not exist"); |
| } |
| |
| // check resource exists |
| String idealStatePath = |
| PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName); |
| |
| ZNRecord idealStateRecord = null; |
| try |
| { |
| idealStateRecord = baseAccessor.get(idealStatePath, null, 0); |
| } |
| catch (ZkNoNodeException e) |
| { |
| // OK. |
| } |
| |
| // check resource exist. warn if not. |
| if (idealStateRecord == null) |
| { |
| // throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName |
| // + ", ideal state does not exist"); |
| logger.warn("Disable partitions: " + partitionNames + " but Cluster: " + clusterName |
| + ", resource: " + resourceName + " does not exists. probably disable it during ERROR->DROPPED transtition"); |
| |
| } else { |
| // check partitions exist. warn if not |
| IdealState idealState = new IdealState(idealStateRecord); |
| for (String partitionName : partitionNames) |
| { |
| if ((idealState.getIdealStateMode() == IdealStateModeProperty.AUTO && idealState.getPreferenceList(partitionName) == null) |
| || (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED && idealState.getInstanceStateMap(partitionName) == null)) |
| { |
| logger.warn("Cluster: " + clusterName + ", resource: " + resourceName |
| + ", partition: " + partitionName |
| + ", partition does not exist in ideal state"); |
| } |
| } |
| } |
| |
| // update participantConfig |
| // could not use ZNRecordUpdater since it doesn't do listField merge/subtract |
| baseAccessor.update(path, new DataUpdater<ZNRecord>() |
| { |
| @Override |
| public ZNRecord update(ZNRecord currentData) |
| { |
| if (currentData == null) |
| { |
| throw new HelixException("Cluster: " + clusterName + ", instance: " |
| + instanceName + ", participant config is null"); |
| } |
| |
| // TODO: merge with InstanceConfig.setInstanceEnabledForPartition |
| List<String> list = |
| currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString()); |
| Set<String> disabledPartitions = new HashSet<String>(); |
| if (list != null) |
| { |
| disabledPartitions.addAll(list); |
| } |
| |
| if (enabled) |
| { |
| disabledPartitions.removeAll(partitionNames); |
| } |
| else |
| { |
| disabledPartitions.addAll(partitionNames); |
| } |
| |
| list = new ArrayList<String>(disabledPartitions); |
| Collections.sort(list); |
| currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), |
| list); |
| return currentData; |
| } |
| }, |
| AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public void enableCluster(String clusterName, boolean enabled) |
| { |
| HelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| if (enabled) |
| { |
| accessor.removeProperty(keyBuilder.pause()); |
| } |
| else |
| { |
| accessor.createProperty(keyBuilder.pause(), new PauseSignal("pause")); |
| } |
| } |
| |
| @Override |
| public void resetPartition(String clusterName, |
| String instanceName, |
| String resourceName, |
| List<String> partitionNames) |
| { |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| // check the instance is alive |
| LiveInstance liveInstance = |
| accessor.getProperty(keyBuilder.liveInstance(instanceName)); |
| if (liveInstance == null) |
| { |
| throw new HelixException("Can't reset state for " + resourceName + "/" |
| + partitionNames + " on " + instanceName + ", because " + instanceName |
| + " is not alive"); |
| } |
| |
| // check resource group exists |
| IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); |
| if (idealState == null) |
| { |
| throw new HelixException("Can't reset state for " + resourceName + "/" |
| + partitionNames + " on " + instanceName + ", because " + resourceName |
| + " is not added"); |
| } |
| |
| // check partition exists in resource group |
| Set<String> resetPartitionNames = new HashSet<String>(partitionNames); |
| if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED) |
| { |
| Set<String> partitions = |
| new HashSet<String>(idealState.getRecord().getMapFields().keySet()); |
| if (!partitions.containsAll(resetPartitionNames)) |
| { |
| throw new HelixException("Can't reset state for " + resourceName + "/" |
| + partitionNames + " on " + instanceName + ", because not all " |
| + partitionNames + " exist"); |
| } |
| } |
| else |
| { |
| Set<String> partitions = |
| new HashSet<String>(idealState.getRecord().getListFields().keySet()); |
| if (!partitions.containsAll(resetPartitionNames)) |
| { |
| throw new HelixException("Can't reset state for " + resourceName + "/" |
| + partitionNames + " on " + instanceName + ", because not all " |
| + partitionNames + " exist"); |
| } |
| } |
| |
| // check partition is in ERROR state |
| String sessionId = liveInstance.getSessionId(); |
| CurrentState curState = |
| accessor.getProperty(keyBuilder.currentState(instanceName, |
| sessionId, |
| resourceName)); |
| for (String partitionName : resetPartitionNames) |
| { |
| if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) |
| { |
| throw new HelixException("Can't reset state for " + resourceName + "/" |
| + partitionNames + " on " + instanceName + ", because not all " |
| + partitionNames + " are in ERROR state"); |
| } |
| } |
| |
| // check stateModelDef exists and get initial state |
| String stateModelDef = idealState.getStateModelDefRef(); |
| StateModelDefinition stateModel = |
| accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); |
| if (stateModel == null) |
| { |
| throw new HelixException("Can't reset state for " + resourceName + "/" |
| + partitionNames + " on " + instanceName + ", because " + stateModelDef |
| + " is NOT found"); |
| } |
| |
| // check there is no pending messages for the partitions exist |
| List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName)); |
| for (Message message : messages) |
| { |
| if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType()) |
| || !sessionId.equals(message.getTgtSessionId()) |
| || !resourceName.equals(message.getResourceName()) |
| || !resetPartitionNames.contains(message.getPartitionName())) |
| { |
| continue; |
| } |
| |
| throw new HelixException("Can't reset state for " + resourceName + "/" |
| + partitionNames + " on " + instanceName |
| + ", because a pending message exists: " + message); |
| } |
| |
| String adminName = null; |
| try |
| { |
| adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN"; |
| } |
| catch (UnknownHostException e) |
| { |
| // can ignore it |
| logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e); |
| adminName = "UNKNOWN"; |
| } |
| |
| List<Message> resetMessages = new ArrayList<Message>(); |
| List<PropertyKey> messageKeys = new ArrayList<PropertyKey>(); |
| for (String partitionName : resetPartitionNames) |
| { |
| // send ERROR to initialState message |
| String msgId = UUID.randomUUID().toString(); |
| Message message = new Message(MessageType.STATE_TRANSITION, msgId); |
| message.setSrcName(adminName); |
| message.setTgtName(instanceName); |
| message.setMsgState(MessageState.NEW); |
| message.setPartitionName(partitionName); |
| message.setResourceName(resourceName); |
| message.setTgtSessionId(sessionId); |
| message.setStateModelDef(stateModelDef); |
| message.setFromState(HelixDefinedState.ERROR.toString()); |
| message.setToState(stateModel.getInitialState()); |
| message.setStateModelFactoryName(idealState.getStateModelFactoryName()); |
| |
| resetMessages.add(message); |
| messageKeys.add(keyBuilder.message(instanceName, message.getId())); |
| } |
| |
| accessor.setChildren(messageKeys, resetMessages); |
| } |
| |
| @Override |
| public void resetInstance(String clusterName, List<String> instanceNames) |
| { |
| // TODO: not mp-safe |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews()); |
| |
| Set<String> resetInstanceNames = new HashSet<String>(instanceNames); |
| for (String instanceName : resetInstanceNames) |
| { |
| List<String> resetPartitionNames = new ArrayList<String>(); |
| for (ExternalView extView : extViews) |
| { |
| Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields(); |
| for (String partitionName : stateMap.keySet()) |
| { |
| Map<String, String> instanceStateMap = stateMap.get(partitionName); |
| |
| if (instanceStateMap.containsKey(instanceName) |
| && instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString())) |
| { |
| resetPartitionNames.add(partitionName); |
| } |
| } |
| resetPartition(clusterName, |
| instanceName, |
| extView.getResourceName(), |
| resetPartitionNames); |
| } |
| } |
| } |
| |
| @Override |
| public void resetResource(String clusterName, List<String> resourceNames) |
| { |
| // TODO: not mp-safe |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews()); |
| |
| Set<String> resetResourceNames = new HashSet<String>(resourceNames); |
| for (ExternalView extView : extViews) |
| { |
| if (!resetResourceNames.contains(extView.getResourceName())) |
| { |
| continue; |
| } |
| |
| // instanceName -> list of resetPartitionNames |
| Map<String, List<String>> resetPartitionNames = new HashMap<String, List<String>>(); |
| |
| Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields(); |
| for (String partitionName : stateMap.keySet()) |
| { |
| Map<String, String> instanceStateMap = stateMap.get(partitionName); |
| for (String instanceName : instanceStateMap.keySet()) |
| { |
| if (instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString())) |
| { |
| if (!resetPartitionNames.containsKey(instanceName)) |
| { |
| resetPartitionNames.put(instanceName, new ArrayList<String>()); |
| } |
| resetPartitionNames.get(instanceName).add(partitionName); |
| } |
| } |
| } |
| |
| for (String instanceName : resetPartitionNames.keySet()) |
| { |
| resetPartition(clusterName, |
| instanceName, |
| extView.getResourceName(), |
| resetPartitionNames.get(instanceName)); |
| } |
| } |
| } |
| |
| @Override |
| public boolean addCluster(String clusterName){ |
| return addCluster(clusterName, false); |
| } |
| |
| @Override |
| public boolean addCluster(String clusterName, boolean recreateIfExists) |
| { |
| String root = "/" + clusterName; |
| |
| if (_zkClient.exists(root)) |
| { |
| if (recreateIfExists) |
| { |
| logger.warn("Root directory exists.Cleaning the root directory:" + root); |
| _zkClient.deleteRecursive(root); |
| } |
| else |
| { |
| logger.info("Cluster " + clusterName + " already exists"); |
| return true; |
| } |
| } |
| try |
| { |
| _zkClient.createPersistent(root, true); |
| } |
| catch (Exception e) |
| { |
| //some other process might have created the cluster |
| if(_zkClient.exists(root)){ |
| return true; |
| } |
| logger.error("Error creating cluster:"+ clusterName,e); |
| return false; |
| } |
| try |
| { |
| createZKPaths(clusterName); |
| } |
| catch (Exception e) |
| { |
| logger.error("Error creating cluster:"+ clusterName,e); |
| return false; |
| } |
| logger.info("Created cluster:"+ clusterName); |
| return true; |
| } |
| |
| private void createZKPaths(String clusterName){ |
| String path; |
| |
| // IDEAL STATE |
| _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName)); |
| // CONFIGURATIONS |
| // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName)); |
| path = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.CLUSTER.toString(), |
| clusterName); |
| _zkClient.createPersistent(path, true); |
| _zkClient.writeData(path, new ZNRecord(clusterName)); |
| path = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.PARTICIPANT.toString()); |
| _zkClient.createPersistent(path); |
| path = |
| PropertyPathConfig.getPath(PropertyType.CONFIGS, |
| clusterName, |
| ConfigScopeProperty.RESOURCE.toString()); |
| _zkClient.createPersistent(path); |
| // PROPERTY STORE |
| path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName); |
| _zkClient.createPersistent(path); |
| // LIVE INSTANCES |
| _zkClient.createPersistent(HelixUtil.getLiveInstancesPath(clusterName)); |
| // MEMBER INSTANCES |
| _zkClient.createPersistent(HelixUtil.getMemberInstancesPath(clusterName)); |
| // External view |
| _zkClient.createPersistent(HelixUtil.getExternalViewPath(clusterName)); |
| // State model definition |
| _zkClient.createPersistent(HelixUtil.getStateModelDefinitionPath(clusterName)); |
| |
| // controller |
| _zkClient.createPersistent(HelixUtil.getControllerPath(clusterName)); |
| path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName); |
| final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString()); |
| final List<String> emptyList = new ArrayList<String>(); |
| emptyHistory.setListField(clusterName, emptyList); |
| _zkClient.createPersistent(path, emptyHistory); |
| |
| path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName); |
| _zkClient.createPersistent(path); |
| |
| path = PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName); |
| _zkClient.createPersistent(path); |
| |
| path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName); |
| _zkClient.createPersistent(path); |
| } |
| |
| @Override |
| public List<String> getInstancesInCluster(String clusterName) |
| { |
| String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName); |
| return _zkClient.getChildren(memberInstancesPath); |
| } |
| |
| @Override |
| public List<String> getInstancesInClusterWithTag(String clusterName, String tag) |
| { |
| String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName); |
| List<String> instances = _zkClient.getChildren(memberInstancesPath); |
| List<String> result = new ArrayList<String>(); |
| |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| for(String instanceName : instances) |
| { |
| InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); |
| if(config.containsTag(tag)) |
| { |
| result.add(instanceName); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public void addResource(String clusterName, |
| String resourceName, |
| int partitions, |
| String stateModelRef) |
| { |
| addResource(clusterName, |
| resourceName, |
| partitions, |
| stateModelRef, |
| IdealStateModeProperty.AUTO.toString(), |
| 0); |
| } |
| |
| @Override |
| public void addResource(String clusterName, |
| String resourceName, |
| int partitions, |
| String stateModelRef, |
| String idealStateMode) |
| { |
| addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode, 0); |
| } |
| |
| @Override |
| public void addResource(String clusterName, |
| String resourceName, |
| IdealState idealstate) |
| { |
| String stateModelRef = idealstate.getStateModelDefRef(); |
| String stateModelDefPath = |
| PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, |
| clusterName, |
| stateModelRef); |
| if (!_zkClient.exists(stateModelDefPath)) |
| { |
| throw new HelixException("State model " + stateModelRef |
| + " not found in the cluster STATEMODELDEFS path"); |
| } |
| |
| String idealStatePath = HelixUtil.getIdealStatePath(clusterName); |
| String resourceIdealStatePath = idealStatePath + "/" + resourceName; |
| if (_zkClient.exists(resourceIdealStatePath)) |
| { |
| throw new HelixException("Skip the operation. Resource ideal state directory already exists:" |
| + resourceIdealStatePath); |
| } |
| |
| ZKUtil.createChildren(_zkClient, idealStatePath, idealstate.getRecord()); |
| } |
| |
| @Override |
| public void addResource(String clusterName, |
| String resourceName, |
| int partitions, |
| String stateModelRef, |
| String idealStateMode, |
| int bucketSize) |
| { |
| addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode, |
| bucketSize, -1); |
| |
| } |
| |
| @Override |
| public void addResource(String clusterName, String resourceName, |
| int partitions, String stateModelRef, String idealStateMode, |
| int bucketSize, int maxPartitionsPerInstance) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| |
| IdealStateModeProperty mode = IdealStateModeProperty.AUTO; |
| try |
| { |
| mode = IdealStateModeProperty.valueOf(idealStateMode); |
| } |
| catch (Exception e) |
| { |
| logger.error("", e); |
| } |
| IdealState idealState = new IdealState(resourceName); |
| idealState.setNumPartitions(partitions); |
| idealState.setStateModelDefRef(stateModelRef); |
| idealState.setIdealStateMode(mode.toString()); |
| idealState.setReplicas("" + 0); |
| idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY); |
| if(maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) |
| { |
| idealState.setMaxPartitionsPerInstance(maxPartitionsPerInstance); |
| } |
| if (bucketSize > 0) |
| { |
| idealState.setBucketSize(bucketSize); |
| } |
| addResource(clusterName, resourceName, idealState); |
| } |
| |
| @Override |
| public List<String> getClusters() |
| { |
| List<String> zkToplevelPathes = _zkClient.getChildren("/"); |
| List<String> result = new ArrayList<String>(); |
| for (String pathName : zkToplevelPathes) |
| { |
| if (ZKUtil.isClusterSetup(pathName, _zkClient)) |
| { |
| result.add(pathName); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public List<String> getResourcesInCluster(String clusterName) |
| { |
| return _zkClient.getChildren(HelixUtil.getIdealStatePath(clusterName)); |
| } |
| |
| @Override |
| public IdealState getResourceIdealState(String clusterName, String resourceName) |
| { |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| return accessor.getProperty(keyBuilder.idealStates(resourceName)); |
| } |
| |
| @Override |
| public void setResourceIdealState(String clusterName, |
| String resourceName, |
| IdealState idealState) |
| { |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| accessor.setProperty(keyBuilder.idealStates(resourceName), idealState); |
| } |
| |
| @Override |
| public ExternalView getResourceExternalView(String clusterName, String resourceName) |
| { |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| return accessor.getProperty(keyBuilder.externalView(resourceName)); |
| } |
| |
| @Override |
| public void addStateModelDef(String clusterName, |
| String stateModelDef, |
| StateModelDefinition stateModel) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| String stateModelDefPath = HelixUtil.getStateModelDefinitionPath(clusterName); |
| String stateModelPath = stateModelDefPath + "/" + stateModelDef; |
| if (_zkClient.exists(stateModelPath)) |
| { |
| logger.warn("Skip the operation.State Model directory exists:" + stateModelPath); |
| throw new HelixException("State model path " + stateModelPath + " already exists."); |
| } |
| |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel); |
| } |
| |
| @Override |
| public void dropResource(String clusterName, String resourceName) |
| { |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| accessor.removeProperty(keyBuilder.idealStates(resourceName)); |
| accessor.removeProperty(keyBuilder.resourceConfig(resourceName)); |
| } |
| |
| @Override |
| public List<String> getStateModelDefs(String clusterName) |
| { |
| return _zkClient.getChildren(HelixUtil.getStateModelDefinitionPath(clusterName)); |
| } |
| |
| @Override |
| public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) |
| { |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| return accessor.getProperty(keyBuilder.stateModelDef(stateModelName)); |
| } |
| |
| @Override |
| public void addStat(String clusterName, final String statName) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| |
| String persistentStatsPath = |
| PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName); |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| |
| baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() |
| { |
| |
| @Override |
| public ZNRecord update(ZNRecord statsRec) |
| { |
| if (statsRec == null) |
| { |
| // TODO: fix naming of this record, if it matters |
| statsRec = new ZNRecord(PersistentStats.nodeName); |
| } |
| |
| Map<String, Map<String, String>> currStatMap = statsRec.getMapFields(); |
| Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName); |
| for (String newStat : newStatMap.keySet()) |
| { |
| if (!currStatMap.containsKey(newStat)) |
| { |
| currStatMap.put(newStat, newStatMap.get(newStat)); |
| } |
| } |
| statsRec.setMapFields(currStatMap); |
| |
| return statsRec; |
| } |
| }, AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public void addAlert(final String clusterName, final String alertName) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| |
| String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName); |
| |
| baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() |
| { |
| |
| @Override |
| public ZNRecord update(ZNRecord alertsRec) |
| { |
| if (alertsRec == null) |
| { |
| // TODO: fix naming of this record, if it matters |
| alertsRec = new ZNRecord(Alerts.nodeName); |
| |
| } |
| |
| Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields(); |
| StringBuilder newStatName = new StringBuilder(); |
| Map<String, String> newAlertMap = new HashMap<String, String>(); |
| |
| // use AlertsHolder to get map of new stats and map for this alert |
| AlertsHolder.parseAlert(alertName, newStatName, newAlertMap); |
| |
| // add stat |
| addStat(clusterName, newStatName.toString()); |
| |
| // add alert |
| currAlertMap.put(alertName, newAlertMap); |
| |
| alertsRec.setMapFields(currAlertMap); |
| |
| return alertsRec; |
| } |
| }, AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public void dropCluster(String clusterName) |
| { |
| logger.info("Deleting cluster " + clusterName); |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| String root = "/" + clusterName; |
| if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0) |
| { |
| throw new HelixException("There are still live instances in the cluster, shut them down first."); |
| } |
| |
| if (accessor.getProperty(keyBuilder.controllerLeader()) != null) |
| { |
| throw new HelixException("There are still LEADER in the cluster, shut them down first."); |
| } |
| |
| _zkClient.deleteRecursive(root); |
| } |
| |
| @Override |
| public void dropStat(String clusterName, final String statName) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| |
| String persistentStatsPath = |
| PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName); |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| |
| baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() |
| { |
| |
| @Override |
| public ZNRecord update(ZNRecord statsRec) |
| { |
| if (statsRec == null) |
| { |
| throw new HelixException("No stats record in ZK, nothing to drop"); |
| } |
| |
| Map<String, Map<String, String>> currStatMap = statsRec.getMapFields(); |
| Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName); |
| |
| // delete each stat from stat map |
| for (String newStat : newStatMap.keySet()) |
| { |
| if (currStatMap.containsKey(newStat)) |
| { |
| currStatMap.remove(newStat); |
| } |
| } |
| statsRec.setMapFields(currStatMap); |
| |
| return statsRec; |
| } |
| }, AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public void dropAlert(String clusterName, final String alertName) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| |
| String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName); |
| |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| |
| if (!baseAccessor.exists(alertsPath, 0)) |
| { |
| throw new HelixException("No alerts node in ZK, nothing to drop"); |
| } |
| |
| baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() |
| { |
| @Override |
| public ZNRecord update(ZNRecord alertsRec) |
| { |
| if (alertsRec == null) |
| { |
| throw new HelixException("No alerts record in ZK, nothing to drop"); |
| } |
| |
| Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields(); |
| currAlertMap.remove(alertName); |
| alertsRec.setMapFields(currAlertMap); |
| |
| return alertsRec; |
| } |
| }, AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public void addClusterToGrandCluster(String clusterName, String grandCluster) |
| { |
| if (!ZKUtil.isClusterSetup(grandCluster, _zkClient)) |
| { |
| throw new HelixException("Grand cluster " + grandCluster + " is not setup yet"); |
| } |
| |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("Cluster " + clusterName + " is not setup yet"); |
| } |
| |
| IdealState idealState = new IdealState(clusterName); |
| |
| idealState.setNumPartitions(1); |
| idealState.setStateModelDefRef("LeaderStandby"); |
| |
| List<String> controllers = getInstancesInCluster(grandCluster); |
| if (controllers.size() == 0) |
| { |
| throw new HelixException("Grand cluster " + grandCluster + " has no instances"); |
| } |
| idealState.setReplicas(Integer.toString(controllers.size())); |
| Collections.shuffle(controllers); |
| idealState.getRecord().setListField(clusterName, controllers); |
| idealState.setPartitionState(clusterName, controllers.get(0), "LEADER"); |
| for (int i = 1; i < controllers.size(); i++) |
| { |
| idealState.setPartitionState(clusterName, controllers.get(i), "STANDBY"); |
| } |
| |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState); |
| } |
| |
| @Override |
| public void setConfig(HelixConfigScope scope, Map<String, String> properties) |
| { |
| _configAccessor.set(scope, properties); |
| } |
| |
| @Override |
| public Map<String, String> getConfig(HelixConfigScope scope, List<String> keys) |
| { |
| return _configAccessor.get(scope, keys); |
| } |
| |
| @Override |
| public List<String> getConfigKeys(HelixConfigScope scope) |
| { |
| return _configAccessor.getKeys(scope); |
| } |
| |
| @Override |
| public void removeConfig(HelixConfigScope scope, List<String> keys) |
| { |
| _configAccessor.remove(scope, keys); |
| } |
| |
| @Override |
| public void rebalance(String clusterName, String resourceName, int replica) |
| { |
| rebalance(clusterName, resourceName, replica, resourceName, ""); |
| } |
| |
| @Override |
| public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix, String group) |
| { |
| List<String> instanceNames = new LinkedList<String>(); |
| if(keyPrefix == null || keyPrefix.length() == 0) |
| { |
| keyPrefix = resourceName; |
| } |
| if(group != null && group.length() > 0) |
| { |
| instanceNames = getInstancesInClusterWithTag(clusterName, group); |
| } |
| if(instanceNames.size() == 0) |
| { |
| logger.info("No tags found for resource " + resourceName + ", use all instances"); |
| instanceNames = getInstancesInCluster(clusterName); |
| group = ""; |
| } |
| else |
| { |
| logger.info("Found instances with tag for " + resourceName + " " + instanceNames); |
| } |
| rebalance(clusterName, resourceName, replica, keyPrefix, instanceNames, group); |
| } |
| |
| @Override |
| public void rebalance(String clusterName, String resourceName, int replica, List<String> instances) |
| { |
| rebalance(clusterName, resourceName, replica, resourceName, instances, ""); |
| } |
| |
| |
| void rebalance(String clusterName, |
| String resourceName, |
| int replica, |
| String keyPrefix, |
| List<String> instanceNames, |
| String groupId) |
| { |
| // ensure we get the same idealState with the same set of instances |
| Collections.sort(instanceNames); |
| |
| IdealState idealState = getResourceIdealState(clusterName, resourceName); |
| if (idealState == null) |
| { |
| throw new HelixException("Resource: " + resourceName + " has NOT been added yet"); |
| } |
| |
| if(groupId != null && groupId.length() > 0) |
| { |
| idealState.setInstanceGroupTag(groupId); |
| } |
| idealState.setReplicas(Integer.toString(replica)); |
| int partitions = idealState.getNumPartitions(); |
| String stateModelName = idealState.getStateModelDefRef(); |
| StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName); |
| |
| if (stateModDef == null) |
| { |
| throw new HelixException("cannot find state model: " + stateModelName); |
| } |
| // StateModelDefinition def = new StateModelDefinition(stateModDef); |
| |
| List<String> statePriorityList = stateModDef.getStatesPriorityList(); |
| |
| String masterStateValue = null; |
| String slaveStateValue = null; |
| replica--; |
| |
| for (String state : statePriorityList) |
| { |
| String count = stateModDef.getNumInstancesPerState(state); |
| if (count.equals("1")) |
| { |
| if (masterStateValue != null) |
| { |
| throw new HelixException("Invalid or unsupported state model definition"); |
| } |
| masterStateValue = state; |
| } |
| else if (count.equalsIgnoreCase("R")) |
| { |
| if (slaveStateValue != null) |
| { |
| throw new HelixException("Invalid or unsupported state model definition"); |
| } |
| slaveStateValue = state; |
| } |
| else if (count.equalsIgnoreCase("N")) |
| { |
| if (!(masterStateValue == null && slaveStateValue == null)) |
| { |
| throw new HelixException("Invalid or unsupported state model definition"); |
| } |
| replica = instanceNames.size() - 1; |
| masterStateValue = slaveStateValue = state; |
| } |
| } |
| if (masterStateValue == null && slaveStateValue == null) |
| { |
| throw new HelixException("Invalid or unsupported state model definition"); |
| } |
| |
| if (masterStateValue == null) |
| { |
| masterStateValue = slaveStateValue; |
| } |
| if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE) |
| { |
| ZNRecord newIdealState = |
| DefaultIdealStateCalculator.calculateIdealState(instanceNames, |
| partitions, |
| replica, |
| keyPrefix, |
| masterStateValue, |
| slaveStateValue); |
| |
| // for now keep mapField in AUTO mode and remove listField in CUSTOMIZED mode |
| if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO) |
| { |
| idealState.getRecord().setListFields(newIdealState.getListFields()); |
| idealState.getRecord().setMapFields(newIdealState.getMapFields()); |
| } |
| if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED) |
| { |
| idealState.getRecord().setMapFields(newIdealState.getMapFields()); |
| } |
| } |
| else |
| { |
| for (int i = 0; i < partitions; i++) |
| { |
| String partitionName = keyPrefix + "_" + i; |
| idealState.getRecord().setMapField(partitionName, new HashMap<String, String>()); |
| idealState.getRecord().setListField(partitionName, new ArrayList<String>()); |
| } |
| } |
| setResourceIdealState(clusterName, resourceName, idealState); |
| } |
| |
| @Override |
| public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException |
| { |
| ZNRecord idealStateRecord = |
| (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile))); |
| if (idealStateRecord.getId() == null |
| || !idealStateRecord.getId().equals(resourceName)) |
| { |
| throw new IllegalArgumentException("ideal state must have same id as resource name"); |
| } |
| setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord)); |
| } |
| |
| private static byte[] readFile(String filePath) throws IOException |
| { |
| File file = new File(filePath); |
| |
| int size = (int) file.length(); |
| byte[] bytes = new byte[size]; |
| DataInputStream dis = new DataInputStream(new FileInputStream(file)); |
| int read = 0; |
| int numRead = 0; |
| while (read < bytes.length |
| && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) |
| { |
| read = read + numRead; |
| } |
| return bytes; |
| } |
| |
| public void addStateModelDef(String clusterName, |
| String stateModelDefName, |
| String stateModelDefFile) throws IOException |
| { |
| ZNRecord record = |
| (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile))); |
| if (record == null || record.getId() == null |
| || !record.getId().equals(stateModelDefName)) |
| { |
| throw new IllegalArgumentException("state model definition must have same id as state model def name"); |
| } |
| addStateModelDef(clusterName, stateModelDefName, new StateModelDefinition(record)); |
| |
| } |
| |
| @Override |
| public void setConstraint(String clusterName, |
| final ConstraintType constraintType, |
| final String constraintId, |
| final ConstraintItem constraintItem) |
| { |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| |
| Builder keyBuilder = new Builder(clusterName); |
| String path = keyBuilder.constraint(constraintType.toString()).getPath(); |
| |
| baseAccessor.update(path, new DataUpdater<ZNRecord>() |
| { |
| @Override |
| public ZNRecord update(ZNRecord currentData) |
| { |
| ClusterConstraints constraints = currentData == null? |
| new ClusterConstraints(constraintType) : new ClusterConstraints(currentData); |
| |
| constraints.addConstraintItem(constraintId, constraintItem); |
| return constraints.getRecord(); |
| } |
| }, AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public void removeConstraint(String clusterName, |
| final ConstraintType constraintType, |
| final String constraintId) |
| { |
| ZkBaseDataAccessor<ZNRecord> baseAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| |
| Builder keyBuilder = new Builder(clusterName); |
| String path = keyBuilder.constraint(constraintType.toString()).getPath(); |
| |
| baseAccessor.update(path, new DataUpdater<ZNRecord>() |
| { |
| @Override |
| public ZNRecord update(ZNRecord currentData) |
| { |
| if (currentData != null) { |
| ClusterConstraints constraints = new ClusterConstraints(currentData); |
| |
| constraints.removeConstraintItem(constraintId); |
| return constraints.getRecord(); |
| } |
| return null; |
| } |
| }, AccessOption.PERSISTENT); |
| } |
| |
| @Override |
| public ClusterConstraints getConstraints(String clusterName, ConstraintType constraintType) |
| { |
| HelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| |
| Builder keyBuilder = new Builder(clusterName); |
| return accessor.getProperty(keyBuilder.constraint(constraintType.toString())); |
| } |
| |
| /** |
| * Takes the existing idealstate as input and computes newIdealState such that |
| * the partition movement is minimized. The partitions are redistributed among the instances provided. |
| * @param clusterName |
| * @param currentIdealState |
| * @param instanceNames |
| * @return |
| */ |
| @Override |
| public void rebalance(String clusterName, |
| IdealState currentIdealState, |
| List<String> instanceNames) |
| { |
| Set<String> activeInstances = new HashSet<String>(); |
| for (String partition : currentIdealState.getPartitionSet()) |
| { |
| activeInstances.addAll(currentIdealState.getRecord().getListField(partition)); |
| } |
| instanceNames.removeAll(activeInstances); |
| Map<String, Object> previousIdealState = RebalanceUtil.buildInternalIdealState(currentIdealState); |
| |
| Map<String, Object> balancedRecord = |
| DefaultIdealStateCalculator.calculateNextIdealState(instanceNames, |
| previousIdealState); |
| StateModelDefinition stateModDef = |
| this.getStateModelDef(clusterName, currentIdealState.getStateModelDefRef()); |
| |
| if (stateModDef == null) |
| { |
| throw new HelixException("cannot find state model: " + currentIdealState.getStateModelDefRef()); |
| } |
| String[] states = RebalanceUtil.parseStates(clusterName, stateModDef); |
| |
| ZNRecord newIdealStateRecord = |
| DefaultIdealStateCalculator.convertToZNRecord(balancedRecord, |
| currentIdealState.getResourceName(), |
| states[0], |
| states[1]); |
| Set<String> partitionSet = new HashSet<String>(); |
| partitionSet.addAll(newIdealStateRecord.getMapFields().keySet()); |
| partitionSet.addAll(newIdealStateRecord.getListFields().keySet()); |
| |
| Map<String, String> reversePartitionIndex = |
| (Map<String, String>) balancedRecord.get("reversePartitionIndex"); |
| for (String partition : partitionSet) |
| { |
| if (reversePartitionIndex.containsKey(partition)) |
| { |
| String originPartitionName = reversePartitionIndex.get(partition); |
| if (partition.equals(originPartitionName)) |
| { |
| continue; |
| } |
| newIdealStateRecord.getMapFields() |
| .put(originPartitionName, |
| newIdealStateRecord.getMapField(partition)); |
| newIdealStateRecord.getMapFields().remove(partition); |
| |
| newIdealStateRecord.getListFields() |
| .put(originPartitionName, |
| newIdealStateRecord.getListField(partition)); |
| newIdealStateRecord.getListFields().remove(partition); |
| } |
| } |
| |
| newIdealStateRecord.getSimpleFields() |
| .putAll(currentIdealState.getRecord().getSimpleFields()); |
| IdealState newIdealState = new IdealState(newIdealStateRecord); |
| setResourceIdealState(clusterName, newIdealStateRecord.getId(), newIdealState); |
| } |
| |
| @Override |
| public void addInstanceTag(String clusterName, String instanceName, String tag) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| |
| if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) |
| { |
| throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is not setup yet"); |
| } |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); |
| config.addTag(tag); |
| accessor.setProperty(keyBuilder.instanceConfig(instanceName), config); |
| } |
| |
| @Override |
| public void removeInstanceTag(String clusterName, String instanceName, |
| String tag) |
| { |
| if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) |
| { |
| throw new HelixException("cluster " + clusterName + " is not setup yet"); |
| } |
| |
| if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) |
| { |
| throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is not setup yet"); |
| } |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); |
| config.removeTag(tag); |
| accessor.setProperty(keyBuilder.instanceConfig(instanceName), config); |
| } |
| |
| public void close() |
| { |
| if(_zkClient!=null) |
| { |
| _zkClient.close(); |
| } |
| } |
| |
| } |