blob: 990d60f3aafbd571d30c3c30c1b00071bc582c97 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.bookkeeper.client;
import static;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
import static org.apache.bookkeeper.client.BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_FAULTDOMAIN;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.Configurable;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Simple zone-aware ensemble placement policy.
 */
public class ZoneawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy {
// Logger shared by this class.
static final Logger LOG = LoggerFactory.getLogger(ZoneawareEnsemblePlacementPolicyImpl.class);
// Literal zone name "UnknownZone".
// NOTE(review): not referenced in this part of the file — presumably used by callers or tests; confirm.
public static final String UNKNOWN_ZONE = "UnknownZone";
/**
 * This defaultFaultDomain is used as a placeholder network location for
 * bookies whose network location can't be resolved. In
 * ZoneawareEnsemblePlacementPolicyImpl, the zone is the fault domain, and the
 * upgrade domain is a logical concept that enables parallel patching by
 * bringing down all the bookies in the upgrade domain.
 */
private String defaultFaultDomain = NetworkTopology.DEFAULT_ZONE_AND_UPGRADEDOMAIN;
// Placeholder location assigned to bookies whose network location equals the default
// fault domain or does not split into exactly two path components; replaced by
// withDefaultFaultDomain().
// NOTE(review): the constructor arguments of this initializer are missing from this copy
// of the file — TODO restore.
protected ZoneAwareNodeLocation unresolvedNodeLocation = new ZoneAwareNodeLocation(
// Random source for uniform candidate selection; seeded with the current time in the constructor.
private final Random rand;
// Stats logger passed to initialize().
protected StatsLogger statsLogger = null;
// Use a loading cache so slow bookies are expired. Use entryId as values.
// Built in initialize() with expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec()).
// NOTE(review): registerSlowBookie() is a stub in this class and does not populate it.
protected Cache<BookieId, Long> slowBookies;
// Dummy node for the local host and its zone, both set in initialize().
protected BookieNode myNode = null;
protected String myZone = null;
// NOTE(review): not referenced in this part of the file.
protected boolean reorderReadsRandom = false;
// From conf.getNetworkTopologyStabilizePeriodSeconds(); a positive value selects
// StabilizeNetworkTopology in initialize().
protected int stabilizePeriodSeconds = 0;
// From conf.getReorderThresholdPendingRequests().
protected int reorderThresholdPendingRequests = 0;
// Only set when disk-weight-based placement is enabled; passed to the weighted selector.
protected int maxWeightMultiple;
// Zone constraints per write quorum, read from the configuration in initialize();
// initialize() rejects min > desired.
protected int minNumZonesPerWriteQuorum;
protected int desiredNumZonesPerWriteQuorum;
// When false, newEnsemble()/replaceBookie() pick bookies randomly instead of zone-aware.
protected boolean enforceStrictZoneawarePlacement;
// Timer passed to initialize(); also handed to StabilizeNetworkTopology.
protected HashedWheelTimer timer;
// Cache of resolved zone/upgrade-domain locations, keyed by bookie id.
protected final ConcurrentMap<BookieId, ZoneAwareNodeLocation> address2NodePlacement;
@StatsDoc(name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER, help = "Counter for number of times"
+ " DNSResolverDecorator failed to resolve Network Location")
protected Counter failedToResolveNetworkLocationCounter = null;
@StatsDoc(name = NUM_WRITABLE_BOOKIES_IN_DEFAULT_FAULTDOMAIN, help = "Gauge for the number of writable"
+ " Bookies in default fault domain")
protected Gauge<Integer> numWritableBookiesInDefaultFaultDomain;
/**
 * Zone and upgrade-domain pair of a node.
 */
public static class ZoneAwareNodeLocation {
private final String zone;
private final String upgradeDomain;
private final String repString;
public ZoneAwareNodeLocation(String zone, String upgradeDomain) { = zone;
this.upgradeDomain = upgradeDomain;
repString = zone + upgradeDomain;
public String getZone() {
return zone;
public String getUpgradeDomain() {
return upgradeDomain;
public int hashCode() {
return repString.hashCode();
public boolean equals(Object obj) {
return ((obj instanceof ZoneAwareNodeLocation)
&& repString.equals(((ZoneAwareNodeLocation) obj).repString));
ZoneawareEnsemblePlacementPolicyImpl() {
    // Selection RNG, seeded with the current wall-clock time.
    this.rand = new Random(System.currentTimeMillis());
    // Location cache; starts empty and is filled lazily by getZoneAwareNodeLocation(BookieId).
    this.address2NodePlacement = new ConcurrentHashMap<>();
}
protected ZoneAwareNodeLocation getZoneAwareNodeLocation(BookieId addr) {
ZoneAwareNodeLocation nodeLocation = address2NodePlacement.get(addr);
if (null == nodeLocation) {
String networkLocation = resolveNetworkLocation(addr);
if (getDefaultFaultDomain().equals(networkLocation)) {
nodeLocation = unresolvedNodeLocation;
} else {
String[] parts = StringUtils.split(NodeBase.normalize(networkLocation), NodeBase.PATH_SEPARATOR);
if (parts.length != 2) {
nodeLocation = unresolvedNodeLocation;
} else {
nodeLocation = new ZoneAwareNodeLocation(NodeBase.PATH_SEPARATOR_STR + parts[0],
NodeBase.PATH_SEPARATOR_STR + parts[1]);
address2NodePlacement.putIfAbsent(addr, nodeLocation);
return nodeLocation;
protected ZoneAwareNodeLocation getZoneAwareNodeLocation(BookieNode node) {
if (null == node || null == node.getAddr()) {
return unresolvedNodeLocation;
return getZoneAwareNodeLocation(node.getAddr());
// Configures this policy from the client configuration: stats, weighted selection, zone
// constraints, DNS resolver, network topology, the local node and the slow-bookie cache.
// Returns this instance.
// Throws IllegalArgumentException when minNumZonesPerWriteQuorum > desiredNumZonesPerWriteQuorum,
// and a RuntimeException wrapping the IOException when the local host address can't be obtained.
public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
this.statsLogger = statsLogger;
this.bookieAddressResolver = bookieAddressResolver;
this.timer = timer;
this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BOOKIES_JOINED);
this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
this.failedToResolveNetworkLocationCounter = statsLogger.getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
// Gauge sampling the number of available nodes in the default fault domain (default value 0).
this.numWritableBookiesInDefaultFaultDomain = new Gauge<Integer>() {
public Integer getDefaultValue() {
return 0;
public Integer getSample() {
// NOTE(review): the body of the finally clause below (presumably a lock release), the
// closing of the anonymous Gauge and whatever follows it appear to be missing from this
// copy of the file — TODO restore. The assignments after "finally" belong to initialize().
try {
return topology.countNumOfAvailableNodes(getDefaultFaultDomain(), Collections.emptySet());
} finally {
this.reorderThresholdPendingRequests = conf.getReorderThresholdPendingRequests();
// Optional disk-weight-based selection, configured with maxWeightMultiple.
// NOTE(review): the two garbled lines below have lost the opening of their logging
// calls — TODO restore.
this.isWeighted = conf.getDiskWeightBasedPlacementEnabled();
if (this.isWeighted) {
this.maxWeightMultiple = conf.getBookieMaxWeightMultipleForWeightBasedPlacement();
this.weightedSelection = new DynamicWeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);"Weight based placement with max multiple of {}", this.maxWeightMultiple);
} else {"Not weighted");
this.minNumZonesPerWriteQuorum = conf.getMinNumZonesPerWriteQuorum();
this.desiredNumZonesPerWriteQuorum = conf.getDesiredNumZonesPerWriteQuorum();
this.enforceStrictZoneawarePlacement = conf.getEnforceStrictZoneawarePlacement();
// Reject configurations whose minimum number of zones exceeds the desired number.
if (minNumZonesPerWriteQuorum > desiredNumZonesPerWriteQuorum) {
// NOTE(review): the opening of the error-logging call for this message is missing — TODO restore.
"It is misconfigured, for ZoneawareEnsemblePlacementPolicy, minNumZonesPerWriteQuorum: {} cann't be"
+ " greater than desiredNumZonesPerWriteQuorum: {}",
minNumZonesPerWriteQuorum, desiredNumZonesPerWriteQuorum);
throw new IllegalArgumentException("minNumZonesPerWriteQuorum: " + minNumZonesPerWriteQuorum
+ " cann't be greater than desiredNumZonesPerWriteQuorum: " + desiredNumZonesPerWriteQuorum);
// Use the injected resolver when present; otherwise instantiate REPP_DNS_RESOLVER_CLASS
// (default ScriptBasedMapping) and hand it the configuration if it is Configurable.
DNSToSwitchMapping actualDNSResolver;
if (optionalDnsResolver.isPresent()) {
actualDNSResolver = optionalDnsResolver.get();
} else {
String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
actualDNSResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
if (actualDNSResolver instanceof Configurable) {
((Configurable) actualDNSResolver).setConf(conf);
// Wrap the resolver; the default fault domain supplier is presumably its fallback location.
// NOTE(review): the remaining constructor arguments of this call are missing — TODO restore.
this.dnsResolver = new DNSResolverDecorator(actualDNSResolver, () -> this.getDefaultFaultDomain(),
this.stabilizePeriodSeconds = conf.getNetworkTopologyStabilizePeriodSeconds();
// Create the network topology; a positive stabilize period selects StabilizeNetworkTopology.
if (stabilizePeriodSeconds > 0) {
this.topology = new StabilizeNetworkTopology(timer, stabilizePeriodSeconds);
} else {
this.topology = new NetworkTopologyImpl();
// Resolve the local host's node and zone; failing to get the local address is fatal.
try {
myNode = createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress());
myZone = getZoneAwareNodeLocation(myNode).getZone();
} catch (IOException e) {
LOG.error("Failed to get local host address : ", e);
throw new RuntimeException(e);
}"Initialized zoneaware ensemble placement policy @ {} @ {} : {}.", myNode,
myNode.getNetworkLocation(), dnsResolver.getClass().getName());
// Entries expire after the configured failure-history period; absent keys load as -1.
slowBookies = CacheBuilder.newBuilder()
.expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
.build(new CacheLoader<BookieId, Long>() {
public Long load(BookieId key) throws Exception {
return -1L;
return this;
public ZoneawareEnsemblePlacementPolicyImpl withDefaultFaultDomain(String defaultFaultDomain) {
checkNotNull(defaultFaultDomain, "Default fault domain cannot be null");
String[] parts = StringUtils.split(NodeBase.normalize(defaultFaultDomain), NodeBase.PATH_SEPARATOR);
if (parts.length != 2) {
LOG.error("provided defaultFaultDomain: {} is not valid", defaultFaultDomain);
throw new IllegalArgumentException("invalid defaultFaultDomain");
} else {
unresolvedNodeLocation = new ZoneAwareNodeLocation(NodeBase.PATH_SEPARATOR_STR + parts[0],
NodeBase.PATH_SEPARATOR_STR + parts[1]);
this.defaultFaultDomain = defaultFaultDomain;
return this;
public String getDefaultFaultDomain() {
return defaultFaultDomain;
public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize,
int ackQuorumSize, Set<BookieId> excludeBookies,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble<BookieNode> parentEnsemble,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate<BookieNode> parentPredicate)
throws BKNotEnoughBookiesException {
throw new UnsupportedOperationException(
"newEnsemble method with parentEnsemble and parentPredicate is not supported for "
+ "ZoneawareEnsemblePlacementPolicyImpl");
public BookieNode selectFromNetworkLocation(String networkLoc, Set<Node> excludeBookies,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate<BookieNode> predicate,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble<BookieNode> ensemble,
boolean fallbackToRandom) throws BKNotEnoughBookiesException {
throw new UnsupportedOperationException(
"selectFromNetworkLocation is not supported for ZoneawareEnsemblePlacementPolicyImpl");
public BookieNode selectFromNetworkLocation(Set<String> excludeRacks, Set<Node> excludeBookies,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate<BookieNode> predicate,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble<BookieNode> ensemble,
boolean fallbackToRandom) throws BKNotEnoughBookiesException {
throw new UnsupportedOperationException(
"selectFromNetworkLocation is not supported for ZoneawareEnsemblePlacementPolicyImpl");
public BookieNode selectFromNetworkLocation(String networkLoc, Set<String> excludeRacks, Set<Node> excludeBookies,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate<BookieNode> predicate,
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble<BookieNode> ensemble,
boolean fallbackToRandom) throws BKNotEnoughBookiesException {
throw new UnsupportedOperationException(
"selectFromNetworkLocation is not supported for ZoneawareEnsemblePlacementPolicyImpl");
// Tear-down hook of this policy (the "uninitalize" spelling presumably matches the
// overridden interface method).
// NOTE(review): the method body and closing brace are not present in this copy of the
// file — presumably an empty body; confirm and restore.
public void uninitalize() {
// Creates a new ensemble of ensembleSize bookies, excluding excludeBookies.
// When enforceStrictZoneawarePlacement is set, this method:
//  - throws IllegalArgumentException if ensembleSize is not a multiple of writeQuorumSize;
//  - throws IllegalArgumentException if writeQuorumSize <= minNumZonesPerWriteQuorum;
//  - fills each position via setBookieInTheEnsemble, also excluding the bookies of the
//    default fault domain.
// Otherwise createNewEnsembleRandomly picks the bookies.
// The returned PlacementResult carries the ensemble's placement-policy adherence.
// BKNotEnoughBookiesException is propagated when not enough bookies are available.
public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
if (enforceStrictZoneawarePlacement) {
if (ensembleSize % writeQuorumSize != 0) {
// NOTE(review): the "*"-prefixed lines below are the text of a block comment whose
// delimiters are missing from this copy — TODO restore them.
* if ensembleSize is not multiple of writeQuorumSize, then the
* write quorums which are wrapped will have bookies from just
* minNumberOfZones though bookies are available from
* desiredNumZones.
* lets say for example - desiredZones = 3, minZones = 2,
* ensembleSize = 5, writeQuorumSize = 3, ackQuorumSize = 2
* z1, z2, z3, z1, z2 is a legal ensemble. (lets assume here z1
* represents a node belonging to zone z1)
* the writeQuorum for entry 3 will be z1, z2 and z1, since
* ackQuorumSize is 2, an entry could be written just to two
* bookies that belong to z1. If the zone z1 goes down then the
* entry could potentially be unavailable until the zone z1 has
* come back.
* Also, it is not ideal to allow combination which fallsback to
* minZones, when bookies are available from desiredNumZones.
* So prohibiting this combination of configuration.
LOG.error("It is illegal for ensembleSize to be not multiple of"
+ " writeQuorumSize When StrictZoneawarePlacement is enabled");
throw new IllegalArgumentException("It is illegal for ensembleSize to be not multiple of"
+ " writeQuorumSize When StrictZoneawarePlacement is enabled");
if (writeQuorumSize <= minNumZonesPerWriteQuorum) {
// NOTE(review): block-comment text without delimiters, as above — TODO restore.
* if we allow writeQuorumSize <= minNumZonesPerWriteQuorum,
* then replaceBookie may fail to find a candidate to replace a
* node when a zone goes down.
* lets say for example - desiredZones = 3, minZones = 2,
* ensembleSize = 6, writeQuorumSize = 2, ackQuorumSize = 2
* z1, z2, z3, z1, z2, z3 is a legal ensemble. (lets assume here
* z1 represents a node belonging to zone z1)
* Now if Zone z2 goes down, you need to replace Index 1 and 4.
* To replace index 1, you need to find a zone that is not z1
* and Z3 which is not possible.
* So prohibiting this combination of configuration.
LOG.error("It is illegal for writeQuorumSize to be lesser than or equal"
+ " to minNumZonesPerWriteQuorum When StrictZoneawarePlacement is enabled");
throw new IllegalArgumentException("It is illegal for writeQuorumSize to be lesser than or equal"
+ " to minNumZonesPerWriteQuorum When StrictZoneawarePlacement is enabled");
// A write quorum can't span more zones than it has bookies.
int desiredNumZonesPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, desiredNumZonesPerWriteQuorum);
// Start with ensembleSize unfilled (null) positions.
List<BookieId> newEnsemble = new ArrayList<BookieId>(
Collections.nCopies(ensembleSize, null));
// NOTE(review): the lock acquisition preceding this try block and the body of the finally
// clause appear to be missing (addDefaultFaultDomainBookies expects the 'rwLock' read lock
// to be held) — TODO restore.
try {
if (!enforceStrictZoneawarePlacement) {
// NOTE(review): the final argument of this call is missing (presumably excludeBookies) — TODO restore.
return createNewEnsembleRandomly(newEnsemble, writeQuorumSize, ackQuorumSize, customMetadata,
Set<BookieId> comprehensiveExclusionBookiesSet = addDefaultFaultDomainBookies(excludeBookies);
for (int index = 0; index < ensembleSize; index++) {
// NOTE(review): this call is truncated — its last argument and any follow-up use of
// selectedBookie (e.g. excluding it for later positions) are missing — TODO restore.
BookieId selectedBookie = setBookieInTheEnsemble(ensembleSize, writeQuorumSize, newEnsemble,
newEnsemble, index, desiredNumZonesPerWriteQuorumForThisEnsemble,
return PlacementResult.of(newEnsemble,
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize));
} finally {
// Chooses a replacement for bookieToReplace in currentEnsemble, excluding excludeBookies.
// With enforceStrictZoneawarePlacement, the candidate comes from setBookieInTheEnsemble
// (default-fault-domain bookies also excluded). Otherwise selectBookieRandomly picks it.
// Returns the candidate together with the adherence of the resulting ensemble.
// BKNotEnoughBookiesException is propagated when no candidate is found.
public PlacementResult<BookieId> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble,
BookieId bookieToReplace, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
// NOTE(review): indexOf returns -1 when bookieToReplace is not in currentEnsemble; this is
// not validated here.
int bookieToReplaceIndex = currentEnsemble.indexOf(bookieToReplace);
// A write quorum can't span more zones than it has bookies.
int desiredNumZonesPerWriteQuorumForThisEnsemble = (writeQuorumSize < desiredNumZonesPerWriteQuorum)
? writeQuorumSize : desiredNumZonesPerWriteQuorum;
// Work on a copy so currentEnsemble itself is left unmodified.
List<BookieId> newEnsemble = new ArrayList<BookieId>(currentEnsemble);
// NOTE(review): the lock acquisition preceding this try block and the body of the finally
// clause appear to be missing — TODO restore.
try {
if (!enforceStrictZoneawarePlacement) {
// NOTE(review): the final argument of this call is missing (presumably ackQuorumSize) — TODO restore.
return selectBookieRandomly(newEnsemble, bookieToReplace, excludeBookies, writeQuorumSize,
Set<BookieId> comprehensiveExclusionBookiesSet = addDefaultFaultDomainBookies(excludeBookies);
// NOTE(review): the last argument of this call is missing (presumably
// comprehensiveExclusionBookiesSet) — TODO restore.
BookieId candidateAddr = setBookieInTheEnsemble(ensembleSize, writeQuorumSize, currentEnsemble,
newEnsemble, bookieToReplaceIndex, desiredNumZonesPerWriteQuorumForThisEnsemble,
return PlacementResult.of(candidateAddr,
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize));
} finally {
private PlacementResult<List<BookieId>> createNewEnsembleRandomly(List<BookieId> newEnsemble,
int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieId> excludeBookies) throws BKNotEnoughBookiesException {
int ensembleSize = newEnsemble.size();
Set<BookieNode> bookiesToConsider = getBookiesToConsider(excludeBookies);
if (bookiesToConsider.size() < newEnsemble.size()) {
LOG.error("Not enough bookies are available to form ensemble of size: {}", newEnsemble.size());
throw new BKNotEnoughBookiesException();
for (int i = 0; i < ensembleSize; i++) {
BookieNode candidateNode = selectCandidateNode(bookiesToConsider);
newEnsemble.set(i, candidateNode.getAddr());
return PlacementResult.of(newEnsemble,
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize));
private PlacementResult<BookieId> selectBookieRandomly(List<BookieId> newEnsemble,
BookieId bookieToReplace, Set<BookieId> excludeBookies, int writeQuorumSize,
int ackQuorumSize) throws BKNotEnoughBookiesException {
Set<BookieId> bookiesToExcludeIncludingEnsemble = new HashSet<BookieId>(excludeBookies);
Set<BookieNode> bookiesToConsider = getBookiesToConsider(bookiesToExcludeIncludingEnsemble);
int bookieToReplaceIndex = newEnsemble.indexOf(bookieToReplace);
if (bookiesToConsider.isEmpty()) {
LOG.error("There is no bookie available to replace a bookie");
throw new BKNotEnoughBookiesException();
BookieId candidateAddr = (selectCandidateNode(bookiesToConsider)).getAddr();
newEnsemble.set(bookieToReplaceIndex, candidateAddr);
return PlacementResult.of(candidateAddr,
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize));
private Set<BookieNode> getBookiesToConsider(Set<BookieId> excludeBookies) {
Set<Node> leaves = topology.getLeaves(NodeBase.ROOT);
Set<BookieNode> bookiesToConsider = new HashSet<BookieNode>();
BookieNode bookieNode;
for (Node leaf : leaves) {
if (leaf instanceof BookieNode) {
bookieNode = ((BookieNode) leaf);
if (excludeBookies.contains(bookieNode.getAddr())) {
return bookiesToConsider;
/**
 * Finds the appropriate bookie for newEnsemble by selecting a replacement for the
 * bookie at bookieToReplaceIndex in currentEnsemble.
 *
 * <p>It goes through the following filtering process:
 * <ol>
 * <li>Exclude the zones of desiredNumZonesPerWriteQuorumForThisEnsemble
 * neighboring nodes.</li>
 * <li>Find bookies to consider by excluding those zones (found in the previous
 * step) and excluding the UDs of the zones to consider.</li>
 * <li>If no eligible bookie is found, keep reducing the number of neighboring
 * nodes down to minNumZonesPerWriteQuorum and repeat step 2.</li>
 * <li>If eligible bookies still can't be found, find the zones to exclude such
 * that every write set has bookies from at least minNumZonesPerWriteQuorum
 * zones, and repeat step 2.</li>
 * <li>From the list of eligible candidates, select a node randomly.</li>
 * <li>If step 4 couldn't find eligible candidates, throw
 * BKNotEnoughBookiesException.</li>
 * </ol>
 *
 * <p>Example: Ensemble: 6, Qw: 6, desiredNumZonesPerWriteQuorumForThisEnsemble: 3,
 * minNumZonesPerWriteQuorum: 2. The selection process is as follows:
 * <ol>
 * <li>Find bookies by excluding the zones of
 * (desiredNumZonesPerWriteQuorumForThisEnsemble - 1) neighboring bookies on the
 * left and the right side of bookieToReplaceIndex, i.e. the zones of 2 (3 - 1)
 * bookies on both sides of the index in question are excluded.</li>
 * <li>Get the set of zones of the bookies selected above.</li>
 * <li>For each zone selected above, get the upgrade domains to exclude, to make
 * sure bookies of the write sets containing bookieToReplaceIndex are from
 * different UDs if they belong to the same zone.</li>
 * <li>From the zones selected in step 2, apply the UD filter found in the
 * previous step and get the eligible bookies.</li>
 * <li>If no bookie matches this filter, then instead of aiming for unique UDs,
 * fall back to UDs to exclude such that, if bookies in the write sets containing
 * bookieToReplaceIndex are from the same zone, they are from at least 2
 * different UDs.</li>
 * <li>From the zones selected in step 2, apply the UD filter found in the
 * previous step and get the eligible bookies.</li>
 * <li>If no bookie matches this filter, repeat steps 1 to 6, decreasing the
 * number of neighboring zones to exclude from
 * (desiredNumZonesPerWriteQuorumForThisEnsemble - 1), which is 2, to
 * (minNumZonesPerWriteQuorum - 1), which is 1.</li>
 * <li>If even after this no bookies match the criteria, fall back to
 * minNumZonesPerWriteQuorum: find the zones to exclude such that the write sets
 * containing bookieToReplaceIndex have bookies from at least
 * minNumZonesPerWriteQuorum zones, which is 2.</li>
 * <li>Get the set of zones of the bookies, excluding the zones selected
 * above.</li>
 * <li>Repeat steps 3 to 6.</li>
 * <li>From the list of eligible candidates, select a node randomly.</li>
 * <li>If even after step 10 there are no eligible candidates, throw
 * BKNotEnoughBookiesException.</li>
 * </ol>
 */
private BookieId setBookieInTheEnsemble(int ensembleSize, int writeQuorumSize,
List<BookieId> currentEnsemble, List<BookieId> newEnsemble, int bookieToReplaceIndex,
int desiredNumZonesPerWriteQuorumForThisEnsemble, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
BookieId bookieToReplace = currentEnsemble.get(bookieToReplaceIndex);
Set<String> zonesToExclude = null;
Set<BookieNode> bookiesToConsiderAfterExcludingZonesAndUDs = null;
for (int numberOfNeighborsToConsider = (desiredNumZonesPerWriteQuorumForThisEnsemble
- 1); numberOfNeighborsToConsider >= (minNumZonesPerWriteQuorum - 1); numberOfNeighborsToConsider--) {
zonesToExclude = getZonesOfNeighboringNodesInEnsemble(currentEnsemble, bookieToReplaceIndex,
bookiesToConsiderAfterExcludingZonesAndUDs = getBookiesToConsiderAfterExcludingZonesAndUDs(ensembleSize,
writeQuorumSize, currentEnsemble, bookieToReplaceIndex, excludeBookies, zonesToExclude);
if (!bookiesToConsiderAfterExcludingZonesAndUDs.isEmpty()) {
if (bookiesToConsiderAfterExcludingZonesAndUDs.isEmpty()) {
zonesToExclude = getZonesToExcludeToMaintainMinZones(currentEnsemble, bookieToReplaceIndex,
bookiesToConsiderAfterExcludingZonesAndUDs = getBookiesToConsiderAfterExcludingZonesAndUDs(ensembleSize,
writeQuorumSize, currentEnsemble, bookieToReplaceIndex, excludeBookies, zonesToExclude);
if (bookiesToConsiderAfterExcludingZonesAndUDs.isEmpty()) {
LOG.error("Not enough bookies are available to replaceBookie : {} in ensemble : {} with excludeBookies {}.",
bookieToReplace, currentEnsemble, excludeBookies);
throw new BKNotEnoughBookiesException();
BookieId candidateAddr = selectCandidateNode(bookiesToConsiderAfterExcludingZonesAndUDs).getAddr();
newEnsemble.set(bookieToReplaceIndex, candidateAddr);
return candidateAddr;
/**
 * This method should be called within the read-lock scope of 'rwLock'. It
 * returns a new set containing the excludeBookies and the bookies in the
 * default fault domain.
 */
protected Set<BookieId> addDefaultFaultDomainBookies(Set<BookieId> excludeBookies) {
Set<BookieId> comprehensiveExclusionBookiesSet = new HashSet<BookieId>(excludeBookies);
Set<Node> defaultFaultDomainLeaves = topology.getLeaves(getDefaultFaultDomain());
for (Node node : defaultFaultDomainLeaves) {
if (node instanceof BookieNode) {
comprehensiveExclusionBookiesSet.add(((BookieNode) node).getAddr());
} else {
LOG.error("found non-BookieNode: {} as leaf of defaultFaultDomain: {}", node, getDefaultFaultDomain());
return comprehensiveExclusionBookiesSet;
/**
 * Selects a bookie randomly from the bookiesToConsiderAfterExcludingUDs set.
 * If diskWeightBasedPlacement is enabled, the node is selected randomly
 * based on node weight.
 */
private BookieNode selectCandidateNode(Set<BookieNode> bookiesToConsiderAfterExcludingUDs) {
BookieNode candidate = null;
if (!this.isWeighted) {
int randSelIndex = rand.nextInt(bookiesToConsiderAfterExcludingUDs.size());
int ind = 0;
for (BookieNode bookieNode : bookiesToConsiderAfterExcludingUDs) {
if (ind == randSelIndex) {
candidate = bookieNode;
} else {
candidate = weightedSelection.getNextRandom(bookiesToConsiderAfterExcludingUDs);
return candidate;
// Builds the scope string passed to topology.getLeaves() to leave out the given zones:
// "" when there is nothing to exclude, otherwise a string starting with "~" (presumably the
// topology's inversion marker — confirm).
private String getExcludedZonesString(Set<String> excludeZones) {
if (excludeZones.isEmpty()) {
return "";
StringBuilder excludedZonesString = new StringBuilder("~");
boolean firstZone = true;
for (String excludeZone : excludeZones) {
// NOTE(review): the statements appending a separator (between zones) and the zone itself
// appear to be missing from this copy; as written nothing is appended after "~". The
// separator is presumably the topology's node separator — TODO confirm and restore.
if (!firstZone) {
firstZone = false;
return excludedZonesString.toString();
private Set<BookieNode> getBookiesToConsider(String excludedZonesString, Set<BookieId> excludeBookies) {
Set<BookieNode> bookiesToConsider = new HashSet<BookieNode>();
Set<Node> leaves = topology.getLeaves(excludedZonesString);
for (Node leaf : leaves) {
BookieNode bookieNode = ((BookieNode) leaf);
if (excludeBookies.contains(bookieNode.getAddr())) {
return bookiesToConsider;
/**
 * For the position 'bookieToReplaceIndex' in currentEnsemble, gets the set of
 * eligible bookies after excluding 'excludeZones' and 'excludeBookies'. It first
 * tries to exclude the upgrade domains of neighboring nodes (the write set), so
 * that the bookie comes from a completely new upgrade domain of a zone if the
 * write set already contains a bookie from that zone. If no bookie matches this
 * criterion, it falls back to maintaining a minimum of two upgrade domains per
 * zone, so that if multiple bookies in a write quorum are from the same zone,
 * they are spread across two upgrade domains.
 */
private Set<BookieNode> getBookiesToConsiderAfterExcludingZonesAndUDs(int ensembleSize, int writeQuorumSize,
List<BookieId> currentEnsemble, int bookieToReplaceIndex,
Set<BookieId> excludeBookies, Set<String> excludeZones) {
Set<BookieNode> bookiesToConsiderAfterExcludingZonesAndUDs = new HashSet<BookieNode>();
HashMap<String, Set<String>> excludingUDsOfZonesToConsider = new HashMap<String, Set<String>>();
Set<BookieNode> bookiesToConsiderAfterExcludingZones = getBookiesToConsider(
getExcludedZonesString(excludeZones), excludeBookies);
if (!bookiesToConsiderAfterExcludingZones.isEmpty()) {
Set<String> zonesToConsider = getZonesOfBookies(bookiesToConsiderAfterExcludingZones);
for (String zoneToConsider : zonesToConsider) {
Set<String> upgradeDomainsOfAZoneInNeighboringNodes = getUpgradeDomainsOfAZoneInNeighboringNodes(
currentEnsemble, bookieToReplaceIndex, writeQuorumSize, zoneToConsider);
excludingUDsOfZonesToConsider.put(zoneToConsider, upgradeDomainsOfAZoneInNeighboringNodes);
bookiesToConsiderAfterExcludingZones, excludingUDsOfZonesToConsider);
* If no eligible bookie is found, then instead of aiming for unique
* UDs, fallback to UDs to exclude such that if bookies are from
* same zone in the writeSets containing bookieToReplaceIndex then
* they must be atleast from 2 different UDs
if (bookiesToConsiderAfterExcludingZonesAndUDs.isEmpty()) {
for (String zoneToConsider : zonesToConsider) {
Set<String> udsToExcludeToMaintainMinUDsInWriteQuorums =
getUDsToExcludeToMaintainMinUDsInWriteQuorums(currentEnsemble, bookieToReplaceIndex,
writeQuorumSize, zoneToConsider);
excludingUDsOfZonesToConsider.put(zoneToConsider, udsToExcludeToMaintainMinUDsInWriteQuorums);
bookiesToConsiderAfterExcludingZones, excludingUDsOfZonesToConsider);
return bookiesToConsiderAfterExcludingZonesAndUDs;
/**
 * Filters the 'bookiesToConsider' set, leaving out bookies that belong to the
 * excluded UDs of their zone (per excludingUDsOfZonesToConsider), and adds the
 * remaining bookies to the 'bookiesToConsiderAfterExcludingUDs' set.
 */
private void updateBookiesToConsiderAfterExcludingZonesAndUDs(Set<BookieNode> bookiesToConsiderAfterExcludingUDs,
Set<BookieNode> bookiesToConsider, HashMap<String, Set<String>> excludingUDsOfZonesToConsider) {
for (BookieNode bookieToConsider : bookiesToConsider) {
ZoneAwareNodeLocation nodeLocation = getZoneAwareNodeLocation(bookieToConsider);
if (excludingUDsOfZonesToConsider.get(nodeLocation.getZone()).contains(nodeLocation.getUpgradeDomain())) {
/**
 * Gets the set of zones of the neighboring nodes.
 */
private Set<String> getZonesOfNeighboringNodesInEnsemble(List<BookieId> currentEnsemble, int indexOfNode,
int numOfNeighboringNodes) {
Set<String> zonesOfNeighboringNodes = new HashSet<String>();
int ensembleSize = currentEnsemble.size();
for (int i = (-1 * numOfNeighboringNodes); i <= numOfNeighboringNodes; i++) {
if (i == 0) {
int index = (indexOfNode + i + ensembleSize) % ensembleSize;
BookieId addrofNode = currentEnsemble.get(index);
if (addrofNode == null) {
String zoneOfNode = getZoneAwareNodeLocation(addrofNode).getZone();
return zonesOfNeighboringNodes;
/**
 * Returns the set of zones to exclude for the position 'indexOfNode', so that
 * the write quorums containing this index have bookies from at least
 * minNumZonesPerWriteQuorum zones.
 */
private Set<String> getZonesToExcludeToMaintainMinZones(List<BookieId> currentEnsemble, int indexOfNode,
int writeQuorumSize) {
// ensSize is used to wrap indices around the ensemble.
int ensSize = currentEnsemble.size();
Set<String> zonesToExclude = new HashSet<String>();
Set<String> zonesInWriteQuorum = new HashSet<String>();
// Each i is the start offset, relative to indexOfNode, of one of the writeQuorumSize
// consecutive windows that contain indexOfNode; j walks the positions of that window.
for (int i = -(writeQuorumSize - 1); i <= 0; i++) {
for (int j = 0; j < writeQuorumSize; j++) {
int indexInEnsemble = (i + j + indexOfNode + ensSize) % ensSize;
// NOTE(review): the bodies of this check and of the null check below appear to be missing
// (presumably skipping indexOfNode itself and unfilled positions) — TODO restore.
if (indexInEnsemble == indexOfNode) {
BookieId bookieAddr = currentEnsemble.get(indexInEnsemble);
if (bookieAddr == null) {
// NOTE(review): nodeLocation is unused in this copy; presumably its zone should be added to
// zonesInWriteQuorum, which is never populated (nor reset per window) here — TODO restore.
ZoneAwareNodeLocation nodeLocation = getZoneAwareNodeLocation(bookieAddr);
// Presumably, a window whose other positions cover at most (minNumZonesPerWriteQuorum - 1)
// zones has those zones excluded, so the bookie placed at indexOfNode adds a new zone.
// NOTE(review): the body of this check is missing; as written zonesToExclude is never
// populated — TODO restore.
if (zonesInWriteQuorum.size() <= (minNumZonesPerWriteQuorum - 1)) {
return zonesToExclude;
private Set<String> getZonesOfBookies(Collection<BookieNode> bookieNodes) {
Set<String> zonesOfBookies = new HashSet<String>();
for (BookieNode bookieNode : bookieNodes) {
ZoneAwareNodeLocation nodeLocation = getZoneAwareNodeLocation(bookieNode);
return zonesOfBookies;
/**
 * Gets the set of upgrade domains of the neighboring nodes (up to
 * writeQuorumSize - 1 positions on each side) that belong to this 'zone'.
 */
private Set<String> getUpgradeDomainsOfAZoneInNeighboringNodes(List<BookieId> currentEnsemble,
int indexOfNode, int writeQuorumSize, String zone) {
int ensSize = currentEnsemble.size();
Set<String> upgradeDomainsOfAZoneInNeighboringNodes = new HashSet<String>();
for (int i = -(writeQuorumSize - 1); i <= (writeQuorumSize - 1); i++) {
if (i == 0) {
int indexInEnsemble = (indexOfNode + i + ensSize) % ensSize;
BookieId bookieAddr = currentEnsemble.get(indexInEnsemble);
if (bookieAddr == null) {
ZoneAwareNodeLocation nodeLocation = getZoneAwareNodeLocation(bookieAddr);
if (nodeLocation.getZone().equals(zone)) {
return upgradeDomainsOfAZoneInNeighboringNodes;
/**
 * Returns the set of upgrade domains to exclude if a bookie from 'zone' is to be
 * selected for the position 'indexOfNode', so that, if a write quorum contains
 * multiple bookies from 'zone', they are from at least two upgrade domains.
 */
private Set<String> getUDsToExcludeToMaintainMinUDsInWriteQuorums(List<BookieId> currentEnsemble,
int indexOfNode, int writeQuorumSize, String zone) {
// ensSize is used to wrap indices around the ensemble.
int ensSize = currentEnsemble.size();
Set<String> upgradeDomainsToExclude = new HashSet<String>();
Set<String> upgradeDomainsOfThisZoneInWriteQuorum = new HashSet<String>();
// Each i is the start offset, relative to indexOfNode, of one of the writeQuorumSize
// consecutive windows that contain indexOfNode; j walks the positions of that window.
for (int i = -(writeQuorumSize - 1); i <= 0; i++) {
for (int j = 0; j < writeQuorumSize; j++) {
int indexInEnsemble = (i + j + indexOfNode + ensSize) % ensSize;
// NOTE(review): the bodies of this check and of the null check below appear to be missing
// (presumably skipping indexOfNode itself and unfilled positions) — TODO restore.
if (indexInEnsemble == indexOfNode) {
BookieId bookieAddr = currentEnsemble.get(indexInEnsemble);
if (bookieAddr == null) {
ZoneAwareNodeLocation nodeLocation = getZoneAwareNodeLocation(bookieAddr);
// NOTE(review): the body of this check is missing; presumably the node's upgrade domain is
// recorded in upgradeDomainsOfThisZoneInWriteQuorum (never populated here) — TODO restore.
if (nodeLocation.getZone().equals(zone)) {
// Presumably, when a window's other bookies from 'zone' share a single UD, that UD must be
// excluded so the window spans two UDs.
// NOTE(review): the body is missing; as written upgradeDomainsToExclude is never
// populated — TODO restore.
if (upgradeDomainsOfThisZoneInWriteQuorum.size() == 1) {
return upgradeDomainsToExclude;
public void registerSlowBookie(BookieId bookieSocketAddress, long entryId) {
// TODO Auto-generated method stub
public DistributionSchedule.WriteSet reorderReadSequence(List<BookieId> ensemble,
BookiesHealthInfo bookiesHealthInfo, DistributionSchedule.WriteSet writeSet) {
return writeSet;
public DistributionSchedule.WriteSet reorderReadLACSequence(List<BookieId> ensemble,
BookiesHealthInfo bookiesHealthInfo, DistributionSchedule.WriteSet writeSet) {
DistributionSchedule.WriteSet retList = reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
return retList;
/**
 * In ZoneAwareEnsemblePlacementPolicy, an ensemble is considered MEETS_STRICT
 * if the bookies of every write set come from at least
 * 'desiredNumZonesPerWriteQuorum' zones (capped at writeQuorumSize). It is
 * considered MEETS_SOFT if they come from at least 'minNumZonesPerWriteQuorum'
 * zones, and FAIL otherwise. Also, if a write set contains multiple bookies
 * from the same zone, those bookies are expected to span at least two
 * upgrade domains.
 */
// Classifies the ensemble as MEETS_STRICT, MEETS_SOFT or FAIL by inspecting
// every write set: the writeQuorumSize consecutive bookies starting at each
// index i, wrapping around the ensemble. 'ackQuorumSize' is not referenced
// in the visible lines.
// NOTE(review): this listing appears to be missing lines - closing braces,
// the 'LOG.debug(' openers before the message strings, and possibly a
// per-write-set reset of the two maps plus the statement that adds
// 'upgradeDomain' to 'udsOfThisZoneInThisWriteSet'. Confirm against the
// complete file.
public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
int writeQuorumSize, int ackQuorumSize) {
// Start optimistic; individual checks downgrade to MEETS_SOFT or FAIL.
PlacementPolicyAdherence placementPolicyAdherence = PlacementPolicyAdherence.MEETS_STRICT;
try {
// zone -> upgrade domains seen in the current write set.
HashMap<String, Set<String>> bookiesLocationInWriteSet = new HashMap<String, Set<String>>();
// zone -> number of bookies from that zone in the current write set.
HashMap<String, Integer> numOfBookiesInZones = new HashMap<String, Integer>();
BookieId bookieNode;
// The ensemble size must be a multiple of the write quorum size.
if (ensembleList.size() % writeQuorumSize != 0) {
placementPolicyAdherence = PlacementPolicyAdherence.FAIL;
if (LOG.isDebugEnabled()) {
"For ensemble: {}, ensembleSize: {} is not a multiple of writeQuorumSize: {}",
ensembleList, ensembleList.size(), writeQuorumSize);
return placementPolicyAdherence;
// A write quorum no larger than the minimum zone count is rejected outright.
if (writeQuorumSize <= minNumZonesPerWriteQuorum) {
placementPolicyAdherence = PlacementPolicyAdherence.FAIL;
if (LOG.isDebugEnabled()) {
"For ensemble: {}, writeQuorumSize: {} is less than or equal to"
+ " minNumZonesPerWriteQuorum: {}",
ensembleList, writeQuorumSize, minNumZonesPerWriteQuorum);
return placementPolicyAdherence;
// A write set cannot span more zones than it has bookies.
int desiredNumZonesPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, desiredNumZonesPerWriteQuorum);
// i = starting index of the write set; j = offset inside that write set.
for (int i = 0; i < ensembleList.size(); i++) {
for (int j = 0; j < writeQuorumSize; j++) {
int indexOfNode = (i + j) % ensembleList.size();
bookieNode = ensembleList.get(indexOfNode);
ZoneAwareNodeLocation nodeLocation = getZoneAwareNodeLocation(bookieNode);
// Any bookie whose location could not be resolved fails the ensemble.
if (nodeLocation.equals(unresolvedNodeLocation)) {
placementPolicyAdherence = PlacementPolicyAdherence.FAIL;
if (LOG.isDebugEnabled()) {
LOG.debug("ensemble: {}, contains bookie: {} for which network location is unresolvable",
ensembleList, bookieNode);
return placementPolicyAdherence;
String zone = nodeLocation.getZone();
String upgradeDomain = nodeLocation.getUpgradeDomain();
// Record the zone and bump its bookie count for this write set.
Set<String> udsOfThisZoneInThisWriteSet = bookiesLocationInWriteSet.get(zone);
if (udsOfThisZoneInThisWriteSet == null) {
udsOfThisZoneInThisWriteSet = new HashSet<String>();
bookiesLocationInWriteSet.put(zone, udsOfThisZoneInThisWriteSet);
numOfBookiesInZones.put(zone, 1);
} else {
Integer numOfNodesInAZone = numOfBookiesInZones.get(zone);
numOfBookiesInZones.put(zone, (numOfNodesInAZone + 1));
// Fewer distinct zones than the minimum: hard failure.
if (numOfBookiesInZones.entrySet().size() < minNumZonesPerWriteQuorum) {
placementPolicyAdherence = PlacementPolicyAdherence.FAIL;
if (LOG.isDebugEnabled()) {
LOG.debug("in ensemble: {}, writeset starting at: {} doesn't contain bookies from"
+ " minNumZonesPerWriteQuorum: {}", ensembleList, i, minNumZonesPerWriteQuorum);
return placementPolicyAdherence;
// Desired zone count reached: only the upgrade-domain rule still applies.
} else if (numOfBookiesInZones.entrySet().size() >= desiredNumZonesPerWriteQuorumForThisEnsemble) {
if (!validateMinUDsAreMaintained(numOfBookiesInZones, bookiesLocationInWriteSet)) {
placementPolicyAdherence = PlacementPolicyAdherence.FAIL;
if (LOG.isDebugEnabled()) {
LOG.debug("in ensemble: {}, writeset starting at: {} doesn't maintain min of 2 UDs"
+ " when there are multiple bookies from the same zone.", ensembleList, i);
return placementPolicyAdherence;
// Between the minimum and desired zone counts: check upgrade domains,
// then (presumably still inside this branch) downgrade STRICT to SOFT.
} else {
if (!validateMinUDsAreMaintained(numOfBookiesInZones, bookiesLocationInWriteSet)) {
placementPolicyAdherence = PlacementPolicyAdherence.FAIL;
if (LOG.isDebugEnabled()) {
LOG.debug("in ensemble: {}, writeset starting at: {} doesn't maintain min of 2 UDs"
+ " when there are multiple bookies from the same zone.", ensembleList, i);
return placementPolicyAdherence;
if (placementPolicyAdherence == PlacementPolicyAdherence.MEETS_STRICT) {
placementPolicyAdherence = PlacementPolicyAdherence.MEETS_SOFT;
// NOTE(review): a 'return' inside 'finally' discards any exception thrown
// in the try block (e.g. from getZoneAwareNodeLocation) and silently
// returns the current placementPolicyAdherence value instead.
} finally {
return placementPolicyAdherence;
private boolean validateMinUDsAreMaintained(HashMap<String, Integer> numOfNodesInZones,
HashMap<String, Set<String>> nodesLocationInWriteSet) {
for (Entry<String, Integer> numOfNodesInZone : numOfNodesInZones.entrySet()) {
String zone = numOfNodesInZone.getKey();
Integer numOfNodesInThisZone = numOfNodesInZone.getValue();
if (numOfNodesInThisZone > 1) {
Set<String> udsOfThisZone = nodesLocationInWriteSet.get(zone);
if (udsOfThisZone.size() < 2) {
return false;
return true;
// Returns true when the acknowledging bookies span at least
// min(writeQuorumSize, minNumZonesPerWriteQuorum) distinct zones AND there
// are at least ackQuorumSize of them.
// NOTE(review): this listing appears to be missing lines - closing braces,
// the loop body that fills zonesOfAckedBookies, the 'LOG.debug(' opener,
// the final log argument, and any readLock.lock()/unlock() calls. Confirm
// against the complete file.
public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBookies, int writeQuorumSize,
int ackQuorumSize) {
HashSet<String> zonesOfAckedBookies = new HashSet<>();
// The zone requirement cannot exceed the write quorum size.
int minNumZonesPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumZonesPerWriteQuorum);
boolean areAckedBookiesAdheringToPlacementPolicy = false;
// NOTE(review): the read lock is obtained here, but no lock()/unlock() call
// is visible. If the full file acquires it, the release must live in the
// 'finally' below, which here only returns.
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
try {
// Presumably records each acked bookie's zone in zonesOfAckedBookies.
for (BookieId ackedBookie : ackedBookies) {
areAckedBookiesAdheringToPlacementPolicy = ((zonesOfAckedBookies
.size() >= minNumZonesPerWriteQuorumForThisEnsemble) && (ackedBookies.size() >= ackQuorumSize));
if (LOG.isDebugEnabled()) {
"areAckedBookiesAdheringToPlacementPolicy returning {}, because number of ackedBookies = {},"
+ " number of Zones of ackedbookies = {},"
+ " number of minNumZonesPerWriteQuorumForThisEnsemble = {}",
areAckedBookiesAdheringToPlacementPolicy, ackedBookies.size(), zonesOfAckedBookies.size(),
// NOTE(review): a 'return' inside 'finally' discards any exception thrown
// in the try block and returns the current (possibly still false) result.
} finally {
return areAckedBookiesAdheringToPlacementPolicy;