blob: ca47c16b9e9c335647725d134bb406269cc4fd18 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Verifier that checks whether the ExternalViews of the given resources (or of all resources in
 * the cluster) exactly match their ideal mapping (as derived from the IdealState).
 * To use this verifier on resources in Full-Auto mode, persistence of the BestPossible (or
 * intermediate) assignment must be enabled in the cluster config.
 */
public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
/** Logger shared by every instance of this verifier. */
private static final Logger LOG = LoggerFactory.getLogger(StrictMatchExternalViewVerifier.class);

/**
 * Names of the resources to verify. When this is null or empty, verification covers every
 * resource found in the cluster instead of a filtered subset.
 */
private final Set<String> _resources;

/**
 * When non-empty, the exact set of live instance names the cluster must report for
 * verification to pass; a null or empty set skips the live-instance comparison.
 */
private final Set<String> _expectLiveInstances;

/**
 * Chooses which instance sets are handed to HelixUtil.computeIdealMapping when the ideal
 * partition mapping is computed (all live instances plus the per-partition disabled set when
 * true, only the enabled live instances when false).
 */
private final boolean _isDeactivatedNodeAware;
/**
 * Creates a verifier that is configured with a ZooKeeper address.
 * All work is delegated to the private six-argument constructor, with deactivated-node
 * awareness switched off and a wait period of 0.
 *
 * @param zkAddr ZooKeeper address, forwarded unchanged
 * @param clusterName name of the cluster to verify
 * @param resources resources to verify, forwarded unchanged
 * @param expectLiveInstances expected live instance names, forwarded unchanged
 */
public StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
    Set<String> expectLiveInstances) {
  this(zkAddr, clusterName, resources, expectLiveInstances, false, 0);
}
/**
 * Creates a verifier on top of a ZkClient supplied by the caller.
 * Both sets are copied, so later changes to the caller's sets are not seen by this verifier;
 * a null set is replaced by an empty one. Deactivated-node awareness is switched off.
 *
 * @param zkClient client owned by the caller
 * @param clusterName name of the cluster to verify
 * @param resources resources to verify; may be null
 * @param expectLiveInstances expected live instance names; may be null
 */
public StrictMatchExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
    Set<String> resources, Set<String> expectLiveInstances) {
  // usesExternalZkClient = true because the ZkClient is handed in by the caller;
  // close() must leave it open since it might still be in use elsewhere.
  super(zkClient, clusterName, true, 0);

  if (resources != null) {
    _resources = new HashSet<>(resources);
  } else {
    _resources = new HashSet<>();
  }

  if (expectLiveInstances != null) {
    _expectLiveInstances = new HashSet<>(expectLiveInstances);
  } else {
    _expectLiveInstances = new HashSet<>();
  }

  _isDeactivatedNodeAware = false;
}
/**
 * Address-based constructor used by the public four-argument constructor and by the builder's
 * backward-compatibility path.
 * The two sets are snapshotted (null becomes an empty set), matching what the
 * RealmAwareZkClient-based constructors of this class do, so the verifier never shares mutable
 * state with its caller.
 *
 * @param zkAddr ZooKeeper address handed to the superclass
 * @param clusterName name of the cluster to verify
 * @param resources resources to verify; null or empty means no resource filter
 * @param expectLiveInstances expected live instance names; null or empty disables that check
 * @param isDeactivatedNodeAware whether ideal-mapping computation accounts for disabled instances
 * @param waitTillVerify wait period handed to the superclass
 */
private StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
    Set<String> expectLiveInstances, boolean isDeactivatedNodeAware, int waitTillVerify) {
  super(zkAddr, clusterName, waitTillVerify);
  // Defensive copies: previously the caller's sets were stored by reference (and could be null).
  _resources = resources == null ? new HashSet<>() : new HashSet<>(resources);
  _expectLiveInstances =
      expectLiveInstances == null ? new HashSet<>() : new HashSet<>(expectLiveInstances);
  _isDeactivatedNodeAware = isDeactivatedNodeAware;
}
/**
 * Client-based constructor used by the builder.
 * Both sets are copied; a null set is replaced by an empty one.
 *
 * @param zkClient client used for all ZooKeeper access
 * @param clusterName name of the cluster to verify
 * @param resources resources to verify; may be null
 * @param expectLiveInstances expected live instance names; may be null
 * @param isDeactivatedNodeAware whether ideal-mapping computation accounts for disabled instances
 * @param waitPeriodTillVerify wait period handed to the superclass
 * @param usesExternalZkClient true when the caller owns {@code zkClient}; false when this
 *     verifier owns it, so that close() closes the client and no threads are leaked
 */
private StrictMatchExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
    Set<String> resources, Set<String> expectLiveInstances, boolean isDeactivatedNodeAware,
    int waitPeriodTillVerify, boolean usesExternalZkClient) {
  // Forward the configured wait period; it used to be replaced by a hard-coded 0, which
  // silently discarded the value set on the builder.
  super(zkClient, clusterName, usesExternalZkClient, waitPeriodTillVerify);
  _resources = resources == null ? new HashSet<>() : new HashSet<>(resources);
  _expectLiveInstances =
      expectLiveInstances == null ? new HashSet<>() : new HashSet<>(expectLiveInstances);
  _isDeactivatedNodeAware = isDeactivatedNodeAware;
}
public static class Builder extends ZkHelixClusterVerifier.Builder<Builder> {
private final String _clusterName; // This is the ZK path sharding key
private Set<String> _resources;
private Set<String> _expectLiveInstances;
private RealmAwareZkClient _zkClient;
// For backward compatibility, set the default isDeactivatedNodeAware to be false.
private boolean _isDeactivatedNodeAware = false;
private boolean _usesExternalZkClient = false; // false by default
public StrictMatchExternalViewVerifier build() {
if (_clusterName == null) {
throw new IllegalArgumentException("Cluster name is missing!");
if (_zkClient != null) {
return new StrictMatchExternalViewVerifier(_zkClient, _clusterName, _resources,
_expectLiveInstances, _isDeactivatedNodeAware, _waitPeriodTillVerify,
if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
// For backward-compatibility
return new StrictMatchExternalViewVerifier(_zkAddress, _clusterName, _resources,
_expectLiveInstances, _isDeactivatedNodeAware, _waitPeriodTillVerify);
return new StrictMatchExternalViewVerifier(
createZkClient(RealmAwareZkClient.RealmMode.SINGLE_REALM, _realmAwareZkConnectionConfig,
_realmAwareZkClientConfig, _zkAddress), _clusterName, _resources,
_expectLiveInstances, _isDeactivatedNodeAware, _waitPeriodTillVerify,
public Builder(String clusterName) {
_clusterName = clusterName;
public String getClusterName() {
return _clusterName;
public Set<String> getResources() {
return _resources;
public Builder setResources(Set<String> resources) {
_resources = resources;
return this;
public Set<String> getExpectLiveInstances() {
return _expectLiveInstances;
public Builder setExpectLiveInstances(Set<String> expectLiveInstances) {
_expectLiveInstances = expectLiveInstances;
return this;
public String getZkAddress() {
return _zkAddress;
public Builder setZkClient(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
_usesExternalZkClient = true; // Set the flag since external ZkClient is used
return this;
public boolean getDeactivatedNodeAwareness() {
return _isDeactivatedNodeAware;
public Builder setDeactivatedNodeAwareness(boolean isDeactivatedNodeAware) {
_isDeactivatedNodeAware = isDeactivatedNodeAware;
return this;
protected void validate() {
if (!_clusterName.equals(_realmAwareZkConnectionConfig.getZkRealmShardingKey())) {
throw new IllegalArgumentException(
"StrictMatchExternalViewVerifier: Cluster name: " + _clusterName
+ " and ZK realm sharding key: " + _realmAwareZkConnectionConfig
.getZkRealmShardingKey() + " do not match!");
public boolean verify(long timeout) {
return verifyByZkCallback(timeout);
public boolean verifyByZkCallback(long timeout) {
List<ClusterVerifyTrigger> triggers = new ArrayList<ClusterVerifyTrigger>();
// setup triggers
if (_resources != null && !_resources.isEmpty()) {
for (String resource : _resources) {
.add(new ClusterVerifyTrigger(_keyBuilder.idealStates(resource), true, false, false));
.add(new ClusterVerifyTrigger(_keyBuilder.externalView(resource), true, false, false));
} else {
triggers.add(new ClusterVerifyTrigger(_keyBuilder.idealStates(), false, true, true));
triggers.add(new ClusterVerifyTrigger(_keyBuilder.externalViews(), false, true, true));
return verifyByCallback(timeout, triggers);
protected boolean verifyState() {
try {
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
// read cluster once and do verification
ResourceControllerDataProvider cache = new ResourceControllerDataProvider();
Map<String, IdealState> idealStates = new HashMap<>(cache.getIdealStates());
// filter out all resources that use Task state model
Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, IdealState> pair =;
if (pair.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
// verify live instances.
if (_expectLiveInstances != null && !_expectLiveInstances.isEmpty()) {
Set<String> actualLiveNodes = cache.getLiveInstances().keySet();
if (!_expectLiveInstances.equals(actualLiveNodes)) {
return false;
Map<String, ExternalView> extViews =
_accessor.getChildValuesMap(keyBuilder.externalViews(), true);
if (extViews == null) {
extViews = Collections.emptyMap();
// Filter resources if requested
if (_resources != null && !_resources.isEmpty()) {
// if externalView is not empty and idealState doesn't exist
// add empty idealState for the resource
for (String resource : extViews.keySet()) {
if (!idealStates.containsKey(resource)) {
idealStates.put(resource, new IdealState(resource));
for (String resourceName : idealStates.keySet()) {
ExternalView extView = extViews.get(resourceName);
IdealState idealState = idealStates.get(resourceName);
if (extView == null) {
if (idealState.isExternalViewDisabled()) {
} else {
LOG.debug("externalView for " + resourceName + " is not available");
return false;
boolean result = verifyExternalView(cache, extView, idealState);
if (!result) {
return false;
return true;
} catch (Exception e) {
LOG.error("exception in verification", e);
return false;
private boolean verifyExternalView(ResourceControllerDataProvider dataCache, ExternalView externalView,
IdealState idealState) {
Map<String, Map<String, String>> mappingInExtview = externalView.getRecord().getMapFields();
Map<String, Map<String, String>> idealPartitionState;
switch (idealState.getRebalanceMode()) {
ClusterConfig clusterConfig = new ConfigAccessor(_zkClient).getClusterConfig(dataCache.getClusterName());
if (!clusterConfig.isPersistBestPossibleAssignment() && !clusterConfig.isPersistIntermediateAssignment()) {
throw new HelixException(String.format("Full-Auto IdealState verifier requires "
+ "is enabled."));
for (String partition : idealState.getPartitionSet()) {
if (idealState.getPreferenceList(partition) == null || idealState.getPreferenceList(partition).isEmpty()) {
return false;
idealPartitionState = computeIdealPartitionState(dataCache, idealState);
idealPartitionState = computeIdealPartitionState(dataCache, idealState);
idealPartitionState = idealState.getRecord().getMapFields();
case TASK:
// ignore jobs
return true;
return mappingInExtview.equals(idealPartitionState);
private Map<String, Map<String, String>> computeIdealPartitionState(
ResourceControllerDataProvider cache, IdealState idealState) {
String stateModelDefName = idealState.getStateModelDefRef();
StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
Map<String, Map<String, String>> idealPartitionState = new HashMap<>();
for (String partition : idealState.getPartitionSet()) {
List<String> preferenceList = AbstractRebalancer
.getPreferenceList(new Partition(partition), idealState, cache.getEnabledLiveInstances());
Map<String, String> idealMapping;
if (_isDeactivatedNodeAware) {
idealMapping = HelixUtil
.computeIdealMapping(preferenceList, stateModelDef, cache.getLiveInstances().keySet(),
cache.getDisabledInstancesForPartition(idealState.getResourceName(), partition));
} else {
idealMapping = HelixUtil
.computeIdealMapping(preferenceList, stateModelDef, cache.getEnabledLiveInstances(),
idealPartitionState.put(partition, idealMapping);
return idealPartitionState;
public String toString() {
String verifierName = getClass().getSimpleName();
return String
.format("%s(%s@%s@resources[%s])", verifierName, _clusterName, _zkClient.getServers(),
_resources != null ? Arrays.toString(_resources.toArray()) : "");
public void finalize() {