| package org.apache.helix.tools; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.DataInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.ObjectReader; |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.OptionBuilder; |
| import org.apache.commons.cli.OptionGroup; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixConstants; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.SystemPropertyKeys; |
| import org.apache.helix.cloud.azure.AzureConstants; |
| import org.apache.helix.cloud.constants.CloudProvider; |
| import org.apache.helix.manager.zk.GenericZkHelixApiBuilder; |
| import org.apache.helix.manager.zk.ZKHelixAdmin; |
| import org.apache.helix.manager.zk.ZKHelixDataAccessor; |
| import org.apache.helix.manager.zk.ZkBaseDataAccessor; |
| import org.apache.helix.model.BuiltInStateModelDefinitions; |
| import org.apache.helix.model.CloudConfig; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.ClusterConstraints; |
| import org.apache.helix.model.ClusterConstraints.ConstraintType; |
| import org.apache.helix.model.ConstraintItem; |
| import org.apache.helix.model.ExternalView; |
| import org.apache.helix.model.HelixConfigScope; |
| import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.IdealState.RebalanceMode; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.model.builder.ConstraintItemBuilder; |
| import org.apache.helix.model.builder.HelixConfigScopeBuilder; |
| import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; |
| import org.apache.helix.util.HelixUtil; |
| import org.apache.helix.zookeeper.api.client.HelixZkClient; |
| import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; |
| import org.apache.helix.zookeeper.impl.client.FederatedZkClient; |
| import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; |
| import org.apache.helix.zookeeper.introspect.CodehausJacksonIntrospector; |
| import org.apache.helix.zookeeper.zkclient.DataUpdater; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ClusterSetup { |
| private static Logger logger = LoggerFactory.getLogger(ClusterSetup.class); |
| public static final String zkServerAddress = "zkSvr"; |
| |
| // List info about the cluster / resource / Instances |
| public static final String listClusters = "listClusters"; |
| public static final String listResources = "listResources"; |
| public static final String listInstances = "listInstances"; |
| |
| // Add, drop, and rebalance |
| public static final String addCluster = "addCluster"; |
| public static final String activateCluster = "activateCluster"; |
| public static final String dropCluster = "dropCluster"; |
| public static final String dropResource = "dropResource"; |
| public static final String addInstance = "addNode"; |
| public static final String addResource = "addResource"; |
| public static final String addStateModelDef = "addStateModelDef"; |
| public static final String addIdealState = "addIdealState"; |
| public static final String swapInstance = "swapInstance"; |
| public static final String dropInstance = "dropNode"; |
| public static final String rebalance = "rebalance"; |
| public static final String expandCluster = "expandCluster"; |
| public static final String expandResource = "expandResource"; |
| public static final String mode = "mode"; |
| public static final String tag = "tag"; |
| public static final String instanceGroupTag = "instanceGroupTag"; |
| public static final String bucketSize = "bucketSize"; |
| public static final String resourceKeyPrefix = "key"; |
| public static final String maxPartitionsPerNode = "maxPartitionsPerNode"; |
| |
| public static final String addResourceProperty = "addResourceProperty"; |
| public static final String removeResourceProperty = "removeResourceProperty"; |
| |
| public static final String addInstanceTag = "addInstanceTag"; |
| public static final String removeInstanceTag = "removeInstanceTag"; |
| |
| public static final String enableResource = "enableResource"; |
| |
| // Query info (TBD in V2) |
| public static final String listClusterInfo = "listClusterInfo"; |
| public static final String listInstanceInfo = "listInstanceInfo"; |
| public static final String listResourceInfo = "listResourceInfo"; |
| public static final String listPartitionInfo = "listPartitionInfo"; |
| public static final String listStateModels = "listStateModels"; |
| public static final String listStateModel = "listStateModel"; |
| |
| // enable/disable/reset instances/cluster/resource/partition |
| public static final String enableInstance = "enableInstance"; |
| public static final String enablePartition = "enablePartition"; |
| public static final String enableCluster = "enableCluster"; |
| public static final String resetPartition = "resetPartition"; |
| public static final String resetInstance = "resetInstance"; |
| public static final String resetResource = "resetResource"; |
| |
| // help |
| public static final String help = "help"; |
| |
| // get/set/remove configs |
| public static final String getConfig = "getConfig"; |
| public static final String setConfig = "setConfig"; |
| public static final String removeConfig = "removeConfig"; |
| |
| // set/remove cloud configs |
| public static final String setCloudConfig = "setCloudConfig"; |
| public static final String removeCloudConfig = "removeCloudConfig"; |
| |
| |
| // get/set/remove constraints |
| public static final String getConstraints = "getConstraints"; |
| public static final String setConstraint = "setConstraint"; |
| public static final String removeConstraint = "removeConstraint"; |
| |
| private static final Logger _logger = LoggerFactory.getLogger(ClusterSetup.class); |
| private final RealmAwareZkClient _zkClient; |
| // true if ZkBaseDataAccessor was instantiated with a RealmAwareZkClient, false otherwise |
| // This is used for close() to determine how ZkBaseDataAccessor should close the underlying |
| // ZkClient |
| private final boolean _usesExternalZkClient; |
| private final HelixAdmin _admin; |
| |
| protected static ObjectReader ZNRECORD_READER = new ObjectMapper() |
| .setAnnotationIntrospector(new CodehausJacksonIntrospector()) |
| .readerFor(ZNRecord.class); |
| |
| @Deprecated |
| public ClusterSetup(String zkServerAddress) { |
| // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode |
| if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkServerAddress == null) { |
| try { |
| _zkClient = new FederatedZkClient( |
| new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), |
| new RealmAwareZkClient.RealmAwareZkClientConfig() |
| .setZkSerializer(new ZNRecordSerializer())); |
| } catch (InvalidRoutingDataException | IllegalStateException e) { |
| throw new HelixException("Failed to create ConfigAccessor!", e); |
| } |
| } else { |
| _zkClient = SharedZkClientFactory.getInstance() |
| .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkServerAddress)); |
| _zkClient.setZkSerializer(new ZNRecordSerializer()); |
| } |
| |
| _admin = new ZKHelixAdmin(_zkClient); |
| _usesExternalZkClient = false; |
| } |
| |
| @Deprecated |
| public ClusterSetup(RealmAwareZkClient zkClient) { |
| _zkClient = zkClient; |
| _admin = new ZKHelixAdmin(_zkClient); |
| _usesExternalZkClient = true; |
| } |
| |
| @Deprecated |
| public ClusterSetup(RealmAwareZkClient zkClient, HelixAdmin zkHelixAdmin) { |
| _zkClient = zkClient; |
| _admin = zkHelixAdmin; |
| _usesExternalZkClient = true; |
| } |
| |
| private ClusterSetup(RealmAwareZkClient zkClient, boolean usesExternalZkClient) { |
| _zkClient = zkClient; |
| _admin = new ZKHelixAdmin(_zkClient); |
| _usesExternalZkClient = usesExternalZkClient; |
| } |
| |
| /** |
| * Closes any stateful resources in ClusterSetup. |
| */ |
| public void close() { |
| if (_zkClient != null && !_usesExternalZkClient) { |
| _admin.close(); |
| _zkClient.close(); |
| } |
| } |
| |
| @Override |
| public void finalize() { |
| close(); |
| } |
| |
| public void addCluster(String clusterName, boolean overwritePrevious, CloudConfig cloudConfig) |
| throws HelixException { |
| _admin.addCluster(clusterName, overwritePrevious); |
| for (BuiltInStateModelDefinitions def : BuiltInStateModelDefinitions.values()) { |
| addStateModelDef(clusterName, def.getStateModelDefinition().getId(), |
| def.getStateModelDefinition(), overwritePrevious); |
| } |
| |
| if (cloudConfig != null) { |
| _admin.addCloudConfig(clusterName, cloudConfig); |
| // If cloud is enabled and Cloud Provider is Azure, populated the Topology information in cluster config |
| if (cloudConfig.isCloudEnabled() |
| && cloudConfig.getCloudProvider().equals(CloudProvider.AZURE.name())) { |
| ConfigAccessor configAccessor = new ConfigAccessor(_zkClient); |
| ClusterConfig clusterConfig = new ClusterConfig(clusterName); |
| clusterConfig.setTopology(AzureConstants.AZURE_TOPOLOGY); |
| clusterConfig.setTopologyAwareEnabled(true); |
| clusterConfig.setFaultZoneType(AzureConstants.AZURE_FAULT_ZONE_TYPE); |
| configAccessor.updateClusterConfig(clusterName, clusterConfig); |
| } |
| } |
| } |
| |
| public void addCluster(String clusterName, boolean overwritePrevious) { |
| addCluster(clusterName, overwritePrevious, null); |
| } |
| |
| public void activateCluster(String clusterName, String grandCluster, boolean enable) { |
| if (enable) { |
| _admin.addClusterToGrandCluster(clusterName, grandCluster); |
| } else { |
| _admin.dropResource(grandCluster, clusterName); |
| } |
| } |
| |
| public void deleteCluster(String clusterName) { |
| _admin.dropCluster(clusterName); |
| } |
| |
| public void addInstancesToCluster(String clusterName, String[] instanceInfoArray) { |
| for (String instanceInfo : instanceInfoArray) { |
| if (instanceInfo.length() > 0) { |
| addInstanceToCluster(clusterName, instanceInfo); |
| } |
| } |
| } |
| |
| public void addInstanceToCluster(String clusterName, String instanceId) { |
| InstanceConfig config = InstanceConfig.toInstanceConfig(instanceId); |
| _admin.addInstance(clusterName, config); |
| } |
| |
| public void addInstanceTag(String clusterName, String instanceName, String tag) { |
| _admin.addInstanceTag(clusterName, instanceName, tag); |
| } |
| |
| public void dropInstancesFromCluster(String clusterName, String[] instanceInfoArray) { |
| for (String instanceInfo : instanceInfoArray) { |
| if (instanceInfo.length() > 0) { |
| dropInstanceFromCluster(clusterName, instanceInfo); |
| } |
| } |
| } |
| |
| public void dropInstanceFromCluster(String clusterName, String instanceId) { |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| PropertyKey.Builder keyBuilder = accessor.keyBuilder(); |
| |
| InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceId); |
| instanceId = instanceConfig.getInstanceName(); |
| |
| // ensure node is not live |
| LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceId)); |
| if (liveInstance != null) { |
| throw new HelixException(String |
| .format("Cannot drop instance %s as it is still live. Please stop it first", instanceId)); |
| } |
| |
| InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceId)); |
| if (config == null) { |
| String error = "Node " + instanceId + " does not exist, cannot drop"; |
| _logger.warn(error); |
| throw new HelixException(error); |
| } |
| |
| ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); |
| // ensure node is disabled, otherwise fail |
| if (config.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null |
| || !clusterConfig.getDisabledInstances().containsKey(instanceId))) { |
| String error = "Node " + instanceId + " is enabled, cannot drop"; |
| _logger.warn(error); |
| throw new HelixException(error); |
| } |
| _admin.dropInstance(clusterName, config); |
| } |
| |
| /** |
| * For CUSTOMIZED and SEMI_AUTO resources, this tool is used to change instance mapping |
| * in the cluster. When a node is replaced in the cluster, we just change preference list |
| * and map field in IdealState lf all resource, to replace old instance with new instance |
| * |
| * This method will ignore all resource with FULL_AUTO. |
| * This method will ensure that old instance is disabled AND not alive, but it's OK that new |
| * instance is just created, not live / enabled yet |
| * |
| * @param clusterName cluster name |
| * @param oldInstanceName old instance to swap out |
| * @param newInstanceName new instance to add to |
| */ |
| public void swapInstance(String clusterName, final String oldInstanceName, final String newInstanceName) { |
| if (oldInstanceName.equals(newInstanceName)) { |
| _logger.info("Old instance has same name as new instance, no need to swap"); |
| return; |
| } |
| |
| ZKHelixDataAccessor accessor = |
| new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); |
| PropertyKey.Builder keyBuilder = accessor.keyBuilder(); |
| |
| // If new instance config is missing, new instance is not in good state and therefore |
| // should not perform swap. |
| // It is OK that we miss old instance config for idempotency of this method |
| InstanceConfig newConfig = accessor.getProperty(keyBuilder.instanceConfig(newInstanceName)); |
| if (newConfig == null) { |
| String error = "New instance " + newInstanceName + " does not exist, cannot swap"; |
| _logger.warn(error); |
| throw new HelixException(error); |
| } |
| |
| try { |
| // drop instance will ensure the old instance is disabled, and not live, or it will |
| // throw exception |
| dropInstanceFromCluster(clusterName, oldInstanceName); |
| } catch (HelixException e) { |
| // If old instance is already gone, continue to swap. Note that it is possible |
| // that do to some error, we still keep a disabled record of old instance in |
| // cluster config, we don't strictly check and fix that |
| if (e.toString().contains("does not exist")) { |
| _logger.warn("Instance {} does not exist, continue to swap instance for cluster {}", |
| oldInstanceName, clusterName); |
| } else { |
| _logger.warn("Failed to drop instance {} from cluster {}", oldInstanceName, clusterName, e); |
| throw e; |
| } |
| } |
| |
| // When the amount of ideal state data is huge, we might only read partially from ZK |
| // so the safest way is to list first and read each individual ideal state |
| List<String> existingIdealStateNames = |
| accessor.getChildNames(accessor.keyBuilder().idealStates()); |
| |
| for (final String resourceName : existingIdealStateNames) { |
| IdealState resourceIdealState = |
| accessor.getProperty(accessor.keyBuilder().idealStates(resourceName)); |
| if (resourceIdealState.getRebalanceMode().equals(RebalanceMode.FULL_AUTO)) { |
| _logger.warn("Resource {} is in FULL_AUTO rebalance mode, don't swap", resourceName); |
| continue; |
| } |
| // For CUSTOMIZED and SEMI_AUTO rebalance mode, swap instance |
| swapInstanceInIdealState(resourceIdealState, oldInstanceName, newInstanceName); |
| |
| // Update ideal state |
| accessor.updateProperty(accessor.keyBuilder().idealStates(resourceName), |
| new DataUpdater<ZNRecord>() { |
| @Override |
| public ZNRecord update(ZNRecord znRecord) { |
| if (znRecord == null) { |
| throw new HelixException(String.format( |
| "swapInstance DataUpdater: IdealState for resource %s no longer exists!", |
| resourceName)); |
| } |
| // Need to swap again in case there are added partition with old instance |
| swapInstanceInIdealState(new IdealState(znRecord), oldInstanceName, newInstanceName); |
| return znRecord; |
| } |
| }, resourceIdealState); |
| _logger.info("Successfully swapped instance for resource {}", resourceName); |
| } |
| } |
| |
| /** |
| * Replace old instance name in map field and list field with new instance name |
| * @param idealState ideal state object |
| * @param oldInstance old instance name |
| * @param newInstance new instance name |
| */ |
| void swapInstanceInIdealState(IdealState idealState, String oldInstance, String newInstance) { |
| for (String partition : idealState.getRecord().getMapFields().keySet()) { |
| Map<String, String> valMap = idealState.getRecord().getMapField(partition); |
| if (valMap.containsKey(oldInstance)) { |
| valMap.put(newInstance, valMap.get(oldInstance)); |
| valMap.remove(oldInstance); |
| } |
| } |
| |
| for (String partition : idealState.getRecord().getListFields().keySet()) { |
| List<String> valList = idealState.getRecord().getListField(partition); |
| for (int i = 0; i < valList.size(); i++) { |
| if (valList.get(i).equals(oldInstance)) { |
| valList.remove(i); |
| valList.add(i, newInstance); |
| } |
| } |
| } |
| } |
| |
| public HelixAdmin getClusterManagementTool() { |
| return _admin; |
| } |
| |
| public void addStateModelDef(String clusterName, String stateModelDef, |
| StateModelDefinition record) { |
| _admin.addStateModelDef(clusterName, stateModelDef, record); |
| } |
| |
| public void addStateModelDef(String clusterName, String stateModelDef, |
| StateModelDefinition record, boolean overwritePrevious) { |
| _admin.addStateModelDef(clusterName, stateModelDef, record, overwritePrevious); |
| } |
| |
| public void addResourceToCluster(String clusterName, String resourceName, IdealState idealState) { |
| _admin.addResource(clusterName, resourceName, idealState); |
| } |
| |
| public void addResourceToCluster(String clusterName, String resourceName, int numPartitions, |
| String stateModelRef) { |
| addResourceToCluster(clusterName, resourceName, numPartitions, stateModelRef, |
| RebalanceMode.SEMI_AUTO.toString()); |
| } |
| |
| public void addResourceToCluster(String clusterName, String resourceName, int numPartitions, |
| String stateModelRef, String rebalancerMode) { |
| _admin.addResource(clusterName, resourceName, numPartitions, stateModelRef, rebalancerMode); |
| } |
| |
| public void addResourceToCluster(String clusterName, String resourceName, int numPartitions, |
| String stateModelRef, String rebalancerMode, String rebalanceStrategy) { |
| _admin.addResource(clusterName, resourceName, numPartitions, stateModelRef, rebalancerMode, |
| rebalanceStrategy); |
| } |
| |
| public void addResourceToCluster(String clusterName, String resourceName, int numPartitions, |
| String stateModelRef, String rebalancerMode, int bucketSize) { |
| _admin.addResource(clusterName, resourceName, numPartitions, stateModelRef, rebalancerMode, |
| bucketSize); |
| } |
| |
| public void addResourceToCluster(String clusterName, String resourceName, int numPartitions, |
| String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance) { |
| _admin.addResource(clusterName, resourceName, numPartitions, stateModelRef, rebalancerMode, |
| bucketSize, maxPartitionsPerInstance); |
| } |
| |
| /** |
| * Get the mangled IdealState name if resourceGroup/resourceTag is enable. |
| */ |
| public static String genIdealStateNameWithResourceTag(String resourceName, String resourceTag) { |
| return resourceName + "$" + resourceTag; |
| } |
| |
| /** |
| * Create an IdealState for a resource that belongs to a resource group We use |
| * "resourceGroupName$resourceInstanceTag" as the IdealState znode name to differetiate different |
| * resources from the same resourceGroup. |
| */ |
| public IdealState createIdealStateForResourceGroup(String resourceGroupName, |
| String resourceTag, int numPartition, int replica, String rebalanceMode, String stateModelDefName) { |
| String idealStateId = genIdealStateNameWithResourceTag(resourceGroupName, resourceTag); |
| IdealState idealState = new IdealState(idealStateId); |
| idealState.setNumPartitions(numPartition); |
| idealState.setStateModelDefRef(stateModelDefName); |
| IdealState.RebalanceMode mode = |
| idealState.rebalanceModeFromString(rebalanceMode, IdealState.RebalanceMode.SEMI_AUTO); |
| idealState.setRebalanceMode(mode); |
| idealState.setReplicas("" + replica); |
| idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY); |
| idealState.setResourceGroupName(resourceGroupName); |
| idealState.setInstanceGroupTag(resourceTag); |
| idealState.enableGroupRouting(true); |
| |
| return idealState; |
| } |
| |
| /** |
| * Enable or disable a resource within a resource group associated with a given resource tag |
| * |
| * @param clusterName |
| * @param resourceName |
| * @param resourceTag |
| */ |
| public void enableResource(String clusterName, String resourceName, String resourceTag, |
| boolean enabled) { |
| String idealStateId = genIdealStateNameWithResourceTag(resourceName, resourceTag); |
| _admin.enableResource(clusterName, idealStateId, enabled); |
| } |
| |
| public void dropResourceFromCluster(String clusterName, String resourceName) { |
| _admin.dropResource(clusterName, resourceName); |
| } |
| |
| // TODO: remove this. has moved to ZkHelixAdmin |
| public void rebalanceStorageCluster(String clusterName, String resourceName, int replica) { |
| rebalanceStorageCluster(clusterName, resourceName, replica, resourceName); |
| } |
| |
| public void rebalanceResource(String clusterName, String resourceName, int replica) { |
| rebalanceStorageCluster(clusterName, resourceName, replica, resourceName); |
| } |
| |
| public void expandResource(String clusterName, String resourceName) { |
| IdealState idealState = _admin.getResourceIdealState(clusterName, resourceName); |
| if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO |
| || idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) { |
| _logger.info("Skipping idealState " + idealState.getResourceName() + " " |
| + idealState.getRebalanceMode()); |
| return; |
| } |
| boolean anyLiveInstance = false; |
| for (List<String> list : idealState.getRecord().getListFields().values()) { |
| if (list.contains(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString())) { |
| _logger.info("Skipping idealState " + idealState.getResourceName() |
| + " with ANY_LIVEINSTANCE"); |
| anyLiveInstance = true; |
| continue; |
| } |
| } |
| if (anyLiveInstance) { |
| return; |
| } |
| try { |
| int replica = Integer.parseInt(idealState.getReplicas()); |
| } catch (Exception e) { |
| _logger.error("", e); |
| return; |
| } |
| if (idealState.getRecord().getListFields().size() == 0) { |
| _logger.warn("Resource " + resourceName + " not balanced, skip"); |
| return; |
| } |
| balanceIdealState(clusterName, idealState); |
| } |
| |
| public void expandCluster(String clusterName) { |
| List<String> resources = _admin.getResourcesInCluster(clusterName); |
| for (String resourceName : resources) { |
| expandResource(clusterName, resourceName); |
| } |
| } |
| |
| public void balanceIdealState(String clusterName, IdealState idealState) { |
| // The new instances are added into the cluster already. So we need to find out the |
| // instances that |
| // already have partitions assigned to them. |
| List<String> instanceNames = _admin.getInstancesInCluster(clusterName); |
| rebalanceResource(clusterName, idealState, instanceNames); |
| |
| } |
| |
| private void rebalanceResource(String clusterName, IdealState idealState, |
| List<String> instanceNames) { |
| _admin.rebalance(clusterName, idealState, instanceNames); |
| } |
| |
| public void rebalanceStorageCluster(String clusterName, String resourceName, int replica, |
| String keyPrefix) { |
| _admin.rebalance(clusterName, resourceName, replica, keyPrefix, ""); |
| } |
| |
| public void rebalanceCluster(String clusterName, String resourceName, int replica, |
| String keyPrefix, String group) { |
| _admin.rebalance(clusterName, resourceName, replica, keyPrefix, group); |
| } |
| |
| public void rebalanceStorageCluster(String clusterName, String resourceName, String group, |
| int replica) { |
| _admin.rebalance(clusterName, resourceName, replica, resourceName, group); |
| } |
| |
| /** |
| * set configs |
| * @param type config-scope type, e.g. CLUSTER, RESOURCE, etc. |
| * @param scopeArgsCsv scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB |
| * @param keyValuePairs csv-formatted key-value pairs. e.g. k1=v1,k2=v2 |
| */ |
| public void setConfig(ConfigScopeProperty type, String scopeArgsCsv, String keyValuePairs) { |
| // ConfigScope scope = new ConfigScopeBuilder().build(scopesKeyValuePairs); |
| String[] scopeArgs = scopeArgsCsv.split("[\\s,]"); |
| HelixConfigScope scope = new HelixConfigScopeBuilder(type, scopeArgs).build(); |
| |
| Map<String, String> keyValueMap = HelixUtil.parseCsvFormatedKeyValuePairs(keyValuePairs); |
| _admin.setConfig(scope, keyValueMap); |
| } |
| |
| /** |
| * remove configs |
| * @param type config-scope type, e.g. CLUSTER, RESOURCE, etc. |
| * @param scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB |
| * @param keysCsv csv-formatted keys. e.g. k1,k2 |
| */ |
| public void removeConfig(ConfigScopeProperty type, String scopeArgsCsv, String keysCsv) { |
| // ConfigScope scope = new ConfigScopeBuilder().build(scopesStr); |
| // |
| // // parse keys |
| // String[] keys = keysStr.split("[\\s,]"); |
| // Set<String> keysSet = new HashSet<String>(Arrays.asList(keys)); |
| |
| String[] scopeArgs = scopeArgsCsv.split("[\\s,]"); |
| HelixConfigScope scope = new HelixConfigScopeBuilder(type, scopeArgs).build(); |
| |
| String[] keys = keysCsv.split("[\\s,]"); |
| |
| _admin.removeConfig(scope, Arrays.asList(keys)); |
| } |
| |
| /** |
| * set cloud configs |
| * @param clusterName |
| * @param cloudConfigManifest |
| */ |
| public void setCloudConfig(String clusterName, String cloudConfigManifest) { |
| ZNRecord record; |
| try { |
| record = ZNRECORD_READER.readValue(cloudConfigManifest); |
| } catch (IOException e) { |
| _logger |
| .error("Failed to deserialize user's input " + cloudConfigManifest + ", Exception: " + e); |
| throw new IllegalArgumentException("Failed to deserialize user's input "); |
| } |
| |
| CloudConfig cloudConfig = new CloudConfig.Builder(record).build(); |
| _admin.addCloudConfig(clusterName, cloudConfig); |
| } |
| |
| /** |
| * remove cloud configs |
| * @param clusterName |
| */ |
| public void removeCloudConfig(String clusterName) { |
| _admin.removeCloudConfig(clusterName); |
| } |
| |
| /** |
| * get configs |
| * @param type config-scope-type, e.g. CLUSTER, RESOURCE, etc. |
| * @param scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB |
| * @param keysCsv csv-formatted keys. e.g. k1,k2 |
| * @return json-formated key-value pairs, e.g. {k1=v1,k2=v2} |
| */ |
| public String getConfig(ConfigScopeProperty type, String scopeArgsCsv, String keysCsv) { |
| // ConfigScope scope = new ConfigScopeBuilder().build(scopesStr); |
| |
| String[] scopeArgs = scopeArgsCsv.split("[\\s,]"); |
| HelixConfigScope scope = new HelixConfigScopeBuilder(type, scopeArgs).build(); |
| |
| String[] keys = keysCsv.split("[\\s,]"); |
| // parse keys |
| // String[] keys = keysStr.split("[\\s,]"); |
| // Set<String> keysSet = new HashSet<String>(Arrays.asList(keys)); |
| |
| Map<String, String> keyValueMap = _admin.getConfig(scope, Arrays.asList(keys)); |
| ZNRecord record = new ZNRecord(type.toString()); |
| // record.setMapField(scopesStr, propertiesMap); |
| record.getSimpleFields().putAll(keyValueMap); |
| ZNRecordSerializer serializer = new ZNRecordSerializer(); |
| return new String(serializer.serialize(record)); |
| } |
| |
| /** |
| * set constraint |
| * @param clusterName |
| * @param constraintType |
| * @param constraintId |
| * @param constraintAttributesMap : csv-formated constraint key-value pairs |
| */ |
| public void setConstraint(String clusterName, String constraintType, String constraintId, |
| String constraintAttributesMap) { |
| if (clusterName == null || constraintType == null || constraintId == null |
| || constraintAttributesMap == null) { |
| throw new IllegalArgumentException( |
| "fail to set constraint. missing clusterName|constraintType|constraintId|constraintAttributesMap"); |
| } |
| |
| ConstraintType type = ConstraintType.valueOf(constraintType); |
| ConstraintItemBuilder builder = new ConstraintItemBuilder(); |
| Map<String, String> constraintAttributes = |
| HelixUtil.parseCsvFormatedKeyValuePairs(constraintAttributesMap); |
| ConstraintItem constraintItem = builder.addConstraintAttributes(constraintAttributes).build(); |
| _admin.setConstraint(clusterName, type, constraintId, constraintItem); |
| } |
| |
| /** |
| * remove constraint |
| * @param clusterName |
| * @param constraintType |
| * @param constraintId |
| */ |
| public void removeConstraint(String clusterName, String constraintType, String constraintId) { |
| if (clusterName == null || constraintType == null || constraintId == null) { |
| throw new IllegalArgumentException( |
| "fail to remove constraint. missing clusterName|constraintType|constraintId"); |
| } |
| |
| ConstraintType type = ConstraintType.valueOf(constraintType); |
| _admin.removeConstraint(clusterName, type, constraintId); |
| } |
| |
| /** |
| * get constraints associated with given type |
| * @param constraintType : constraint-type. e.g. MESSAGE_CONSTRAINT |
| * @return json-formated constraints |
| */ |
| public String getConstraints(String clusterName, String constraintType) { |
| if (clusterName == null || constraintType == null) { |
| throw new IllegalArgumentException( |
| "fail to get constraint. missing clusterName|constraintType"); |
| } |
| |
| ConstraintType type = ConstraintType.valueOf(constraintType); |
| ClusterConstraints constraints = _admin.getConstraints(clusterName, type); |
| return new String(constraints.serialize(new ZNRecordSerializer())); |
| } |
| |
| /** |
| * Sets up a cluster<br/> |
| * 6 Instances[localhost:8900 to localhost:8905], <br/> |
| * 1 resource[TestDB] with a replication factor of 3 and using MasterSlave state model<br/> |
| * @param clusterName |
| */ |
| public void setupTestCluster(String clusterName) { |
| addCluster(clusterName, true); |
| String instanceInfoArray[] = new String[6]; |
| for (int i = 0; i < instanceInfoArray.length; i++) { |
| instanceInfoArray[i] = "localhost_" + (8900 + i); |
| } |
| addInstancesToCluster(clusterName, instanceInfoArray); |
| addResourceToCluster(clusterName, "TestDB", 10, "MasterSlave"); |
| rebalanceStorageCluster(clusterName, "TestDB", 3); |
| } |
| |
| /** |
|  * Prints usage help for this tool through a {@link HelpFormatter}. |
|  * @param cliOptions the options to describe |
|  */ |
| public static void printUsage(Options cliOptions) { |
|   final String commandLineSyntax = "java " + ClusterSetup.class.getName(); |
| |
|   HelpFormatter formatter = new HelpFormatter(); |
|   // presumably set this wide so that long argument names are not wrapped |
|   formatter.setWidth(1000); |
|   formatter.printHelp(commandLineSyntax, cliOptions); |
| } |
| |
| @SuppressWarnings("static-access") |
| private static Options constructCommandLineOptions() { |
| Option helpOption = |
| OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info") |
| .create(); |
| |
| Option zkServerOption = |
| OptionBuilder.withLongOpt(zkServerAddress).withDescription("Provide zookeeper address") |
| .create(); |
| zkServerOption.setArgs(1); |
| zkServerOption.setRequired(true); |
| zkServerOption.setArgName("ZookeeperServerAddress(Required)"); |
| |
| Option listClustersOption = |
| OptionBuilder.withLongOpt(listClusters).withDescription("List existing clusters").create(); |
| listClustersOption.setArgs(0); |
| listClustersOption.setRequired(false); |
| |
| Option listResourceOption = |
| OptionBuilder.withLongOpt(listResources) |
| .withDescription("List resources hosted in a cluster").create(); |
| listResourceOption.setArgs(1); |
| listResourceOption.setRequired(false); |
| listResourceOption.setArgName("clusterName <-tag TagValue>"); |
| |
| Option listInstancesOption = |
| OptionBuilder.withLongOpt(listInstances).withDescription("List Instances in a cluster") |
| .create(); |
| listInstancesOption.setArgs(1); |
| listInstancesOption.setRequired(false); |
| listInstancesOption.setArgName("clusterName <-tag tagName>"); |
| |
| Option addClusterOption = |
| OptionBuilder.withLongOpt(addCluster).withDescription("Add a new cluster").create(); |
| addClusterOption.setArgs(1); |
| addClusterOption.setRequired(false); |
| addClusterOption.setArgName("clusterName"); |
| |
| Option activateClusterOption = |
| OptionBuilder.withLongOpt(activateCluster) |
| .withDescription("Enable/disable a cluster in distributed controller mode").create(); |
| activateClusterOption.setArgs(3); |
| activateClusterOption.setRequired(false); |
| activateClusterOption.setArgName("clusterName grandCluster true/false"); |
| |
| Option deleteClusterOption = |
| OptionBuilder.withLongOpt(dropCluster).withDescription("Delete a cluster").create(); |
| deleteClusterOption.setArgs(1); |
| deleteClusterOption.setRequired(false); |
| deleteClusterOption.setArgName("clusterName"); |
| |
| Option addInstanceOption = |
| OptionBuilder.withLongOpt(addInstance).withDescription("Add a new Instance to a cluster") |
| .create(); |
| addInstanceOption.setArgs(2); |
| addInstanceOption.setRequired(false); |
| addInstanceOption.setArgName("clusterName InstanceId"); |
| |
| Option addResourceOption = |
| OptionBuilder.withLongOpt(addResource).withDescription("Add a resource to a cluster") |
| .create(); |
| addResourceOption.setArgs(4); |
| addResourceOption.setRequired(false); |
| addResourceOption |
| .setArgName("clusterName resourceName partitionNum stateModelRef <-mode modeValue>"); |
| |
| Option expandResourceOption = |
| OptionBuilder.withLongOpt(expandResource) |
| .withDescription("Expand resource to additional nodes").create(); |
| expandResourceOption.setArgs(2); |
| expandResourceOption.setRequired(false); |
| expandResourceOption.setArgName("clusterName resourceName"); |
| |
| Option expandClusterOption = |
| OptionBuilder.withLongOpt(expandCluster) |
| .withDescription("Expand a cluster and all the resources").create(); |
| expandClusterOption.setArgs(1); |
| expandClusterOption.setRequired(false); |
| expandClusterOption.setArgName("clusterName"); |
| |
| Option resourceModeOption = |
| OptionBuilder.withLongOpt(mode) |
| .withDescription("Specify resource mode, used with addResourceGroup command").create(); |
| resourceModeOption.setArgs(1); |
| resourceModeOption.setRequired(false); |
| resourceModeOption.setArgName("IdealState mode"); |
| |
| Option resourceTagOption = |
| OptionBuilder.withLongOpt(tag) |
| .withDescription("Specify resource tag, used with listResources command").create(); |
| resourceTagOption.setArgs(1); |
| resourceTagOption.setRequired(false); |
| resourceTagOption.setArgName("tag"); |
| |
| Option resourceBucketSizeOption = |
| OptionBuilder.withLongOpt(bucketSize) |
| .withDescription("Specify size of a bucket, used with addResourceGroup command") |
| .create(); |
| resourceBucketSizeOption.setArgs(1); |
| resourceBucketSizeOption.setRequired(false); |
| resourceBucketSizeOption.setArgName("Size of a bucket for a resource"); |
| |
| Option maxPartitionsPerNodeOption = |
| OptionBuilder.withLongOpt(maxPartitionsPerNode) |
| .withDescription("Specify max partitions per node, used with addResourceGroup command") |
| .create(); |
| maxPartitionsPerNodeOption.setArgs(1); |
| maxPartitionsPerNodeOption.setRequired(false); |
| maxPartitionsPerNodeOption.setArgName("Max partitions per node for a resource"); |
| |
| Option resourceKeyOption = |
| OptionBuilder.withLongOpt(resourceKeyPrefix) |
| .withDescription("Specify resource key prefix, used with rebalance command").create(); |
| resourceKeyOption.setArgs(1); |
| resourceKeyOption.setRequired(false); |
| resourceKeyOption.setArgName("Resource key prefix"); |
| |
| Option instanceGroupTagOption = |
| OptionBuilder.withLongOpt(instanceGroupTag) |
| .withDescription("Specify instance group tag, used with rebalance command").create(); |
| instanceGroupTagOption.setArgs(1); |
| instanceGroupTagOption.setRequired(false); |
| instanceGroupTagOption.setArgName("Instance group tag"); |
| |
| Option addStateModelDefOption = |
| OptionBuilder.withLongOpt(addStateModelDef) |
| .withDescription("Add a State model to a cluster").create(); |
| addStateModelDefOption.setArgs(2); |
| addStateModelDefOption.setRequired(false); |
| addStateModelDefOption.setArgName("clusterName <filename>"); |
| |
| Option addIdealStateOption = |
| OptionBuilder.withLongOpt(addIdealState).withDescription("Add a State model to a cluster") |
| .create(); |
| addIdealStateOption.setArgs(3); |
| addIdealStateOption.setRequired(false); |
| addIdealStateOption.setArgName("clusterName resourceName <filename>"); |
| |
| Option dropInstanceOption = |
| OptionBuilder.withLongOpt(dropInstance) |
| .withDescription("Drop an existing Instance from a cluster").create(); |
| dropInstanceOption.setArgs(2); |
| dropInstanceOption.setRequired(false); |
| dropInstanceOption.setArgName("clusterName InstanceId"); |
| |
| Option swapInstanceOption = |
| OptionBuilder.withLongOpt(swapInstance) |
| .withDescription("Swap an old instance from a cluster with a new instance").create(); |
| swapInstanceOption.setArgs(3); |
| swapInstanceOption.setRequired(false); |
| swapInstanceOption.setArgName("clusterName oldInstance newInstance"); |
| |
| Option dropResourceOption = |
| OptionBuilder.withLongOpt(dropResource) |
| .withDescription("Drop an existing resource from a cluster").create(); |
| dropResourceOption.setArgs(2); |
| dropResourceOption.setRequired(false); |
| dropResourceOption.setArgName("clusterName resourceName"); |
| |
| Option enableResourceOption = |
| OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource") |
| .hasArgs(3).isRequired(false) |
| .withArgName("clusterName resourceName true/false <-tag resourceTag>") |
| .create(); |
| |
| Option rebalanceOption = |
| OptionBuilder.withLongOpt(rebalance).withDescription("Rebalance a resource in a cluster") |
| .create(); |
| rebalanceOption.setArgs(3); |
| rebalanceOption.setRequired(false); |
| rebalanceOption.setArgName("clusterName resourceName replicas"); |
| |
| Option instanceInfoOption = |
| OptionBuilder.withLongOpt(listInstanceInfo) |
| .withDescription("Query info of a Instance in a cluster").create(); |
| instanceInfoOption.setArgs(2); |
| instanceInfoOption.setRequired(false); |
| instanceInfoOption.setArgName("clusterName InstanceName"); |
| |
| Option clusterInfoOption = |
| OptionBuilder.withLongOpt(listClusterInfo).withDescription("Query info of a cluster") |
| .create(); |
| clusterInfoOption.setArgs(1); |
| clusterInfoOption.setRequired(false); |
| clusterInfoOption.setArgName("clusterName"); |
| |
| Option resourceInfoOption = |
| OptionBuilder.withLongOpt(listResourceInfo).withDescription("Query info of a resource") |
| .create(); |
| resourceInfoOption.setArgs(2); |
| resourceInfoOption.setRequired(false); |
| resourceInfoOption.setArgName("clusterName resourceName"); |
| |
| Option addResourcePropertyOption = |
| OptionBuilder.withLongOpt(addResourceProperty).withDescription("Add a resource property") |
| .create(); |
| addResourcePropertyOption.setArgs(4); |
| addResourcePropertyOption.setRequired(false); |
| addResourcePropertyOption.setArgName("clusterName resourceName propertyName propertyValue"); |
| |
| Option removeResourcePropertyOption = |
| OptionBuilder.withLongOpt(removeResourceProperty) |
| .withDescription("Remove a resource property").create(); |
| removeResourcePropertyOption.setArgs(3); |
| removeResourcePropertyOption.setRequired(false); |
| removeResourcePropertyOption.setArgName("clusterName resourceName propertyName"); |
| |
| Option partitionInfoOption = |
| OptionBuilder.withLongOpt(listPartitionInfo).withDescription("Query info of a partition") |
| .create(); |
| partitionInfoOption.setArgs(3); |
| partitionInfoOption.setRequired(false); |
| partitionInfoOption.setArgName("clusterName resourceName partitionName"); |
| |
| Option enableInstanceOption = |
| OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable an instance") |
| .create(); |
| enableInstanceOption.setArgs(3); |
| enableInstanceOption.setRequired(false); |
| enableInstanceOption.setArgName("clusterName instanceName true/false"); |
| |
| Option enablePartitionOption = |
| OptionBuilder.hasArgs().withLongOpt(enablePartition) |
| .withDescription("Enable/disable partitions").create(); |
| enablePartitionOption.setRequired(false); |
| enablePartitionOption |
| .setArgName("true/false clusterName instanceName resourceName partitionName1..."); |
| |
| Option enableClusterOption = |
| OptionBuilder.withLongOpt(enableCluster) |
| .withDescription("pause/resume the controller of a cluster").create(); |
| enableClusterOption.setArgs(2); |
| enableClusterOption.setRequired(false); |
| enableClusterOption.setArgName("clusterName true/false"); |
| |
| Option resetPartitionOption = |
| OptionBuilder.withLongOpt(resetPartition) |
| .withDescription("Reset a partition in error state").create(); |
| resetPartitionOption.setArgs(4); |
| resetPartitionOption.setRequired(false); |
| resetPartitionOption.setArgName("clusterName instanceName resourceName partitionName"); |
| |
| Option resetInstanceOption = |
| OptionBuilder.withLongOpt(resetInstance) |
| .withDescription("Reset all partitions in error state for an instance").create(); |
| resetInstanceOption.setArgs(2); |
| resetInstanceOption.setRequired(false); |
| resetInstanceOption.setArgName("clusterName instanceName"); |
| |
| Option resetResourceOption = |
| OptionBuilder.withLongOpt(resetResource) |
| .withDescription("Reset all partitions in error state for a resource").create(); |
| resetResourceOption.setArgs(2); |
| resetResourceOption.setRequired(false); |
| resetResourceOption.setArgName("clusterName resourceName"); |
| |
| Option listStateModelsOption = |
| OptionBuilder.withLongOpt(listStateModels) |
| .withDescription("Query info of state models in a cluster").create(); |
| listStateModelsOption.setArgs(1); |
| listStateModelsOption.setRequired(false); |
| listStateModelsOption.setArgName("clusterName"); |
| |
| Option listStateModelOption = |
| OptionBuilder.withLongOpt(listStateModel) |
| .withDescription("Query info of a state model in a cluster").create(); |
| listStateModelOption.setArgs(2); |
| listStateModelOption.setRequired(false); |
| listStateModelOption.setArgName("clusterName stateModelName"); |
| |
| Option addInstanceTagOption = |
| OptionBuilder.withLongOpt(addInstanceTag).withDescription("Add a tag to instance").create(); |
| addInstanceTagOption.setArgs(3); |
| addInstanceTagOption.setRequired(false); |
| addInstanceTagOption.setArgName("clusterName instanceName tag"); |
| Option removeInstanceTagOption = |
| OptionBuilder.withLongOpt(removeInstanceTag).withDescription("Remove tag from instance") |
| .create(); |
| removeInstanceTagOption.setArgs(3); |
| removeInstanceTagOption.setRequired(false); |
| removeInstanceTagOption.setArgName("clusterName instanceName tag"); |
| |
| // TODO need deal with resource-names containing "," |
| // set/get/remove configs options |
| Option setConfOption = |
| OptionBuilder |
| .hasArgs(3) |
| .isRequired(false) |
| .withArgName( |
| "ConfigScope(e.g. RESOURCE) ConfigScopeArgs(e.g. myCluster,testDB) KeyValueMap(e.g. k1=v1,k2=v2)") |
| .withLongOpt(setConfig).withDescription("Set configs").create(); |
| |
| Option getConfOption = |
| OptionBuilder |
| .hasArgs(3) |
| .isRequired(false) |
| .withArgName( |
| "ConfigScope(e.g. RESOURCE) ConfigScopeArgs(e.g. myCluster,testDB) Keys(e.g. k1,k2)") |
| .withLongOpt(getConfig).withDescription("Get configs").create(); |
| |
| Option removeConfOption = |
| OptionBuilder |
| .hasArgs(3) |
| .isRequired(false) |
| .withArgName( |
| "ConfigScope(e.g. RESOURCE) ConfigScopeArgs(e.g. myCluster,testDB) Keys(e.g. k1,k2)") |
| .withLongOpt(removeConfig).withDescription("Remove configs").create(); |
| |
| // set/get/remove constraints options |
| Option setConstraintOption = |
| OptionBuilder |
| .hasArgs(4) |
| .isRequired(false) |
| .withArgName( |
| "clusterName ConstraintType(e.g. MESSAGE_CONSTRAINT) ConstraintId KeyValueMap(e.g. k1=v1,k2=v2)") |
| .withLongOpt(setConstraint) |
| .withDescription("Set a constraint associated with a give id. create if not exist") |
| .create(); |
| |
| Option getConstraintsOption = |
| OptionBuilder.hasArgs(2).isRequired(false) |
| .withArgName("clusterName ConstraintType(e.g. MESSAGE_CONSTRAINT)") |
| .withLongOpt(getConstraints) |
| .withDescription("Get constraints associated with given type").create(); |
| |
| Option removeConstraintOption = |
| OptionBuilder.hasArgs(3).isRequired(false) |
| .withArgName("clusterName ConstraintType(e.g. MESSAGE_CONSTRAINT) ConstraintId") |
| .withLongOpt(removeConstraint) |
| .withDescription("Remove a constraint associated with given id").create(); |
| |
| Option setCloudConfigOption = OptionBuilder.withLongOpt(setCloudConfig).withDescription( |
| "Set the Cloud Configuration of the cluster. Example:\n sh helix-admin.sh --zkSvr ZookeeperServerAddress --setCloudConfig ClusterName '{\"simpleFields\" : {\"CLOUD_ENABLED\" : \"true\",\"CLOUD_PROVIDER\": \"AZURE\"}}'") |
| .create(); |
| setCloudConfigOption.setArgs(2); |
| setCloudConfigOption.setRequired(false); |
| setCloudConfigOption.setArgName("clusterName CloudConfigurationManifest"); |
| |
| Option removeCloudConfigOption = OptionBuilder.withLongOpt(removeCloudConfig) |
| .withDescription("Remove the Cloud Configuration of the cluster").create(); |
| removeCloudConfigOption.setArgs(1); |
| removeCloudConfigOption.setRequired(false); |
| removeCloudConfigOption.setArgName("clusterName"); |
| |
| OptionGroup group = new OptionGroup(); |
| group.setRequired(true); |
| group.addOption(rebalanceOption); |
| group.addOption(addResourceOption); |
| group.addOption(resourceModeOption); |
| group.addOption(resourceTagOption); |
| group.addOption(resourceBucketSizeOption); |
| group.addOption(maxPartitionsPerNodeOption); |
| group.addOption(expandResourceOption); |
| group.addOption(expandClusterOption); |
| group.addOption(resourceKeyOption); |
| group.addOption(addClusterOption); |
| group.addOption(activateClusterOption); |
| group.addOption(deleteClusterOption); |
| group.addOption(addInstanceOption); |
| group.addOption(listInstancesOption); |
| group.addOption(listResourceOption); |
| group.addOption(listClustersOption); |
| group.addOption(addIdealStateOption); |
| group.addOption(rebalanceOption); |
| group.addOption(dropInstanceOption); |
| group.addOption(swapInstanceOption); |
| group.addOption(dropResourceOption); |
| group.addOption(enableResourceOption); |
| group.addOption(instanceInfoOption); |
| group.addOption(clusterInfoOption); |
| group.addOption(resourceInfoOption); |
| group.addOption(partitionInfoOption); |
| group.addOption(enableInstanceOption); |
| group.addOption(enablePartitionOption); |
| group.addOption(enableClusterOption); |
| group.addOption(resetPartitionOption); |
| group.addOption(resetInstanceOption); |
| group.addOption(resetResourceOption); |
| group.addOption(addStateModelDefOption); |
| group.addOption(listStateModelsOption); |
| group.addOption(listStateModelOption); |
| group.addOption(addResourcePropertyOption); |
| group.addOption(removeResourcePropertyOption); |
| |
| // set/get/remove config options |
| group.addOption(setConfOption); |
| group.addOption(getConfOption); |
| group.addOption(removeConfOption); |
| |
| // set/get/remove constraint options |
| group.addOption(setConstraintOption); |
| group.addOption(getConstraintsOption); |
| group.addOption(removeConstraintOption); |
| |
| // set/remove cloud configs |
| group.addOption(setCloudConfigOption); |
| group.addOption(removeCloudConfigOption); |
| |
| group.addOption(addInstanceTagOption); |
| group.addOption(removeInstanceTagOption); |
| group.addOption(instanceGroupTagOption); |
| |
| Options options = new Options(); |
| options.addOption(helpOption); |
| options.addOption(zkServerOption); |
| options.addOptionGroup(group); |
| return options; |
| } |
| |
| // TODO: remove this. has moved to ZkHelixAdmin |
| /** |
|  * Reads a file into a byte array. |
|  * <p> |
|  * The array is sized from the file length reported before reading starts. If the stream ends |
|  * early, the remaining bytes of the array are left as zero. |
|  * @param filePath path of the file to read |
|  * @return the bytes read from the file |
|  * @throws IOException if the file cannot be opened, read or closed |
|  */ |
| private static byte[] readFile(String filePath) throws IOException { |
|   File file = new File(filePath); |
| |
|   int size = (int) file.length(); |
|   byte[] bytes = new byte[size]; |
|   // try-with-resources closes the stream on every path; it was previously never closed, |
|   // which leaked one file descriptor per call. |
|   try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) { |
|     int read = 0; |
|     int numRead; |
|     // read() may return fewer bytes than requested, so keep reading until the buffer is |
|     // full or end of stream (negative return) is reached |
|     while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) { |
|       read += numRead; |
|     } |
|   } |
|   return bytes; |
| } |
| |
| public static int processCommandLineArgs(String[] cliArgs) throws Exception { |
| CommandLineParser cliParser = new GnuParser(); |
| Options cliOptions = constructCommandLineOptions(); |
| CommandLine cmd = null; |
| |
| try { |
| cmd = cliParser.parse(cliOptions, cliArgs); |
| } catch (ParseException pe) { |
| System.err.println("CommandLineClient: failed to parse command-line options: " |
| + pe.toString()); |
| printUsage(cliOptions); |
| System.exit(1); |
| } |
| |
| ClusterSetup setupTool = new ClusterSetup(cmd.getOptionValue(zkServerAddress)); |
| |
| if (cmd.hasOption(addCluster)) { |
| String clusterName = cmd.getOptionValue(addCluster); |
| setupTool.addCluster(clusterName, false); |
| return 0; |
| } |
| |
| if (cmd.hasOption(activateCluster)) { |
| String clusterName = cmd.getOptionValues(activateCluster)[0]; |
| String grandCluster = cmd.getOptionValues(activateCluster)[1]; |
| boolean enable = Boolean.parseBoolean(cmd.getOptionValues(activateCluster)[2]); |
| setupTool.activateCluster(clusterName, grandCluster, enable); |
| return 0; |
| } |
| |
| if (cmd.hasOption(dropCluster)) { |
| String clusterName = cmd.getOptionValue(dropCluster); |
| setupTool.deleteCluster(clusterName); |
| return 0; |
| } |
| |
| if (cmd.hasOption(addInstance)) { |
| String clusterName = cmd.getOptionValues(addInstance)[0]; |
| String instanceAddressInfo = cmd.getOptionValues(addInstance)[1]; |
| String[] instanceAddresses = instanceAddressInfo.split(";"); |
| setupTool.addInstancesToCluster(clusterName, instanceAddresses); |
| return 0; |
| } |
| |
| if (cmd.hasOption(addResource)) { |
| String clusterName = cmd.getOptionValues(addResource)[0]; |
| String resourceName = cmd.getOptionValues(addResource)[1]; |
| int partitions = Integer.parseInt(cmd.getOptionValues(addResource)[2]); |
| String stateModelRef = cmd.getOptionValues(addResource)[3]; |
| String modeValue = RebalanceMode.SEMI_AUTO.toString(); |
| if (cmd.hasOption(mode)) { |
| modeValue = cmd.getOptionValues(mode)[0]; |
| } |
| |
| int bucketSizeVal = 0; |
| if (cmd.hasOption(bucketSize)) { |
| bucketSizeVal = Integer.parseInt(cmd.getOptionValues(bucketSize)[0]); |
| } |
| |
| int maxPartitionsPerNodeVal = -1; |
| if (cmd.hasOption(maxPartitionsPerNode)) { |
| maxPartitionsPerNodeVal = Integer.parseInt(cmd.getOptionValues(maxPartitionsPerNode)[0]); |
| } |
| setupTool.addResourceToCluster(clusterName, resourceName, partitions, stateModelRef, |
| modeValue, bucketSizeVal, maxPartitionsPerNodeVal); |
| return 0; |
| } |
| |
| if (cmd.hasOption(rebalance)) { |
| String clusterName = cmd.getOptionValues(rebalance)[0]; |
| String resourceName = cmd.getOptionValues(rebalance)[1]; |
| int replicas = Integer.parseInt(cmd.getOptionValues(rebalance)[2]); |
| String keyPrefixVal = ""; |
| String instanceGroupTagVal = ""; |
| if (cmd.hasOption(resourceKeyPrefix)) { |
| keyPrefixVal = cmd.getOptionValue(resourceKeyPrefix); |
| } |
| if (cmd.hasOption(instanceGroupTag)) { |
| instanceGroupTagVal = cmd.getOptionValue(instanceGroupTag); |
| } |
| setupTool.rebalanceCluster(clusterName, resourceName, replicas, keyPrefixVal, |
| instanceGroupTagVal); |
| return 0; |
| } |
| |
| if (cmd.hasOption(expandCluster)) { |
| String clusterName = cmd.getOptionValues(expandCluster)[0]; |
| |
| setupTool.expandCluster(clusterName); |
| return 0; |
| } |
| |
| if (cmd.hasOption(expandResource)) { |
| String clusterName = cmd.getOptionValues(expandResource)[0]; |
| String resourceName = cmd.getOptionValues(expandResource)[1]; |
| setupTool.expandResource(clusterName, resourceName); |
| return 0; |
| } |
| |
| if (cmd.hasOption(dropInstance)) { |
| String clusterName = cmd.getOptionValues(dropInstance)[0]; |
| String instanceAddressInfo = cmd.getOptionValues(dropInstance)[1]; |
| String[] instanceAddresses = instanceAddressInfo.split(";"); |
| setupTool.dropInstancesFromCluster(clusterName, instanceAddresses); |
| return 0; |
| } |
| |
| if (cmd.hasOption(listClusters)) { |
| List<String> clusters = setupTool.getClusterManagementTool().getClusters(); |
| |
| System.out.println("Existing clusters:"); |
| for (String cluster : clusters) { |
| System.out.println(cluster); |
| } |
| return 0; |
| } |
| |
| if (cmd.hasOption(listResources)) { |
| String clusterName = cmd.getOptionValue(listResources); |
| List<String> resourceNames = null; |
| if (cmd.hasOption(tag)) { |
| String tagValue = cmd.getOptionValues(tag)[0]; |
| resourceNames = setupTool.getClusterManagementTool() |
| .getResourcesInClusterWithTag(clusterName, tagValue); |
| System.out.println( |
| "Existing resources in cluster " + clusterName + " with tag " + tagValue + " :"); |
| } else { |
| resourceNames = setupTool.getClusterManagementTool().getResourcesInCluster(clusterName); |
| System.out.println("Existing resources in cluster " + clusterName + ":"); |
| } |
| |
| for (String resourceName : resourceNames) { |
| System.out.println(resourceName); |
| } |
| return 0; |
| } else if (cmd.hasOption(listClusterInfo)) { |
| String clusterName = cmd.getOptionValue(listClusterInfo); |
| List<String> resourceNames = |
| setupTool.getClusterManagementTool().getResourcesInCluster(clusterName); |
| List<String> instances = |
| setupTool.getClusterManagementTool().getInstancesInCluster(clusterName); |
| |
| System.out.println("Existing resources in cluster " + clusterName + ":"); |
| for (String resourceName : resourceNames) { |
| System.out.println(resourceName); |
| } |
| |
| System.out.println("Instances in cluster " + clusterName + ":"); |
| for (String InstanceName : instances) { |
| System.out.println(InstanceName); |
| } |
| return 0; |
| } else if (cmd.hasOption(listInstances)) { |
| String clusterName = cmd.getOptionValue(listInstances); |
| |
| List<String> instances; |
| if (cmd.hasOption(tag)) { |
| String instanceTag = cmd.getOptionValues(tag)[0]; |
| instances = setupTool.getClusterManagementTool() |
| .getInstancesInClusterWithTag(clusterName, instanceTag); |
| } else { |
| instances = |
| setupTool.getClusterManagementTool().getInstancesInCluster(clusterName); |
| } |
| |
| System.out.println("Instances in cluster " + clusterName + ":"); |
| for (String instanceName : instances) { |
| System.out.println(instanceName); |
| } |
| return 0; |
| } else if (cmd.hasOption(listInstanceInfo)) { |
| String clusterName = cmd.getOptionValues(listInstanceInfo)[0]; |
| String instanceName = cmd.getOptionValues(listInstanceInfo)[1]; |
| InstanceConfig config = |
| setupTool.getClusterManagementTool().getInstanceConfig(clusterName, instanceName); |
| |
| String result = new String(config.serialize(new ZNRecordSerializer())); |
| System.out.println("InstanceConfig: " + result); |
| return 0; |
| } else if (cmd.hasOption(listResourceInfo)) { |
| // print out partition number, resource name and replication number |
| // Also the ideal states and current states |
| String clusterName = cmd.getOptionValues(listResourceInfo)[0]; |
| String resourceName = cmd.getOptionValues(listResourceInfo)[1]; |
| IdealState idealState = |
| setupTool.getClusterManagementTool().getResourceIdealState(clusterName, resourceName); |
| ExternalView externalView = |
| setupTool.getClusterManagementTool().getResourceExternalView(clusterName, resourceName); |
| |
| if (idealState != null) { |
| System.out.println("IdealState for " + resourceName + ":"); |
| System.out.println(new String(idealState.serialize(new ZNRecordSerializer()))); |
| } else { |
| System.out.println("No idealState for " + resourceName); |
| } |
| |
| System.out.println(); |
| |
| if (externalView != null) { |
| System.out.println("ExternalView for " + resourceName + ":"); |
| System.out.println(new String(externalView.serialize(new ZNRecordSerializer()))); |
| } else { |
| System.out.println("No externalView for " + resourceName); |
| } |
| return 0; |
| |
| } else if (cmd.hasOption(listPartitionInfo)) { |
| // print out where the partition master / slaves locates |
| String clusterName = cmd.getOptionValues(listPartitionInfo)[0]; |
| String resourceName = cmd.getOptionValues(listPartitionInfo)[1]; |
| String partitionName = cmd.getOptionValues(listPartitionInfo)[2]; |
| IdealState idealState = |
| setupTool.getClusterManagementTool().getResourceIdealState(clusterName, resourceName); |
| ExternalView externalView = |
| setupTool.getClusterManagementTool().getResourceExternalView(clusterName, resourceName); |
| |
| if (idealState != null) { |
| ZNRecord partInfo = new ZNRecord(resourceName + "/" + partitionName); |
| ZNRecord idealStateRec = idealState.getRecord(); |
| partInfo.setSimpleFields(idealStateRec.getSimpleFields()); |
| if (idealStateRec.getMapField(partitionName) != null) { |
| partInfo.setMapField(partitionName, idealStateRec.getMapField(partitionName)); |
| } |
| if (idealStateRec.getListField(partitionName) != null) { |
| partInfo.setListField(partitionName, idealStateRec.getListField(partitionName)); |
| } |
| System.out.println("IdealState for " + resourceName + "/" + partitionName + ":"); |
| System.out.println(new String(new ZNRecordSerializer().serialize(partInfo))); |
| } else { |
| System.out.println("No idealState for " + resourceName + "/" + partitionName); |
| } |
| |
| System.out.println(); |
| |
| if (externalView != null) { |
| ZNRecord partInfo = new ZNRecord(resourceName + "/" + partitionName); |
| ZNRecord extViewRec = externalView.getRecord(); |
| partInfo.setSimpleFields(extViewRec.getSimpleFields()); |
| if (extViewRec.getMapField(partitionName) != null) { |
| partInfo.setMapField(partitionName, extViewRec.getMapField(partitionName)); |
| } |
| if (extViewRec.getListField(partitionName) != null) { |
| partInfo.setListField(partitionName, extViewRec.getListField(partitionName)); |
| } |
| |
| System.out.println("ExternalView for " + resourceName + "/" + partitionName + ":"); |
| System.out.println(new String(new ZNRecordSerializer().serialize(partInfo))); |
| } else { |
| System.out.println("No externalView for " + resourceName + "/" + partitionName); |
| } |
| return 0; |
| |
| } else if (cmd.hasOption(enableInstance)) { |
| String clusterName = cmd.getOptionValues(enableInstance)[0]; |
| String instanceName = cmd.getOptionValues(enableInstance)[1]; |
| if (instanceName.contains(":")) { |
| instanceName = instanceName.replaceAll(":", "_"); |
| } |
| boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableInstance)[2].toLowerCase()); |
| |
| setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled); |
| return 0; |
| } else if (cmd.hasOption(enableResource)) { |
| String clusterName = cmd.getOptionValues(enableResource)[0]; |
| String resourceName = cmd.getOptionValues(enableResource)[1]; |
| boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase()); |
| if (cmd.hasOption(tag)) { |
| String resourceTag = cmd.getOptionValues(tag)[0]; |
| setupTool.enableResource(clusterName, resourceName, resourceTag, enabled); |
| } else { |
| setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled); |
| } |
| } else if (cmd.hasOption(enablePartition)) { |
| String[] args = cmd.getOptionValues(enablePartition); |
| |
| boolean enabled = Boolean.parseBoolean(args[0].toLowerCase()); |
| String clusterName = args[1]; |
| String instanceName = args[2]; |
| String resourceName = args[3]; |
| |
| List<String> partitionNames = Arrays.asList(Arrays.copyOfRange(args, 4, args.length)); |
| setupTool.getClusterManagementTool().enablePartition(enabled, clusterName, instanceName, |
| resourceName, partitionNames); |
| return 0; |
| } else if (cmd.hasOption(resetPartition)) { |
| String[] args = cmd.getOptionValues(resetPartition); |
| |
| String clusterName = args[0]; |
| String instanceName = args[1]; |
| String resourceName = args[2]; |
| List<String> partitionNames = Arrays.asList(Arrays.copyOfRange(args, 3, args.length)); |
| |
| setupTool.getClusterManagementTool().resetPartition(clusterName, instanceName, resourceName, |
| partitionNames); |
| return 0; |
| } else if (cmd.hasOption(resetInstance)) { |
| String[] args = cmd.getOptionValues(resetInstance); |
| |
| String clusterName = args[0]; |
| List<String> instanceNames = Arrays.asList(Arrays.copyOfRange(args, 1, args.length)); |
| |
| setupTool.getClusterManagementTool().resetInstance(clusterName, instanceNames); |
| return 0; |
| } else if (cmd.hasOption(resetResource)) { |
| String[] args = cmd.getOptionValues(resetResource); |
| |
| String clusterName = args[0]; |
| List<String> resourceNames = Arrays.asList(Arrays.copyOfRange(args, 1, args.length)); |
| |
| setupTool.getClusterManagementTool().resetResource(clusterName, resourceNames); |
| return 0; |
| } else if (cmd.hasOption(enableCluster)) { |
| String[] params = cmd.getOptionValues(enableCluster); |
| String clusterName = params[0]; |
| boolean enabled = Boolean.parseBoolean(params[1].toLowerCase()); |
| setupTool.getClusterManagementTool().enableCluster(clusterName, enabled); |
| |
| return 0; |
| } else if (cmd.hasOption(listStateModels)) { |
| String clusterName = cmd.getOptionValues(listStateModels)[0]; |
| |
| List<String> stateModels = |
| setupTool.getClusterManagementTool().getStateModelDefs(clusterName); |
| |
| System.out.println("Existing state models:"); |
| for (String stateModel : stateModels) { |
| System.out.println(stateModel); |
| } |
| return 0; |
| } else if (cmd.hasOption(listStateModel)) { |
| String clusterName = cmd.getOptionValues(listStateModel)[0]; |
| String stateModel = cmd.getOptionValues(listStateModel)[1]; |
| StateModelDefinition stateModelDef = |
| setupTool.getClusterManagementTool().getStateModelDef(clusterName, stateModel); |
| String result = new String(new ZNRecordSerializer().serialize(stateModelDef.getRecord())); |
| System.out.println("StateModelDefinition: " + result); |
| return 0; |
| } else if (cmd.hasOption(addStateModelDef)) { |
| String clusterName = cmd.getOptionValues(addStateModelDef)[0]; |
| String stateModelFile = cmd.getOptionValues(addStateModelDef)[1]; |
| |
| ZNRecord stateModelRecord = |
| (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelFile))); |
| if (stateModelRecord.getId() == null || stateModelRecord.getId().length() == 0) { |
| throw new IllegalArgumentException("ZNRecord for state model definition must have an id"); |
| } |
| setupTool.getClusterManagementTool().addStateModelDef(clusterName, stateModelRecord.getId(), |
| new StateModelDefinition(stateModelRecord)); |
| return 0; |
| } else if (cmd.hasOption(addIdealState)) { |
| String clusterName = cmd.getOptionValues(addIdealState)[0]; |
| String resourceName = cmd.getOptionValues(addIdealState)[1]; |
| String idealStateFile = cmd.getOptionValues(addIdealState)[2]; |
| |
| setupTool.addIdealState(clusterName, resourceName, idealStateFile); |
| return 0; |
| } else if (cmd.hasOption(dropResource)) { |
| String clusterName = cmd.getOptionValues(dropResource)[0]; |
| String resourceName = cmd.getOptionValues(dropResource)[1]; |
| |
| setupTool.getClusterManagementTool().dropResource(clusterName, resourceName); |
| } else if (cmd.hasOption(swapInstance)) { |
| String clusterName = cmd.getOptionValues(swapInstance)[0]; |
| String oldInstanceName = cmd.getOptionValues(swapInstance)[1]; |
| String newInstanceName = cmd.getOptionValues(swapInstance)[2]; |
| |
| setupTool.swapInstance(clusterName, oldInstanceName, newInstanceName); |
| } |
| // set/get/remove config options |
| else if (cmd.hasOption(setConfig)) { |
| String values[] = cmd.getOptionValues(setConfig); |
| ConfigScopeProperty type = ConfigScopeProperty.valueOf(values[0]); |
| String scopeArgs = values[1]; |
| String keyValueMap = values[2]; |
| setupTool.setConfig(type, scopeArgs, keyValueMap); |
| } else if (cmd.hasOption(getConfig)) { |
| String values[] = cmd.getOptionValues(getConfig); |
| ConfigScopeProperty type = ConfigScopeProperty.valueOf(values[0]); |
| String scopeArgs = values[1]; |
| String keys = values[2]; |
| setupTool.getConfig(type, scopeArgs, keys); |
| } else if (cmd.hasOption(removeConfig)) { |
| String values[] = cmd.getOptionValues(removeConfig); |
| ConfigScopeProperty type = ConfigScopeProperty.valueOf(values[0]); |
| String scoepArgs = values[1]; |
| String keys = values[2]; |
| setupTool.removeConfig(type, scoepArgs, keys); |
| } |
| // set/get/remove constraint options |
| else if (cmd.hasOption(setConstraint)) { |
| String values[] = cmd.getOptionValues(setConstraint); |
| String clusterName = values[0]; |
| String constraintType = values[1]; |
| String constraintId = values[2]; |
| String constraintAttributesMap = values[3]; |
| setupTool.setConstraint(clusterName, constraintType, constraintId, constraintAttributesMap); |
| } else if (cmd.hasOption(getConstraints)) { |
| String values[] = cmd.getOptionValues(getConstraints); |
| String clusterName = values[0]; |
| String constraintType = values[1]; |
| setupTool.getConstraints(clusterName, constraintType); |
| } else if (cmd.hasOption(removeConstraint)) { |
| String values[] = cmd.getOptionValues(removeConstraint); |
| String clusterName = values[0]; |
| String constraintType = values[1]; |
| String constraintId = values[2]; |
| setupTool.removeConstraint(clusterName, constraintType, constraintId); |
| } else if (cmd.hasOption(addInstanceTag)) { |
| String clusterName = cmd.getOptionValues(addInstanceTag)[0]; |
| String instanceName = cmd.getOptionValues(addInstanceTag)[1]; |
| String tag = cmd.getOptionValues(addInstanceTag)[2]; |
| setupTool.getClusterManagementTool().addInstanceTag(clusterName, instanceName, tag); |
| } else if (cmd.hasOption(removeInstanceTag)) { |
| String clusterName = cmd.getOptionValues(removeInstanceTag)[0]; |
| String instanceName = cmd.getOptionValues(removeInstanceTag)[1]; |
| String tag = cmd.getOptionValues(removeInstanceTag)[2]; |
| setupTool.getClusterManagementTool().removeInstanceTag(clusterName, instanceName, tag); |
| } |
| // help option |
| else if (cmd.hasOption(help)) { |
| printUsage(cliOptions); |
| return 0; |
| } else if (cmd.hasOption(addResourceProperty)) { |
| String clusterName = cmd.getOptionValues(addResourceProperty)[0]; |
| String resourceName = cmd.getOptionValues(addResourceProperty)[1]; |
| String propertyKey = cmd.getOptionValues(addResourceProperty)[2]; |
| String propertyVal = cmd.getOptionValues(addResourceProperty)[3]; |
| |
| setupTool.addResourceProperty(clusterName, resourceName, propertyKey, propertyVal); |
| return 0; |
| } else if (cmd.hasOption(removeResourceProperty)) { |
| String clusterName = cmd.getOptionValues(removeResourceProperty)[0]; |
| String resourceName = cmd.getOptionValues(removeResourceProperty)[1]; |
| String propertyKey = cmd.getOptionValues(removeResourceProperty)[2]; |
| |
| setupTool.removeResourceProperty(clusterName, resourceName, propertyKey); |
| return 0; |
| } else if (cmd.hasOption(setCloudConfig)) { |
| String clusterName = cmd.getOptionValues(setCloudConfig)[0]; |
| String cloudConfigManifest = cmd.getOptionValues(setCloudConfig)[1]; |
| setupTool.setCloudConfig(clusterName, cloudConfigManifest); |
| return 0; |
| } else if (cmd.hasOption(removeCloudConfig)) { |
| String clusterName = cmd.getOptionValues(removeCloudConfig)[0]; |
| setupTool.removeCloudConfig(clusterName); |
| return 0; |
| } |
| return 0; |
| } |
| |
| // TODO: remove this. has moved to ZkHelixAdmin |
| public void addIdealState(String clusterName, String resourceName, String idealStateFile) |
| throws IOException { |
| ZNRecord idealStateRecord = |
| (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile))); |
| if (idealStateRecord.getId() == null || !idealStateRecord.getId().equals(resourceName)) { |
| throw new IllegalArgumentException("ideal state must have same id as resource name"); |
| } |
| _admin.setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord)); |
| } |
| |
| public void addResourceProperty(String clusterName, String resourceName, String propertyKey, |
| String propertyVal) { |
| IdealState idealState = _admin.getResourceIdealState(clusterName, resourceName); |
| if (idealState == null) { |
| throw new HelixException("Resource: " + resourceName + " has NOT been added yet"); |
| } |
| idealState.getRecord().setSimpleField(propertyKey, propertyVal); |
| _admin.setResourceIdealState(clusterName, resourceName, idealState); |
| } |
| |
| public void removeResourceProperty(String clusterName, String resourceName, String propertyKey) { |
| IdealState idealState = _admin.getResourceIdealState(clusterName, resourceName); |
| if (idealState == null) { |
| throw new HelixException("Resource: " + resourceName + " has NOT been added yet"); |
| } |
| idealState.getRecord().getSimpleFields().remove(propertyKey); |
| _admin.setResourceIdealState(clusterName, resourceName, idealState); |
| } |
| |
| /** |
| * @param args |
| * @throws Exception |
| */ |
| public static void main(String[] args) throws Exception { |
| if (args.length == 1 && args[0].equals("setup-test-cluster")) { |
| System.out |
| .println("By default setting up TestCluster with 6 instances, 10 partitions, Each partition will have 3 replicas"); |
| new ClusterSetup("localhost:2181").setupTestCluster("TestCluster"); |
| System.exit(0); |
| } |
| |
| int ret = processCommandLineArgs(args); |
| System.exit(ret); |
| } |
| |
| public static class Builder extends GenericZkHelixApiBuilder<Builder> { |
| public Builder() { |
| } |
| |
| public ClusterSetup build() { |
| validate(); |
| return new ClusterSetup( |
| createZkClient(_realmMode, _realmAwareZkConnectionConfig, _realmAwareZkClientConfig, |
| _zkAddress), false); |
| } |
| } |
| } |