blob: 438053f5449776077fb30df21d4b0d7e6cfe7d19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetUtils;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class TopologyAwareEnsemblePlacementPolicy implements
        ITopologyAwareEnsemblePlacementPolicy<BookieNode> {
    static final Logger LOG = LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);

    // Configuration key naming the DNSToSwitchMapping implementation to use.
    public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
    // All currently-known writable bookies keyed by id; mutated under rwLock's write lock.
    protected final Map<BookieId, BookieNode> knownBookies = new HashMap<BookieId, BookieNode>();
    // Guards knownBookies, topology, bookieInfoMap and readOnlyBookies.
    protected final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    // Per-bookie weight info consulted when weighted selection is enabled (see updateBookieInfo).
    protected Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
    // Initialize to empty set
    protected ImmutableSet<BookieId> readOnlyBookies = ImmutableSet.of();
    // Whether weighted selection is enabled; presumably set by subclass initialization — TODO confirm.
    boolean isWeighted;
    protected WeightedRandomSelection<BookieNode> weightedSelection;
    // for now, we just maintain the writable bookies' topology
    protected NetworkTopology topology;
    // Resolves host names to network locations (racks/regions).
    protected DNSToSwitchMapping dnsResolver;
    // Resolves a BookieId to a concrete network address.
    protected BookieAddressResolver bookieAddressResolver;
    @StatsDoc(
        name = BOOKIES_JOINED,
        help = "The distribution of number of bookies joined the cluster on each network topology change"
    )
    protected OpStatsLogger bookiesJoinedCounter = null;
    @StatsDoc(
        name = BOOKIES_LEFT,
        help = "The distribution of number of bookies left the cluster on each network topology change"
    )
    protected OpStatsLogger bookiesLeftCounter = null;
    /**
     * A {@link Predicate} that unconditionally accepts every candidate bookie.
     * Used where a predicate is required but no filtering is wanted.
     */
    protected static class TruePredicate implements Predicate<BookieNode> {
        public static final TruePredicate INSTANCE = new TruePredicate();
        @Override
        public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
            // Accept any candidate regardless of the nodes chosen so far.
            return true;
        }
    }
protected static class EnsembleForReplacementWithNoConstraints implements Ensemble<BookieNode> {
public static final EnsembleForReplacementWithNoConstraints INSTANCE =
new EnsembleForReplacementWithNoConstraints();
static final List<BookieId> EMPTY_LIST = new ArrayList<BookieId>(0);
@Override
public boolean addNode(BookieNode node) {
// do nothing
return true;
}
@Override
public List<BookieId> toList() {
return EMPTY_LIST;
}
/**
* Validates if an ensemble is valid.
*
* @return true if the ensemble is valid; false otherwise
*/
@Override
public boolean validate() {
return true;
}
}
/**
* A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
* which ensures that a write quorum should be covered by at least two racks.
*/
    protected static class RRTopologyAwareCoverageEnsemble implements Predicate<BookieNode>, Ensemble<BookieNode> {
        /**
         * Tracks which racks/regions one (rotated) write quorum spans and decides
         * whether adding a candidate keeps the coverage goal reachable.
         */
        protected interface CoverageSet {
            /** Returns true if {@code candidate} can be added without making the coverage goal unreachable. */
            boolean apply(BookieNode candidate);
            /** Records {@code candidate} as chosen for this quorum. */
            void addBookie(BookieNode candidate);
            /** Returns an independent copy of this coverage set's state. */
            CoverageSet duplicate();
        }
protected class RackQuorumCoverageSet implements CoverageSet {
HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
int seenBookies = 0;
private final int minNumRacksPerWriteQuorum;
protected RackQuorumCoverageSet(int minNumRacksPerWriteQuorum) {
this.minNumRacksPerWriteQuorum = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
}
@Override
public boolean apply(BookieNode candidate) {
// If we don't have sufficient members in the write quorum; then we cant enforce
// rack/region diversity
if (writeQuorumSize < 2) {
return true;
}
/*
* allow the initial writeQuorumSize-minRacksToWriteTo+1 bookies
* to be placed on any rack(including on a single rack). But
* after that make sure that with each new bookie chosen, we
* will be able to satisfy the minRackToWriteTo condition
* eventually
*/
if (seenBookies + minNumRacksPerWriteQuorum - 1 >= writeQuorumSize) {
int numRacks = racksOrRegionsInQuorum.size();
if (!racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves))) {
numRacks++;
}
if (numRacks >= minNumRacksPerWriteQuorum
|| ((writeQuorumSize - seenBookies - 1) >= (minNumRacksPerWriteQuorum - numRacks))) {
/*
* either we have reached our goal or we still have a
* few bookies to be selected with which to catch up to
* the goal
*/
return true;
} else {
return false;
}
}
return true;
}
@Override
public void addBookie(BookieNode candidate) {
++seenBookies;
racksOrRegionsInQuorum.add(candidate.getNetworkLocation(distanceFromLeaves));
}
@Override
public RackQuorumCoverageSet duplicate() {
RackQuorumCoverageSet ret = new RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
ret.racksOrRegionsInQuorum = Sets.newHashSet(this.racksOrRegionsInQuorum);
ret.seenBookies = this.seenBookies;
return ret;
}
}
        /**
         * Coverage set enforcing region-level durability: no subset of
         * (minRacksOrRegionsForDurability - 1) racks/regions may hold an entire
         * ack quorum, so losing that many regions cannot lose acknowledged data.
         */
        protected class RackOrRegionDurabilityCoverageSet implements CoverageSet {
            // How many chosen bookies have been allocated to each known rack/region.
            HashMap<String, Integer> allocationToRacksOrRegions = new HashMap<String, Integer>();
            RackOrRegionDurabilityCoverageSet() {
                // Pre-seed every configured rack/region with a zero allocation.
                for (String rackOrRegion: racksOrRegions) {
                    allocationToRacksOrRegions.put(rackOrRegion, 0);
                }
            }
            @Override
            public RackOrRegionDurabilityCoverageSet duplicate() {
                RackOrRegionDurabilityCoverageSet ret = new RackOrRegionDurabilityCoverageSet();
                ret.allocationToRacksOrRegions = Maps.newHashMap(this.allocationToRacksOrRegions);
                return ret;
            }
            /**
             * Recursively checks that every subset of {@code subsetSize} racks/regions
             * drawn from {@code remainingRacksOrRegions} keeps its total allocation
             * within {@code maxAllowedSum} (the budget is decremented as racks are
             * included). Returns false as soon as one over-budget subset is found.
             * Note: lazily inserts a zero allocation for racks it has not seen before.
             */
            private boolean checkSumOfSubsetWithinLimit(final Set<String> includedRacksOrRegions,
                                                        final Set<String> remainingRacksOrRegions,
                                                        int subsetSize,
                                                        int maxAllowedSum) {
                // Base case: subset complete (or nothing left to add); the subset is
                // acceptable iff the remaining budget has not gone negative.
                if (remainingRacksOrRegions.isEmpty() || (subsetSize <= 0)) {
                    if (maxAllowedSum < 0) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace(
                                "CHECK FAILED: RacksOrRegions Included {} Remaining {}, subsetSize {}, "
                                    + "maxAllowedSum {}",
                                includedRacksOrRegions, remainingRacksOrRegions, subsetSize, maxAllowedSum);
                        }
                    }
                    return (maxAllowedSum >= 0);
                }
                for (String rackOrRegion: remainingRacksOrRegions) {
                    Integer currentAllocation = allocationToRacksOrRegions.get(rackOrRegion);
                    if (currentAllocation == null) {
                        // First time this rack is seen: record it with zero allocation.
                        allocationToRacksOrRegions.put(rackOrRegion, 0);
                        currentAllocation = 0;
                    }
                    if (currentAllocation > maxAllowedSum) {
                        // This rack alone exceeds the remaining budget.
                        if (LOG.isTraceEnabled()) {
                            LOG.trace(
                                "CHECK FAILED: RacksOrRegions Included {} Candidate {}, subsetSize {}, "
                                    + "maxAllowedSum {}",
                                includedRacksOrRegions, rackOrRegion, subsetSize, maxAllowedSum);
                        }
                        return false;
                    } else {
                        // Try including this rack and recurse over the rest with the
                        // budget reduced by this rack's allocation.
                        Set<String> remainingElements = new HashSet<String>(remainingRacksOrRegions);
                        Set<String> includedElements = new HashSet<String>(includedRacksOrRegions);
                        includedElements.add(rackOrRegion);
                        remainingElements.remove(rackOrRegion);
                        if (!checkSumOfSubsetWithinLimit(includedElements,
                                remainingElements,
                                subsetSize - 1,
                                maxAllowedSum - currentAllocation)) {
                            return false;
                        }
                    }
                }
                return true;
            }
            @Override
            public boolean apply(BookieNode candidate) {
                // With fewer than two durability regions there is nothing to enforce.
                if (minRacksOrRegionsForDurability <= 1) {
                    return true;
                }
                String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
                // Strip a leading path separator so names match the configured set.
                candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR)
                    ? candidateRackOrRegion.substring(1) : candidateRackOrRegion;
                final Set<String> remainingRacksOrRegions = new HashSet<String>(racksOrRegions);
                remainingRacksOrRegions.remove(candidateRackOrRegion);
                final Set<String> includedRacksOrRegions = new HashSet<String>();
                includedRacksOrRegions.add(candidateRackOrRegion);
                // If minRacksOrRegionsForDurability are required for durability; we must ensure that
                // no subset of (minRacksOrRegionsForDurability - 1) regions have ackQuorumSize
                // We are only modifying candidateRackOrRegion if we accept this bookie, so lets only
                // find sets that contain this candidateRackOrRegion
                Integer currentAllocation = allocationToRacksOrRegions.get(candidateRackOrRegion);
                if (currentAllocation == null) {
                    LOG.info("Detected a region that was not initialized {}", candidateRackOrRegion);
                    if (candidateRackOrRegion.equals(NetworkTopology.DEFAULT_REGION)) {
                        LOG.error("Failed to resolve network location {}", candidate);
                    } else if (!racksOrRegions.contains(candidateRackOrRegion)) {
                        LOG.error("Unknown region detected {}", candidateRackOrRegion);
                    }
                    allocationToRacksOrRegions.put(candidateRackOrRegion, 0);
                    currentAllocation = 0;
                }
                // Budget: accepting the candidate raises this rack's allocation by one,
                // and any covered subset may hold at most ackQuorumSize - 1 bookies.
                int inclusiveLimit = (ackQuorumSize - 1) - (currentAllocation + 1);
                return checkSumOfSubsetWithinLimit(includedRacksOrRegions,
                        remainingRacksOrRegions, minRacksOrRegionsForDurability - 2, inclusiveLimit);
            }
            @Override
            public void addBookie(BookieNode candidate) {
                String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
                candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR)
                    ? candidateRackOrRegion.substring(1) : candidateRackOrRegion;
                int oldCount = 0;
                if (null != allocationToRacksOrRegions.get(candidateRackOrRegion)) {
                    oldCount = allocationToRacksOrRegions.get(candidateRackOrRegion);
                }
                allocationToRacksOrRegions.put(candidateRackOrRegion, oldCount + 1);
            }
        }
        // Distance from the topology leaves at which rack/region names are read
        // (passed to BookieNode.getNetworkLocation).
        final int distanceFromLeaves;
        final int ensembleSize;
        final int writeQuorumSize;
        final int ackQuorumSize;
        // Minimum distinct racks/regions the ensemble must span; 0 means the
        // durability check in validate() is not enforced.
        final int minRacksOrRegionsForDurability;
        final int minNumRacksPerWriteQuorum;
        // Nodes accepted so far, in selection order.
        final List<BookieNode> chosenNodes;
        // Racks/regions eligible for durability accounting; may be null.
        final Set<String> racksOrRegions;
        // One lazily-created coverage set per rotated write quorum position.
        private final CoverageSet[] quorums;
        // Optional chained predicate/ensemble consulted after local checks pass; may be null.
        final Predicate<BookieNode> parentPredicate;
        final Ensemble<BookieNode> parentEnsemble;
protected RRTopologyAwareCoverageEnsemble(RRTopologyAwareCoverageEnsemble that) {
this.distanceFromLeaves = that.distanceFromLeaves;
this.ensembleSize = that.ensembleSize;
this.writeQuorumSize = that.writeQuorumSize;
this.ackQuorumSize = that.ackQuorumSize;
this.chosenNodes = Lists.newArrayList(that.chosenNodes);
this.quorums = new CoverageSet[that.quorums.length];
for (int i = 0; i < that.quorums.length; i++) {
if (null != that.quorums[i]) {
this.quorums[i] = that.quorums[i].duplicate();
} else {
this.quorums[i] = null;
}
}
this.parentPredicate = that.parentPredicate;
this.parentEnsemble = that.parentEnsemble;
if (null != that.racksOrRegions) {
this.racksOrRegions = new HashSet<String>(that.racksOrRegions);
} else {
this.racksOrRegions = null;
}
this.minRacksOrRegionsForDurability = that.minRacksOrRegionsForDurability;
this.minNumRacksPerWriteQuorum = that.minNumRacksPerWriteQuorum;
}
        /**
         * Convenience constructor for durability-constrained placement with no
         * parent ensemble/predicate chained.
         */
        protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
                                                  int writeQuorumSize,
                                                  int ackQuorumSize,
                                                  int distanceFromLeaves,
                                                  Set<String> racksOrRegions,
                                                  int minRacksOrRegionsForDurability,
                                                  int minNumRacksPerWriteQuorum) {
            this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, null, null, racksOrRegions,
                minRacksOrRegionsForDurability, minNumRacksPerWriteQuorum);
        }
        /**
         * Convenience constructor for rack-coverage placement chained to a parent
         * ensemble/predicate; durability accounting is disabled (0).
         */
        protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
                                                  int writeQuorumSize,
                                                  int ackQuorumSize,
                                                  int distanceFromLeaves,
                                                  Ensemble<BookieNode> parentEnsemble,
                                                  Predicate<BookieNode> parentPredicate,
                                                  int minNumRacksPerWriteQuorum) {
            this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, parentEnsemble, parentPredicate,
                null, 0, minNumRacksPerWriteQuorum);
        }
protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
int distanceFromLeaves,
Ensemble<BookieNode> parentEnsemble,
Predicate<BookieNode> parentPredicate,
Set<String> racksOrRegions,
int minRacksOrRegionsForDurability,
int minNumRacksPerWriteQuorum) {
this.ensembleSize = ensembleSize;
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
this.distanceFromLeaves = distanceFromLeaves;
this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
if (minRacksOrRegionsForDurability > 0) {
this.quorums = new RackOrRegionDurabilityCoverageSet[ensembleSize];
} else {
this.quorums = new RackQuorumCoverageSet[ensembleSize];
}
this.parentEnsemble = parentEnsemble;
this.parentPredicate = parentPredicate;
this.racksOrRegions = racksOrRegions;
this.minRacksOrRegionsForDurability = minRacksOrRegionsForDurability;
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
}
@Override
public boolean apply(BookieNode candidate, Ensemble<BookieNode> ensemble) {
if (ensemble != this) {
return false;
}
// An ensemble cannot contain the same node twice
if (chosenNodes.contains(candidate)) {
return false;
}
// candidate position
if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) {
if (null == quorums[0]) {
quorums[0] = new RackOrRegionDurabilityCoverageSet();
}
if (!quorums[0].apply(candidate)) {
return false;
}
} else {
int candidatePos = chosenNodes.size();
int startPos = candidatePos - writeQuorumSize + 1;
for (int i = startPos; i <= candidatePos; i++) {
int idx = (i + ensembleSize) % ensembleSize;
if (null == quorums[idx]) {
if (minRacksOrRegionsForDurability > 0) {
quorums[idx] = new RackOrRegionDurabilityCoverageSet();
} else {
quorums[idx] = new RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
}
}
if (!quorums[idx].apply(candidate)) {
return false;
}
}
}
return ((null == parentPredicate) || parentPredicate.apply(candidate, parentEnsemble));
}
@Override
public boolean addNode(BookieNode node) {
// An ensemble cannot contain the same node twice
if (chosenNodes.contains(node)) {
return false;
}
if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) {
if (null == quorums[0]) {
quorums[0] = new RackOrRegionDurabilityCoverageSet();
}
quorums[0].addBookie(node);
} else {
int candidatePos = chosenNodes.size();
int startPos = candidatePos - writeQuorumSize + 1;
for (int i = startPos; i <= candidatePos; i++) {
int idx = (i + ensembleSize) % ensembleSize;
if (null == quorums[idx]) {
if (minRacksOrRegionsForDurability > 0) {
quorums[idx] = new RackOrRegionDurabilityCoverageSet();
} else {
quorums[idx] = new RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
}
}
quorums[idx].addBookie(node);
}
}
chosenNodes.add(node);
return ((null == parentEnsemble) || parentEnsemble.addNode(node));
}
@Override
public List<BookieId> toList() {
ArrayList<BookieId> addresses = new ArrayList<BookieId>(ensembleSize);
for (BookieNode bn : chosenNodes) {
addresses.add(bn.getAddr());
}
return addresses;
}
/**
* Validates if an ensemble is valid.
*
* @return true if the ensemble is valid; false otherwise
*/
@Override
public boolean validate() {
HashSet<BookieId> addresses = new HashSet<BookieId>(ensembleSize);
HashSet<String> racksOrRegions = new HashSet<String>();
for (BookieNode bn : chosenNodes) {
if (addresses.contains(bn.getAddr())) {
return false;
}
addresses.add(bn.getAddr());
racksOrRegions.add(bn.getNetworkLocation(distanceFromLeaves));
}
return ((minRacksOrRegionsForDurability == 0)
|| (racksOrRegions.size() >= minRacksOrRegionsForDurability));
}
@Override
public String toString() {
return chosenNodes.toString();
}
}
static class DefaultResolver implements DNSToSwitchMapping {
final Supplier<String> defaultRackSupplier;
public DefaultResolver(Supplier<String> defaultRackSupplier) {
checkNotNull(defaultRackSupplier, "defaultRackSupplier should not be null");
this.defaultRackSupplier = defaultRackSupplier;
}
@Override
public List<String> resolve(List<String> names) {
List<String> rNames = new ArrayList<String>(names.size());
for (@SuppressWarnings("unused") String name : names) {
final String defaultRack = defaultRackSupplier.get();
checkNotNull(defaultRack, "defaultRack cannot be null");
rNames.add(defaultRack);
}
return rNames;
}
@Override
public void reloadCachedMappings() {
// nop
}
}
/**
 * Decorator for any existing dns resolver.
 * Backfills returned data with appropriate default rack info.
 */
static class DNSResolverDecorator implements DNSToSwitchMapping {
final Supplier<String> defaultRackSupplier;
final DNSToSwitchMapping resolver;
@StatsDoc(
name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
help = "total number of times Resolver failed to resolve rack information of a node"
)
final Counter failedToResolveNetworkLocationCounter;
DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String> defaultRackSupplier,
Counter failedToResolveNetworkLocationCounter) {
checkNotNull(resolver, "Resolver cannot be null");
checkNotNull(defaultRackSupplier, "defaultRackSupplier should not be null");
this.defaultRackSupplier = defaultRackSupplier;
this.resolver = resolver;
this.failedToResolveNetworkLocationCounter = failedToResolveNetworkLocationCounter;
}
@Override
public void setBookieAddressResolver(BookieAddressResolver bookieAddressResolver) {
this.resolver.setBookieAddressResolver(bookieAddressResolver);
}
@Override
public List<String> resolve(List<String> names) {
if (names == null) {
return Collections.emptyList();
}
final String defaultRack = defaultRackSupplier.get();
checkNotNull(defaultRack, "Default rack cannot be null");
List<String> rNames = resolver.resolve(names);
if (rNames != null && rNames.size() == names.size()) {
for (int i = 0; i < rNames.size(); ++i) {
if (rNames.get(i) == null) {
LOG.warn("Failed to resolve network location for {}, using default rack for it : {}.",
names.get(i), defaultRack);
failedToResolveNetworkLocationCounter.inc();
rNames.set(i, defaultRack);
}
}
return rNames;
}
LOG.warn("Failed to resolve network location for {}, using default rack for them : {}.", names,
defaultRack);
rNames = new ArrayList<>(names.size());
for (int i = 0; i < names.size(); ++i) {
failedToResolveNetworkLocationCounter.inc();
rNames.add(defaultRack);
}
return rNames;
}
@Override
public boolean useHostName() {
return resolver.useHostName();
}
@Override
public void reloadCachedMappings() {
resolver.reloadCachedMappings();
}
}
static Set<String> getNetworkLocations(Set<Node> bookieNodes) {
Set<String> networkLocs = new HashSet<>();
for (Node bookieNode : bookieNodes) {
networkLocs.add(bookieNode.getNetworkLocation());
}
return networkLocs;
}
/**
* Shuffle all the entries of an array that matches a mask.
* It assumes all entries with the same mask are contiguous in the array.
*/
    static void shuffleWithMask(DistributionSchedule.WriteSet writeSet,
                                int mask, int bits) {
        // Locate the contiguous run [first, last] of entries whose masked bits match.
        int first = -1;
        int last = -1;
        for (int i = 0; i < writeSet.size(); i++) {
            if ((writeSet.get(i) & bits) == mask) {
                if (first == -1) {
                    first = i;
                }
                last = i;
            }
        }
        if (first != -1) {
            // Fisher-Yates-style shuffle over the matched run.
            // NOTE(review): the loop starts at i = last + 1, so the first iteration
            // touches index last + 1 (outside the matched run), and swapWith =
            // nextInt(i) can fall below 'first', swapping with non-matching entries.
            // This looks like an off-by-one relative to a textbook Fisher-Yates over
            // [first, last] — confirm against WriteSet semantics and callers.
            for (int i = last + 1; i > first; i--) {
                int swapWith = ThreadLocalRandom.current().nextInt(i);
                // Presumably WriteSet.set returns the previous value at the slot,
                // making this a single-expression swap — TODO confirm.
                writeSet.set(swapWith, writeSet.set(i, writeSet.get(swapWith)));
            }
        }
    }
    /**
     * Base implementation: returns the write set unchanged. Subclasses override
     * this to reorder reads (e.g. by locality or bookie health).
     */
    @Override
    public DistributionSchedule.WriteSet reorderReadSequence(
            List<BookieId> ensemble,
            BookiesHealthInfo bookiesHealthInfo,
            DistributionSchedule.WriteSet writeSet) {
        return writeSet;
    }
@Override
public DistributionSchedule.WriteSet reorderReadLACSequence(
List<BookieId> ensemble,
BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
DistributionSchedule.WriteSet retList = reorderReadSequence(
ensemble, bookiesHealthInfo, writeSet);
retList.addMissingIndices(ensemble.size());
return retList;
}
@Override
public Set<BookieId> onClusterChanged(Set<BookieId> writableBookies,
Set<BookieId> readOnlyBookies) {
rwLock.writeLock().lock();
try {
ImmutableSet<BookieId> joinedBookies, leftBookies, deadBookies;
Set<BookieId> oldBookieSet = knownBookies.keySet();
// left bookies : bookies in known bookies, but not in new writable bookie cluster.
leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy();
// joined bookies : bookies in new writable bookie cluster, but not in known bookies
joinedBookies = Sets.difference(writableBookies, oldBookieSet).immutableCopy();
// dead bookies.
deadBookies = Sets.difference(leftBookies, readOnlyBookies).immutableCopy();
LOG.debug("Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.",
leftBookies, joinedBookies, deadBookies);
handleBookiesThatLeft(leftBookies);
handleBookiesThatJoined(joinedBookies);
if (this.isWeighted && (leftBookies.size() > 0 || joinedBookies.size() > 0)) {
this.weightedSelection.updateMap(this.bookieInfoMap);
}
if (!readOnlyBookies.isEmpty()) {
this.readOnlyBookies = ImmutableSet.copyOf(readOnlyBookies);
}
return deadBookies;
} finally {
rwLock.writeLock().unlock();
}
}
/*
* this method should be called in writelock scope of 'rwLock'
*/
@Override
public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
for (BookieId addr : leftBookies) {
try {
BookieNode node = knownBookies.remove(addr);
if (null != node) {
topology.remove(node);
if (this.isWeighted) {
this.bookieInfoMap.remove(node);
}
bookiesLeftCounter.registerSuccessfulValue(1L);
if (LOG.isDebugEnabled()) {
LOG.debug("Cluster changed : bookie {} left from cluster.", addr);
}
}
} catch (Throwable t) {
LOG.error("Unexpected exception while handling leaving bookie {}", addr, t);
if (bookiesLeftCounter != null) {
bookiesLeftCounter.registerFailedValue(1L);
}
// no need to re-throw; we want to process the rest of the bookies
// exception anyways will be caught/logged/suppressed in the ZK's event handler
}
}
}
/*
* this method should be called in writelock scope of 'rwLock'
*/
@Override
public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
// node joined
for (BookieId addr : joinedBookies) {
try {
BookieNode node = createBookieNode(addr);
topology.add(node);
knownBookies.put(addr, node);
if (this.isWeighted) {
this.bookieInfoMap.putIfAbsent(node, new BookieInfo());
}
bookiesJoinedCounter.registerSuccessfulValue(1L);
if (LOG.isDebugEnabled()) {
LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
}
} catch (Throwable t) {
// topology.add() throws unchecked exception
LOG.error("Unexpected exception while handling joining bookie {}", addr, t);
bookiesJoinedCounter.registerFailedValue(1L);
// no need to re-throw; we want to process the rest of the bookies
// exception anyways will be caught/logged/suppressed in the ZK's event handler
}
}
}
@Override
public void onBookieRackChange(List<BookieId> bookieAddressList) {
rwLock.writeLock().lock();
try {
for (BookieId bookieAddress : bookieAddressList) {
BookieNode node = knownBookies.get(bookieAddress);
if (node != null) {
// refresh the rack info if its a known bookie
BookieNode newNode = createBookieNode(bookieAddress);
topology.remove(node);
topology.add(newNode);
knownBookies.put(bookieAddress, newNode);
}
}
} finally {
rwLock.writeLock().unlock();
}
}
@Override
public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
if (!isWeighted) {
LOG.info("bookieFreeDiskInfo callback called even without weighted placement policy being used.");
return;
}
rwLock.writeLock().lock();
try {
List<BookieNode> allBookies = new ArrayList<BookieNode>(knownBookies.values());
// create a new map to reflect the new mapping
Map<BookieNode, WeightedObject> map = new HashMap<BookieNode, WeightedObject>();
for (BookieNode bookie : allBookies) {
if (bookieInfoMap.containsKey(bookie.getAddr())) {
map.put(bookie, bookieInfoMap.get(bookie.getAddr()));
} else {
map.put(bookie, new BookieInfo());
}
}
this.bookieInfoMap = map;
this.weightedSelection.updateMap(this.bookieInfoMap);
} finally {
rwLock.writeLock().unlock();
}
}
    /**
     * Builds a topology node for {@code addr}, resolving its network location.
     */
    protected BookieNode createBookieNode(BookieId addr) {
        return new BookieNode(addr, resolveNetworkLocation(addr));
    }
    /**
     * Builds a placeholder node for {@code hostname} (dummy bookie id, port 0),
     * resolved through the DNS resolver; used to locate the local node in the topology.
     */
    protected BookieNode createDummyLocalBookieNode(String hostname) {
        return new BookieNode(BookieSocketAddress.createDummyBookieIdForHostname(hostname),
                NetUtils.resolveNetworkLocation(dnsResolver, new BookieSocketAddress(hostname, 0)));
    }
    /**
     * Resolves the network location (rack/region path) of {@code addr} via the
     * address resolver and DNS mapping; falls back to the default region/rack
     * when the bookie id cannot be mapped to a network address.
     */
    protected String resolveNetworkLocation(BookieId addr) {
        try {
            return NetUtils.resolveNetworkLocation(dnsResolver, bookieAddressResolver.resolve(addr));
        } catch (BookieAddressResolver.BookieIdNotResolvedException err) {
            LOG.error("Cannot resolve bookieId {} to a network address, resolving as {}", addr,
                    NetworkTopology.DEFAULT_REGION_AND_RACK, err);
            return NetworkTopology.DEFAULT_REGION_AND_RACK;
        }
    }
protected Set<Node> convertBookiesToNodes(Collection<BookieId> excludeBookies) {
Set<Node> nodes = new HashSet<Node>();
for (BookieId addr : excludeBookies) {
BookieNode bn = knownBookies.get(addr);
if (null == bn) {
bn = createBookieNode(addr);
}
nodes.add(bn);
}
return nodes;
}
}