blob: 3ebba04ed513ced54f0cab5635e96a0e4949cb5a [file] [log] [blame]
package org.apache.helix.tools;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.task.TaskConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Legacy utilities, plus a command-line entry point, for checking whether a Helix cluster has
 * converged to its expected state.
 *
 * <p>This class is deprecated. Use the dedicated verifier classes in
 * {@code org.apache.helix.tools.ClusterVerifiers} instead (for example
 * {@code BestPossibleExternalViewVerifier}).
 */
@Deprecated
public class ClusterStateVerifier {
  // Long option names understood by the command-line entry point.
  public static String cluster = "cluster";
  public static String zkServerAddress = "zkSvr";
  public static String help = "help";
  public static String timeout = "timeout";
  public static String period = "period";
  public static String resources = "resources";

  /** Default time to wait for verification to succeed, in milliseconds. */
  private static final long DEFAULT_TIMEOUT_MS = 30 * 1000L;
  /** Default interval between polling attempts, in milliseconds. */
  private static final long DEFAULT_PERIOD_MS = 1000L;
  /** Path, below the cluster root, of the ephemeral node that marks an in-progress verification. */
  private static final String VERIFY_NODE_SUFFIX = "/CONFIGS/CLUSTER/verify";

  private static final Logger LOG = LoggerFactory.getLogger(ClusterStateVerifier.class);

  /** A check that reports whether the cluster is in the expected state. */
  public interface Verifier {
    /** @return true if the cluster is in the expected state */
    boolean verify();
  }

  /** A {@link Verifier} backed by a ZooKeeper client; usable with {@code verifyByZkCallback}. */
  public interface ZkVerifier extends Verifier {
    ZkClient getZkClient();

    String getClusterName();
  }

  /**
   * ZooKeeper listener that re-runs a verifier whenever an external view changes. It counts down
   * a latch once the verifier passes.
   *
   * @deprecated use {@code BestPossibleExternalViewVerifier} instead
   */
  @Deprecated
  static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
    final CountDownLatch _countDown;
    final HelixZkClient _zkClient;
    final Verifier _verifier;

    public ExtViewVeriferZkListener(CountDownLatch countDown, HelixZkClient zkClient,
        ZkVerifier verifier) {
      _countDown = countDown;
      _zkClient = zkClient;
      _verifier = verifier;
    }

    @Override
    @PreFetch(enabled = false)
    public void handleDataChange(String dataPath, Object data) throws Exception {
      verifyAndSignal();
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
      // No action: verification is re-run from data changes and child-list changes.
    }

    @Override
    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
      // The child list may be null (e.g. when the parent node itself is gone), so guard the loop.
      if (currentChilds != null) {
        // Subscribe to every child so that newly created external views are also watched.
        for (String child : currentChilds) {
          _zkClient.subscribeDataChanges(childPath(parentPath, child), this);
        }
      }
      verifyAndSignal();
    }

    /** Runs the verifier and releases the latch if it passes. */
    private void verifyAndSignal() {
      if (_verifier.verify()) {
        _countDown.countDown();
      }
    }
  }

  /** Joins a znode path and a child name without producing a double slash at the root. */
  private static String childPath(String parentPath, String child) {
    return parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
  }

  /**
   * Builds a dedicated ZooKeeper client that uses the {@link ZNRecordSerializer}.
   *
   * @throws IllegalArgumentException if {@code zkAddr} or {@code clusterName} is null
   */
  private static HelixZkClient validateAndGetClient(String zkAddr, String clusterName) {
    if (zkAddr == null || clusterName == null) {
      throw new IllegalArgumentException("requires zkAddr|clusterName");
    }
    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
    clientConfig.setZkSerializer(new ZNRecordSerializer());
    return DedicatedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
  }

  /**
   * Verifies that each resource's external view matches the best possible state computed from a
   * single snapshot of the cluster.
   *
   * <p>Resources that use the Task state model are skipped. The caller can optionally mark given
   * (resource, partition, instance) entries as expected to be in ERROR, and can optionally limit
   * the check to a set of resources. A null or empty set means all resources.
   */
  public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
    private final String clusterName;
    // resource name -> partition name -> instance name expected to be in ERROR; may be null
    private final Map<String, Map<String, String>> errStates;
    private final HelixZkClient zkClient;
    // resources to verify; null or empty means all resources
    private final Set<String> resources;

    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
      this(zkAddr, clusterName, null);
    }

    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
        Map<String, Map<String, String>> errStates) {
      this(zkAddr, clusterName, errStates, null);
    }

    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
        Map<String, Map<String, String>> errStates, Set<String> resources) {
      this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
    }

    public BestPossAndExtViewZkVerifier(HelixZkClient zkClient, String clusterName,
        Map<String, Map<String, String>> errStates, Set<String> resources) {
      if (zkClient == null || clusterName == null) {
        throw new IllegalArgumentException("requires zkClient|clusterName");
      }
      this.clusterName = clusterName;
      this.errStates = errStates;
      this.zkClient = zkClient;
      this.resources = resources;
    }

    /** @return true if the external views match; false on mismatch or any (logged) exception */
    @Override
    public boolean verify() {
      try {
        HelixDataAccessor accessor =
            new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
        return verifyBestPossAndExtView(accessor, errStates, clusterName, resources);
      } catch (Exception e) {
        LOG.error("exception in verification", e);
      }
      return false;
    }

    private boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
        Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) {
      try {
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        // Read the cluster once, then run the whole comparison against that snapshot.
        ResourceControllerDataProvider cache = new ResourceControllerDataProvider(clusterName);
        cache.refresh(accessor);

        Map<String, IdealState> idealStates = new HashMap<>(cache.getIdealStates());
        removeTaskResources(idealStates);

        // Copy so that the filtering below never mutates a map owned by the accessor.
        Map<String, ExternalView> extViews = new HashMap<>();
        Map<String, ExternalView> fetchedViews =
            accessor.getChildValuesMap(keyBuilder.externalViews());
        if (fetchedViews != null) {
          extViews.putAll(fetchedViews);
        }

        if (resources != null && !resources.isEmpty()) {
          idealStates.keySet().retainAll(resources);
          extViews.keySet().retainAll(resources);
        }

        // Give every resource that has an external view an ideal-state entry (empty if none
        // exists) so that it is included in the comparison below.
        for (String resource : extViews.keySet()) {
          if (!idealStates.containsKey(resource)) {
            idealStates.put(resource, new IdealState(resource));
          }
        }

        BestPossibleStateOutput bestPossOutput = calcBestPossState(cache, resources);
        // NOTE(review): injected ERROR states are visible to bestPossOutput.getResourceMap()
        // only if getStateMap() exposes the output's internal per-resource maps. Confirm this
        // against BestPossibleStateOutput.
        injectErrorStates(bestPossOutput.getStateMap(), errStates);

        for (Map.Entry<String, IdealState> isEntry : idealStates.entrySet()) {
          String resourceName = isEntry.getKey();
          ExternalView extView = extViews.get(resourceName);
          if (extView == null) {
            if (isEntry.getValue().isExternalViewDisabled()) {
              continue;
            }
            LOG.info("externalView for " + resourceName + " is not available");
            return false;
          }

          // step 0: remove empty maps and DROPPED states from the best possible state
          removeEmptyAndDroppedStates(bestPossOutput.getResourceMap(resourceName));

          // step 1: the external view and the best possible state have the same size
          int extViewSize = extView.getRecord().getMapFields().size();
          int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
          if (extViewSize != bestPossStateSize) {
            LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
                + bestPossStateSize + ") for resource: " + resourceName);
            return false;
          }

          // step 2: every entry in the external view matches the best possible state
          for (Map.Entry<String, Map<String, String>> partEntry
              : extView.getRecord().getMapFields().entrySet()) {
            String partition = partEntry.getKey();
            Map<String, String> bpInstanceStateMap =
                bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
            if (!compareMap(partEntry.getValue(), bpInstanceStateMap)) {
              LOG.info("externalView is different from bestPossibleState for partition:"
                  + partition);
              return false;
            }
          }
        }
        return true;
      } catch (Exception e) {
        LOG.error("exception in verification", e);
        return false;
      }
    }

    /** Removes resources whose ideal state uses the Task state model; they are not verified. */
    private static void removeTaskResources(Map<String, IdealState> idealStates) {
      Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
      while (it.hasNext()) {
        // Put the constant first so that an ideal state without a state model ref cannot NPE.
        if (TaskConstants.STATE_MODEL_NAME.equals(it.next().getValue().getStateModelDefRef())) {
          it.remove();
        }
      }
    }

    /**
     * Marks each (resource, partition, instance) listed in {@code errStates} as ERROR in the
     * given best-possible state map, creating intermediate maps as needed.
     *
     * @param bestPossStateMap resource name -> partition -> instance -> state; mutated in place
     * @param errStates resource name -> partition name -> instance name; may be null
     */
    private static void injectErrorStates(
        Map<String, Map<Partition, Map<String, String>>> bestPossStateMap,
        Map<String, Map<String, String>> errStates) {
      if (errStates == null) {
        return;
      }
      for (Map.Entry<String, Map<String, String>> resourceEntry : errStates.entrySet()) {
        Map<String, String> partErrStates = resourceEntry.getValue();
        if (partErrStates == null) {
          continue;
        }
        String resourceName = resourceEntry.getKey();
        for (Map.Entry<String, String> partEntry : partErrStates.entrySet()) {
          Map<Partition, Map<String, String>> partitionMap = bestPossStateMap.get(resourceName);
          if (partitionMap == null) {
            partitionMap = new HashMap<Partition, Map<String, String>>();
            bestPossStateMap.put(resourceName, partitionMap);
          }
          Partition partition = new Partition(partEntry.getKey());
          Map<String, String> instanceStates = partitionMap.get(partition);
          if (instanceStates == null) {
            instanceStates = new HashMap<String, String>();
            partitionMap.put(partition, instanceStates);
          }
          instanceStates.put(partEntry.getValue(), HelixDefinedState.ERROR.toString());
        }
      }
    }

    /**
     * Removes partitions that have no instances, and removes instances in the DROPPED state.
     * Emptiness is checked before DROPPED entries are removed, so a partition whose instances
     * were all DROPPED is kept as an empty map.
     */
    private static void removeEmptyAndDroppedStates(
        Map<Partition, Map<String, String>> bpStateMap) {
      Iterator<Map.Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
      while (iter.hasNext()) {
        Map<String, String> instanceStateMap = iter.next().getValue();
        if (instanceStateMap.isEmpty()) {
          iter.remove();
          continue;
        }
        Iterator<String> stateIter = instanceStateMap.values().iterator();
        while (stateIter.hasNext()) {
          if (HelixDefinedState.DROPPED.toString().equalsIgnoreCase(stateIter.next())) {
            stateIter.remove();
          }
        }
      }
    }

    /**
     * Calculates the best possible state. DROPPED states are not checked, because the
     * BestPossibleStateCalcStage is run with an empty current-state map.
     *
     * @param cache snapshot of the cluster data
     * @param resources resources to compute; null or empty means all resources
     * @return the best possible state output stored on the event by the stage
     * @throws Exception if any pipeline stage fails
     */
    private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider cache,
        Set<String> resources) throws Exception {
      ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier);
      event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);

      ResourceComputationStage rcState = new ResourceComputationStage();
      CurrentStateComputationStage csStage = new CurrentStateComputationStage();
      BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();

      runStage(event, rcState);
      // Use the same "null or empty means all resources" rule as verifyBestPossAndExtView.
      // Otherwise an empty set would filter out every resource here but none there.
      if (resources != null && !resources.isEmpty()) {
        Map<String, Resource> resourceMap =
            event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
        if (resourceMap != null) {
          resourceMap.keySet().retainAll(resources);
        }
      }
      runStage(event, csStage);
      runStage(event, bpStage);

      BestPossibleStateOutput output =
          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
      return output;
    }

    /** Runs a single pipeline stage against the event with a fresh stage context. */
    private void runStage(ClusterEvent event, Stage stage) throws Exception {
      StageContext context = new StageContext();
      stage.init(context);
      stage.preProcess();
      stage.process(event);
      stage.postProcess();
    }

    /**
     * Compares two maps for equality of keys and values. A null map is treated as equivalent to
     * an empty one, and null values are compared safely.
     */
    private <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2) {
      if (map1 == null || map2 == null) {
        Map<K, V> other = (map1 == null) ? map2 : map1;
        return other == null || other.isEmpty();
      }
      if (map1.size() != map2.size()) {
        return false;
      }
      for (Map.Entry<K, V> entry : map1.entrySet()) {
        K key = entry.getKey();
        // containsKey guards against treating a missing key as equal to a null value.
        if (!map2.containsKey(key) || !Objects.equals(entry.getValue(), map2.get(key))) {
          LOG.debug("different value for key: " + key + "(map1: " + entry.getValue() + ", map2: "
              + map2.get(key) + ")");
          return false;
        }
      }
      return true;
    }

    @Override
    public ZkClient getZkClient() {
      return (ZkClient) zkClient;
    }

    @Override
    public String getClusterName() {
      return clusterName;
    }

    @Override
    public String toString() {
      String verifierName = getClass().getName();
      verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1);
      return verifierName + "(" + clusterName + "@" + zkClient.getServers() + ")";
    }
  }

  /**
   * Verifies two things: every resource with an ideal state has an external view, and every
   * partition in each external view has at least one instance in the MASTER state.
   */
  public static class MasterNbInExtViewVerifier implements ZkVerifier {
    private final String clusterName;
    private final HelixZkClient zkClient;

    public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
      this(validateAndGetClient(zkAddr, clusterName), clusterName);
    }

    public MasterNbInExtViewVerifier(HelixZkClient zkClient, String clusterName) {
      if (zkClient == null || clusterName == null) {
        throw new IllegalArgumentException("requires zkClient|clusterName");
      }
      this.clusterName = clusterName;
      this.zkClient = zkClient;
    }

    /** @return true if the check passes; false on failure or any (logged) exception */
    @Override
    public boolean verify() {
      try {
        ZKHelixDataAccessor accessor =
            new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
        return verifyMasterNbInExtView(accessor);
      } catch (Exception e) {
        LOG.error("exception in verification", e);
      }
      return false;
    }

    @Override
    public ZkClient getZkClient() {
      return (ZkClient) zkClient;
    }

    @Override
    public String getClusterName() {
      return clusterName;
    }

    private boolean verifyMasterNbInExtView(HelixDataAccessor accessor) {
      Builder keyBuilder = accessor.keyBuilder();
      Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates());
      if (idealStates == null || idealStates.isEmpty()) {
        LOG.info("No resource idealState");
        return true;
      }
      Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
      if (extViews == null || extViews.size() < idealStates.size()) {
        LOG.info("No externalViews | externalView.size() < idealState.size()");
        return false;
      }
      for (Map.Entry<String, ExternalView> evEntry : extViews.entrySet()) {
        String resource = evEntry.getKey();
        IdealState idealState = idealStates.get(resource);
        if (idealState == null) {
          // An external view without a matching ideal state cannot be checked; not converged.
          LOG.info("No idealState for resource: " + resource);
          return false;
        }
        int partitions = idealState.getNumPartitions();
        Map<String, Map<String, String>> instanceStateMap =
            evEntry.getValue().getRecord().getMapFields();
        if (instanceStateMap.size() < partitions) {
          LOG.info("Number of externalViews (" + instanceStateMap.size() + ") < partitions ("
              + partitions + ")");
          return false;
        }
        for (Map.Entry<String, Map<String, String>> partEntry : instanceStateMap.entrySet()) {
          if (!hasMaster(partEntry.getValue())) {
            LOG.info("No MASTER for partition: " + partEntry.getKey());
            return false;
          }
        }
      }
      return true;
    }

    /** @return true if any instance in the map is in the MASTER state (case-insensitive) */
    private static boolean hasMaster(Map<String, String> instanceStates) {
      if (instanceStates == null) {
        return false;
      }
      for (String state : instanceStates.values()) {
        if ("MASTER".equalsIgnoreCase(state)) {
          return true;
        }
      }
      return false;
    }
  }

  /** Polls {@code verifier} every second for up to 30 seconds. */
  public static boolean verifyByPolling(Verifier verifier) {
    return verifyByPolling(verifier, DEFAULT_TIMEOUT_MS);
  }

  /** Polls {@code verifier} every second for up to {@code timeout} milliseconds. */
  public static boolean verifyByPolling(Verifier verifier, long timeout) {
    return verifyByPolling(verifier, timeout, DEFAULT_PERIOD_MS);
  }

  /**
   * Repeatedly sleeps for {@code period} ms and then runs {@code verifier}. Polling stops when
   * the verifier passes or {@code timeout} ms have elapsed since the call started.
   *
   * @return true if the verifier passed; false on timeout, exception, or interruption
   */
  public static boolean verifyByPolling(Verifier verifier, long timeout, long period) {
    long startTime = System.currentTimeMillis();
    boolean result = false;
    try {
      long curTime;
      do {
        Thread.sleep(period);
        result = verifier.verify();
        if (result) {
          break;
        }
        curTime = System.currentTimeMillis();
      } while (curTime <= startTime + timeout);
      return result;
    } catch (InterruptedException e) {
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while polling " + verifier, e);
    } catch (Exception e) {
      LOG.error("Exception while polling " + verifier, e);
    } finally {
      long endTime = System.currentTimeMillis();
      // debug
      System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
          + "ms to verify");
    }
    return false;
  }

  /** Runs {@link #verifyByZkCallback(ZkVerifier, long)} with a 30 second timeout. */
  public static boolean verifyByZkCallback(ZkVerifier verifier) {
    return verifyByZkCallback(verifier, DEFAULT_TIMEOUT_MS);
  }

  /**
   * Re-runs {@code verifier} whenever an external view changes, until it passes or
   * {@code timeout} ms elapse. One final attempt is made on timeout.
   *
   * <p>This function should always be single threaded. An ephemeral marker node is created under
   * the cluster's CONFIGS/CLUSTER path for the duration of the call, so that ZooKeeper logs show
   * when a test ends. If that node already exists, the call fails. The marker node and all
   * listener subscriptions are removed before returning, even if an exception is thrown.
   *
   * @param verifier the verifier to run; it also supplies the ZooKeeper client and cluster name
   * @param timeout maximum time to wait for a passing verification, in milliseconds
   * @return true if the verifier passed
   * @throws ZkNodeExistsException if a verification of the same cluster is already in progress
   */
  public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
    long startTime = System.currentTimeMillis();
    CountDownLatch countDown = new CountDownLatch(1);
    HelixZkClient zkClient = verifier.getZkClient();
    String clusterName = verifier.getClusterName();
    String verifyNodePath = "/" + clusterName + VERIFY_NODE_SUFFIX;

    try {
      zkClient.createEphemeral(verifyNodePath);
    } catch (ZkNodeExistsException ex) {
      LOG.error("There is already a verification in progress", ex);
      throw ex;
    }

    ExtViewVeriferZkListener listener = new ExtViewVeriferZkListener(countDown, zkClient, verifier);
    String extViewPath = PropertyPathBuilder.externalView(clusterName);
    boolean result = false;
    try {
      zkClient.subscribeChildChanges(extViewPath, listener);
      for (String child : zkClient.getChildren(extViewPath)) {
        zkClient.subscribeDataChanges(childPath(extViewPath, child), listener);
      }

      // Initial check: the cluster may already be converged.
      result = verifier.verify();
      if (!result) {
        try {
          result = countDown.await(timeout, TimeUnit.MILLISECONDS);
          if (!result) {
            // Timed out: make one final attempt.
            result = verifier.verify();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          LOG.warn("Interrupted while waiting for verification of " + verifier, e);
        } catch (Exception e) {
          LOG.error("Exception while waiting for verification of " + verifier, e);
        }
      }
    } finally {
      // Always clean up, so that a failure cannot leak subscriptions or the marker node.
      zkClient.unsubscribeChildChanges(extViewPath, listener);
      for (String child : zkClient.getChildren(extViewPath)) {
        zkClient.unsubscribeDataChanges(childPath(extViewPath, child), listener);
      }
      zkClient.delete(verifyNodePath);
    }

    long endTime = System.currentTimeMillis();
    // debug
    System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
    return result;
  }

  @SuppressWarnings("static-access")
  private static Options constructCommandLineOptions() {
    Option helpOption =
        OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info")
            .create();

    Option zkServerOption =
        OptionBuilder.withLongOpt(zkServerAddress).withDescription("Provide zookeeper address")
            .create();
    zkServerOption.setArgs(1);
    zkServerOption.setRequired(true);
    zkServerOption.setArgName("ZookeeperServerAddress(Required)");

    Option clusterOption =
        OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster name").create();
    clusterOption.setArgs(1);
    clusterOption.setRequired(true);
    clusterOption.setArgName("Cluster name (Required)");

    Option timeoutOption =
        OptionBuilder.withLongOpt(timeout).withDescription("Timeout value for verification")
            .create();
    timeoutOption.setArgs(1);
    timeoutOption.setArgName("Timeout value (Optional), default=30s");

    Option sleepIntervalOption =
        OptionBuilder.withLongOpt(period).withDescription("Polling period for verification")
            .create();
    sleepIntervalOption.setArgs(1);
    sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");

    Option resourcesOption =
        OptionBuilder.withLongOpt(resources).withDescription("Specific set of resources to verify")
            .create();
    resourcesOption.setArgs(1);
    resourcesOption.setArgName("Comma-separated resource names, default is all resources");

    Options options = new Options();
    options.addOption(helpOption);
    options.addOption(zkServerOption);
    options.addOption(clusterOption);
    options.addOption(timeoutOption);
    options.addOption(sleepIntervalOption);
    options.addOption(resourcesOption);
    return options;
  }

  /** Prints command-line usage for this tool. */
  public static void printUsage(Options cliOptions) {
    HelpFormatter helpFormatter = new HelpFormatter();
    helpFormatter.setWidth(1000);
    helpFormatter.printHelp("java " + ClusterStateVerifier.class.getName(), cliOptions);
  }

  /**
   * Parses command-line arguments. On a parse error, this prints usage and exits the JVM with
   * status 1.
   */
  public static CommandLine processCommandLineArgs(String[] cliArgs) {
    CommandLineParser cliParser = new GnuParser();
    Options cliOptions = constructCommandLineOptions();
    try {
      return cliParser.parse(cliOptions, cliArgs);
    } catch (ParseException pe) {
      System.err.println("CommandLineClient: failed to parse command-line options: "
          + pe.toString());
      printUsage(cliOptions);
      System.exit(1);
    }
    return null;
  }

  /**
   * Parses {@code value} as a long, returning {@code defaultValue} when it is absent or invalid.
   */
  private static long parseLongOption(String value, long defaultValue) {
    if (value == null) {
      return defaultValue;
    }
    try {
      return Long.parseLong(value.trim());
    } catch (NumberFormatException e) {
      System.err.println("Exception in converting " + value + " to long. Use default ("
          + defaultValue + ")");
      return defaultValue;
    }
  }

  /**
   * Splits a comma- and/or whitespace-separated list of resource names, ignoring empty tokens.
   *
   * @return the resource names, or null (meaning "all resources") if none were given
   */
  private static Set<String> parseResources(String resourceStr) {
    if (resourceStr == null) {
      return null;
    }
    Set<String> resourceSet = Sets.newHashSet();
    for (String name : resourceStr.split("[\\s,]+")) {
      if (!name.isEmpty()) {
        resourceSet.add(name);
      }
    }
    return resourceSet.isEmpty() ? null : resourceSet;
  }

  /**
   * Runs a {@link BestPossAndExtViewZkVerifier} configured from command-line arguments, driven by
   * ZooKeeper callbacks.
   *
   * @param args command-line arguments; when empty, built-in defaults for the cluster and
   *     ZooKeeper address are used
   * @return true if the cluster verified within the timeout
   */
  public static boolean verifyState(String[] args) {
    String clusterName = "storage-cluster";
    String zkServer = "localhost:2181";
    long timeoutValue = DEFAULT_TIMEOUT_MS;
    long periodValue = DEFAULT_PERIOD_MS;
    Set<String> resourceSet = null;

    if (args.length > 0) {
      CommandLine cmd = processCommandLineArgs(args);
      zkServer = cmd.getOptionValue(zkServerAddress);
      clusterName = cmd.getOptionValue(cluster);
      timeoutValue = parseLongOption(cmd.getOptionValue(timeout), timeoutValue);
      // The period is still parsed (and validated) for CLI compatibility. It is only used by
      // the polling strategy; this entry point verifies via ZooKeeper callbacks.
      periodValue = parseLongOption(cmd.getOptionValue(period), periodValue);
      resourceSet = parseResources(cmd.getOptionValue(resources));
    }

    // A null resource set means "verify all resources".
    ZkVerifier verifier =
        new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet);
    return verifyByZkCallback(verifier, timeoutValue);
  }

  /** Command-line entry point: exits with status 0 if verification succeeds, 1 otherwise. */
  public static void main(String[] args) {
    boolean result = verifyState(args);
    System.out.println(result ? "Successful" : "failed");
    System.exit(result ? 0 : 1);
  }
}