| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER; |
| import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE; |
| import static org.apache.bookkeeper.client.BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK; |
| import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED; |
| import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION; |
| |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| |
| import io.netty.util.HashedWheelTimer; |
| |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; |
| import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; |
| import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject; |
| import org.apache.bookkeeper.common.util.ReflectionUtils; |
| import org.apache.bookkeeper.conf.ClientConfiguration; |
| import org.apache.bookkeeper.conf.Configurable; |
| import org.apache.bookkeeper.feature.FeatureProvider; |
| import org.apache.bookkeeper.net.BookieNode; |
| import org.apache.bookkeeper.net.BookieSocketAddress; |
| import org.apache.bookkeeper.net.DNSToSwitchMapping; |
| import org.apache.bookkeeper.net.NetworkTopology; |
| import org.apache.bookkeeper.net.NetworkTopologyImpl; |
| import org.apache.bookkeeper.net.Node; |
| import org.apache.bookkeeper.net.NodeBase; |
| import org.apache.bookkeeper.net.ScriptBasedMapping; |
| import org.apache.bookkeeper.net.StabilizeNetworkTopology; |
| import org.apache.bookkeeper.stats.Counter; |
| import org.apache.bookkeeper.stats.Gauge; |
| import org.apache.bookkeeper.stats.OpStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.stats.annotations.StatsDoc; |
| import org.apache.commons.collections4.CollectionUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Simple rack-aware ensemble placement policy. |
| * |
| * <p>Make most of the class and methods as protected, so it could be extended to implement other algorithms. |
| */ |
| @StatsDoc( |
| name = CLIENT_SCOPE, |
| help = "BookKeeper client stats" |
| ) |
| public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy { |
| |
| static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class); |
| int maxWeightMultiple; |
| |
| protected int minNumRacksPerWriteQuorum; |
| protected boolean enforceMinNumRacksPerWriteQuorum; |
| protected boolean ignoreLocalNodeInPlacementPolicy; |
| |
| public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass"; |
| public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering"; |
| |
| static final int RACKNAME_DISTANCE_FROM_LEAVES = 1; |
| |
| // masks for reordering |
| static final int LOCAL_MASK = 0x01 << 24; |
| static final int LOCAL_FAIL_MASK = 0x02 << 24; |
| static final int REMOTE_MASK = 0x04 << 24; |
| static final int REMOTE_FAIL_MASK = 0x08 << 24; |
| static final int READ_ONLY_MASK = 0x10 << 24; |
| static final int SLOW_MASK = 0x20 << 24; |
| static final int UNAVAIL_MASK = 0x40 << 24; |
| static final int MASK_BITS = 0xFFF << 20; |
| |
| protected HashedWheelTimer timer; |
| // Use a loading cache so slow bookies are expired. Use entryId as values. |
| protected Cache<BookieSocketAddress, Long> slowBookies; |
| protected BookieNode localNode; |
| protected boolean reorderReadsRandom = false; |
| protected boolean enforceDurability = false; |
| protected int stabilizePeriodSeconds = 0; |
| protected int reorderThresholdPendingRequests = 0; |
| // These appear to be assigned only by the constructing thread, immediately after construction, |
| // so there is no need to make them volatile. |
| protected StatsLogger statsLogger = null; |
| |
| @StatsDoc( |
| name = READ_REQUESTS_REORDERED, |
| help = "The distribution of number of bookies reordered on each read request" |
| ) |
| protected OpStatsLogger readReorderedCounter = null; |
| @StatsDoc( |
| name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER, |
| help = "Counter for number of times DNSResolverDecorator failed to resolve Network Location" |
| ) |
| protected Counter failedToResolveNetworkLocationCounter = null; |
| @StatsDoc( |
| name = NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK, |
| help = "Gauge for the number of writable Bookies in default rack" |
| ) |
| protected Gauge<Integer> numWritableBookiesInDefaultRack; |
| |
| private String defaultRack = NetworkTopology.DEFAULT_RACK; |
| |
/**
 * Creates a policy with durability enforcement switched off.
 *
 * <p>Equivalent to calling the single-argument constructor with {@code false}.
 */
RackawareEnsemblePlacementPolicyImpl() {
    this(false);
}
| |
/**
 * Creates a policy backed by a fresh {@link NetworkTopologyImpl}.
 *
 * @param enforceDurability
 *          when {@code true}, random bookie selection still has to satisfy the supplied predicate
 */
RackawareEnsemblePlacementPolicyImpl(boolean enforceDurability) {
    this.topology = new NetworkTopologyImpl();
    this.enforceDurability = enforceDurability;
}
| |
| /** |
| * Initialize the policy. |
| * |
| * @param dnsResolver the object used to resolve addresses to their network address |
| * @return initialized ensemble placement policy |
| */ |
| protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dnsResolver, |
| HashedWheelTimer timer, |
| boolean reorderReadsRandom, |
| int stabilizePeriodSeconds, |
| int reorderThresholdPendingRequests, |
| boolean isWeighted, |
| int maxWeightMultiple, |
| int minNumRacksPerWriteQuorum, |
| boolean enforceMinNumRacksPerWriteQuorum, |
| boolean ignoreLocalNodeInPlacementPolicy, |
| StatsLogger statsLogger) { |
| checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead."); |
| this.statsLogger = statsLogger; |
| this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BOOKIES_JOINED); |
| this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT); |
| this.readReorderedCounter = statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED); |
| this.failedToResolveNetworkLocationCounter = statsLogger.getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER); |
| this.numWritableBookiesInDefaultRack = new Gauge<Integer>() { |
| @Override |
| public Integer getDefaultValue() { |
| return 0; |
| } |
| |
| @Override |
| public Integer getSample() { |
| rwLock.readLock().lock(); |
| try { |
| return topology.countNumOfAvailableNodes(getDefaultRack(), Collections.emptySet()); |
| } finally { |
| rwLock.readLock().unlock(); |
| } |
| } |
| }; |
| this.statsLogger.registerGauge(NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK, numWritableBookiesInDefaultRack); |
| this.reorderReadsRandom = reorderReadsRandom; |
| this.stabilizePeriodSeconds = stabilizePeriodSeconds; |
| this.reorderThresholdPendingRequests = reorderThresholdPendingRequests; |
| this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack(), |
| failedToResolveNetworkLocationCounter); |
| this.timer = timer; |
| this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum; |
| this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum; |
| this.ignoreLocalNodeInPlacementPolicy = ignoreLocalNodeInPlacementPolicy; |
| |
| // create the network topology |
| if (stabilizePeriodSeconds > 0) { |
| this.topology = new StabilizeNetworkTopology(timer, stabilizePeriodSeconds); |
| } else { |
| this.topology = new NetworkTopologyImpl(); |
| } |
| |
| BookieNode bn = null; |
| if (!ignoreLocalNodeInPlacementPolicy) { |
| try { |
| bn = createBookieNode(new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0)); |
| } catch (UnknownHostException e) { |
| LOG.error("Failed to get local host address : ", e); |
| } |
| } else { |
| LOG.info("Ignoring LocalNode in Placementpolicy"); |
| } |
| localNode = bn; |
| LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.", |
| localNode, null == localNode ? "Unknown" : localNode.getNetworkLocation(), |
| dnsResolver.getClass().getName()); |
| |
| this.isWeighted = isWeighted; |
| if (this.isWeighted) { |
| this.maxWeightMultiple = maxWeightMultiple; |
| this.weightedSelection = new WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple); |
| LOG.info("Weight based placement with max multiple of " + this.maxWeightMultiple); |
| } else { |
| LOG.info("Not weighted"); |
| } |
| return this; |
| } |
| |
| /* |
| * sets default rack for the policy. |
| * i.e. region-aware policy may want to have /region/rack while regular |
| * rack-aware policy needs /rack only since we cannot mix both styles |
| */ |
| public RackawareEnsemblePlacementPolicyImpl withDefaultRack(String rack) { |
| checkNotNull(rack, "Default rack cannot be null"); |
| |
| this.defaultRack = rack; |
| return this; |
| } |
| |
| public String getDefaultRack() { |
| return defaultRack; |
| } |
| |
| @Override |
| public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, |
| Optional<DNSToSwitchMapping> optionalDnsResolver, |
| HashedWheelTimer timer, |
| FeatureProvider featureProvider, |
| StatsLogger statsLogger) { |
| DNSToSwitchMapping dnsResolver; |
| if (optionalDnsResolver.isPresent()) { |
| dnsResolver = optionalDnsResolver.get(); |
| } else { |
| String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName()); |
| try { |
| dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class); |
| if (dnsResolver instanceof Configurable) { |
| ((Configurable) dnsResolver).setConf(conf); |
| } |
| |
| if (dnsResolver instanceof RackChangeNotifier) { |
| ((RackChangeNotifier) dnsResolver).registerRackChangeListener(this); |
| } |
| } catch (RuntimeException re) { |
| if (!conf.getEnforceMinNumRacksPerWriteQuorum()) { |
| LOG.error("Failed to initialize DNS Resolver {}, used default subnet resolver : {}", |
| dnsResolverName, re, re.getMessage()); |
| dnsResolver = new DefaultResolver(() -> this.getDefaultRack()); |
| } else { |
| /* |
| * if minNumRacksPerWriteQuorum is enforced, then it |
| * shouldn't continue in the case of failure to create |
| * dnsResolver. |
| */ |
| throw re; |
| } |
| } |
| } |
| slowBookies = CacheBuilder.newBuilder() |
| .expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS) |
| .build(new CacheLoader<BookieSocketAddress, Long>() { |
| @Override |
| public Long load(BookieSocketAddress key) throws Exception { |
| return -1L; |
| } |
| }); |
| return initialize( |
| dnsResolver, |
| timer, |
| conf.getBoolean(REPP_RANDOM_READ_REORDERING, false), |
| conf.getNetworkTopologyStabilizePeriodSeconds(), |
| conf.getReorderThresholdPendingRequests(), |
| conf.getDiskWeightBasedPlacementEnabled(), |
| conf.getBookieMaxWeightMultipleForWeightBasedPlacement(), |
| conf.getMinNumRacksPerWriteQuorum(), |
| conf.getEnforceMinNumRacksPerWriteQuorum(), |
| conf.getIgnoreLocalNodeInPlacementPolicy(), |
| statsLogger); |
| } |
| |
| @Override |
| public void uninitalize() { |
| // do nothing |
| } |
| |
| /* |
| * this method should be called in readlock scope of 'rwLock' |
| */ |
| protected Set<BookieSocketAddress> addDefaultRackBookiesIfMinNumRacksIsEnforced( |
| Set<BookieSocketAddress> excludeBookies) { |
| Set<BookieSocketAddress> comprehensiveExclusionBookiesSet; |
| if (enforceMinNumRacksPerWriteQuorum) { |
| Set<BookieSocketAddress> bookiesInDefaultRack = null; |
| Set<Node> defaultRackLeaves = topology.getLeaves(getDefaultRack()); |
| for (Node node : defaultRackLeaves) { |
| if (node instanceof BookieNode) { |
| if (bookiesInDefaultRack == null) { |
| bookiesInDefaultRack = new HashSet<BookieSocketAddress>(excludeBookies); |
| } |
| bookiesInDefaultRack.add(((BookieNode) node).getAddr()); |
| } else { |
| LOG.error("found non-BookieNode: {} as leaf of defaultrack: {}", node, getDefaultRack()); |
| } |
| } |
| if ((bookiesInDefaultRack == null) || bookiesInDefaultRack.isEmpty()) { |
| comprehensiveExclusionBookiesSet = excludeBookies; |
| } else { |
| comprehensiveExclusionBookiesSet = new HashSet<BookieSocketAddress>(excludeBookies); |
| comprehensiveExclusionBookiesSet.addAll(bookiesInDefaultRack); |
| LOG.info("enforceMinNumRacksPerWriteQuorum is enabled, so Excluding bookies of defaultRack: {}", |
| bookiesInDefaultRack); |
| } |
| } else { |
| comprehensiveExclusionBookiesSet = excludeBookies; |
| } |
| return comprehensiveExclusionBookiesSet; |
| } |
| |
| @Override |
| public PlacementResult<List<BookieSocketAddress>> newEnsemble(int ensembleSize, int writeQuorumSize, |
| int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) |
| throws BKNotEnoughBookiesException { |
| rwLock.readLock().lock(); |
| try { |
| Set<BookieSocketAddress> comprehensiveExclusionBookiesSet = addDefaultRackBookiesIfMinNumRacksIsEnforced( |
| excludeBookies); |
| PlacementResult<List<BookieSocketAddress>> newEnsembleResult = newEnsembleInternal(ensembleSize, |
| writeQuorumSize, ackQuorumSize, comprehensiveExclusionBookiesSet, null, null); |
| return newEnsembleResult; |
| } finally { |
| rwLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public PlacementResult<List<BookieSocketAddress>> newEnsemble(int ensembleSize, |
| int writeQuorumSize, |
| int ackQuorumSize, |
| Set<BookieSocketAddress> excludeBookies, |
| Ensemble<BookieNode> parentEnsemble, |
| Predicate<BookieNode> parentPredicate) |
| throws BKNotEnoughBookiesException { |
| return newEnsembleInternal( |
| ensembleSize, |
| writeQuorumSize, |
| ackQuorumSize, |
| excludeBookies, |
| parentEnsemble, |
| parentPredicate); |
| } |
| |
| protected PlacementResult<List<BookieSocketAddress>> newEnsembleInternal( |
| int ensembleSize, |
| int writeQuorumSize, |
| int ackQuorumSize, |
| Set<BookieSocketAddress> excludeBookies, |
| Ensemble<BookieNode> parentEnsemble, |
| Predicate<BookieNode> parentPredicate) throws BKNotEnoughBookiesException { |
| rwLock.readLock().lock(); |
| try { |
| Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies); |
| int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum); |
| RRTopologyAwareCoverageEnsemble ensemble = |
| new RRTopologyAwareCoverageEnsemble( |
| ensembleSize, |
| writeQuorumSize, |
| ackQuorumSize, |
| RACKNAME_DISTANCE_FROM_LEAVES, |
| parentEnsemble, |
| parentPredicate, |
| minNumRacksPerWriteQuorumForThisEnsemble); |
| BookieNode prevNode = null; |
| int numRacks = topology.getNumOfRacks(); |
| // only one rack, use the random algorithm. |
| if (numRacks < 2) { |
| if (enforceMinNumRacksPerWriteQuorum && (minNumRacksPerWriteQuorumForThisEnsemble > 1)) { |
| LOG.error("Only one rack available and minNumRacksPerWriteQuorum is enforced, so giving up"); |
| throw new BKNotEnoughBookiesException(); |
| } |
| List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.INSTANCE, |
| ensemble); |
| ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize); |
| for (BookieNode bn : bns) { |
| addrs.add(bn.getAddr()); |
| } |
| return PlacementResult.of(addrs, false); |
| } |
| |
| for (int i = 0; i < ensembleSize; i++) { |
| String curRack; |
| if (null == prevNode) { |
| if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) { |
| curRack = NodeBase.ROOT; |
| } else { |
| curRack = localNode.getNetworkLocation(); |
| } |
| } else { |
| curRack = "~" + prevNode.getNetworkLocation(); |
| } |
| boolean firstBookieInTheEnsemble = (null == prevNode); |
| prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble, |
| !enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble); |
| } |
| List<BookieSocketAddress> bookieList = ensemble.toList(); |
| if (ensembleSize != bookieList.size()) { |
| LOG.error("Not enough {} bookies are available to form an ensemble : {}.", |
| ensembleSize, bookieList); |
| throw new BKNotEnoughBookiesException(); |
| } |
| return PlacementResult.of(bookieList, |
| isEnsembleAdheringToPlacementPolicy( |
| bookieList, writeQuorumSize, ackQuorumSize)); |
| } finally { |
| rwLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public PlacementResult<BookieSocketAddress> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, |
| Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble, |
| BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) |
| throws BKNotEnoughBookiesException { |
| rwLock.readLock().lock(); |
| try { |
| excludeBookies = addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies); |
| excludeBookies.addAll(currentEnsemble); |
| BookieNode bn = knownBookies.get(bookieToReplace); |
| if (null == bn) { |
| bn = createBookieNode(bookieToReplace); |
| } |
| |
| Set<Node> ensembleNodes = convertBookiesToNodes(currentEnsemble); |
| Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies); |
| |
| excludeNodes.addAll(ensembleNodes); |
| excludeNodes.add(bn); |
| ensembleNodes.remove(bn); |
| |
| Set<String> networkLocationsToBeExcluded = getNetworkLocations(ensembleNodes); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Try to choose a new bookie to replace {} from ensemble {}, excluding {}.", |
| bookieToReplace, ensembleNodes, excludeNodes); |
| } |
| // pick a candidate from same rack to replace |
| BookieNode candidate = selectFromNetworkLocation( |
| bn.getNetworkLocation(), |
| networkLocationsToBeExcluded, |
| excludeNodes, |
| TruePredicate.INSTANCE, |
| EnsembleForReplacementWithNoConstraints.INSTANCE, |
| !enforceMinNumRacksPerWriteQuorum); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn); |
| } |
| BookieSocketAddress candidateAddr = candidate.getAddr(); |
| List<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(currentEnsemble); |
| if (currentEnsemble.isEmpty()) { |
| /* |
| * in testing code there are test cases which would pass empty |
| * currentEnsemble |
| */ |
| newEnsemble.add(candidateAddr); |
| } else { |
| newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr); |
| } |
| return PlacementResult.of(candidateAddr, |
| isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize)); |
| } finally { |
| rwLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieInfoMap) { |
| if (!isWeighted) { |
| LOG.info("bookieFreeDiskInfo callback called even without weighted placement policy being used."); |
| return; |
| } |
| List<BookieNode> allBookies = new ArrayList<BookieNode>(knownBookies.values()); |
| |
| // create a new map to reflect the new mapping |
| Map<BookieNode, WeightedObject> map = new HashMap<BookieNode, WeightedObject>(); |
| for (BookieNode bookie : allBookies) { |
| if (bookieInfoMap.containsKey(bookie.getAddr())) { |
| map.put(bookie, bookieInfoMap.get(bookie.getAddr())); |
| } else { |
| map.put(bookie, new BookieInfo()); |
| } |
| } |
| rwLock.writeLock().lock(); |
| try { |
| this.bookieInfoMap = map; |
| this.weightedSelection.updateMap(this.bookieInfoMap); |
| } finally { |
| rwLock.writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public BookieNode selectFromNetworkLocation( |
| String networkLoc, |
| Set<Node> excludeBookies, |
| Predicate<BookieNode> predicate, |
| Ensemble<BookieNode> ensemble, |
| boolean fallbackToRandom) |
| throws BKNotEnoughBookiesException { |
| // select one from local rack |
| try { |
| return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble); |
| } catch (BKNotEnoughBookiesException e) { |
| if (!fallbackToRandom) { |
| LOG.error( |
| "Failed to choose a bookie from {} : " |
| + "excluded {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.", |
| networkLoc, excludeBookies); |
| throw e; |
| } |
| LOG.warn("Failed to choose a bookie from {} : " |
| + "excluded {}, fallback to choose bookie randomly from the cluster.", |
| networkLoc, excludeBookies); |
| // randomly choose one from whole cluster, ignore the provided predicate. |
| return selectRandom(1, excludeBookies, predicate, ensemble).get(0); |
| } |
| } |
| |
| @Override |
| public BookieNode selectFromNetworkLocation(String networkLoc, |
| Set<String> excludeRacks, |
| Set<Node> excludeBookies, |
| Predicate<BookieNode> predicate, |
| Ensemble<BookieNode> ensemble, |
| boolean fallbackToRandom) |
| throws BKNotEnoughBookiesException { |
| // first attempt to select one from local rack |
| try { |
| return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble); |
| } catch (BKNotEnoughBookiesException e) { |
| /* |
| * there is no enough bookie from local rack, select bookies from |
| * the whole cluster and exclude the racks specified at |
| * <tt>excludeRacks</tt>. |
| */ |
| return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom); |
| } |
| } |
| |
| |
| /** |
| * It randomly selects a {@link BookieNode} that is not on the <i>excludeRacks</i> set, excluding the nodes in |
| * <i>excludeBookies</i> set. If it fails to find one, it selects a random {@link BookieNode} from the whole |
| * cluster. |
| */ |
| @Override |
| public BookieNode selectFromNetworkLocation(Set<String> excludeRacks, |
| Set<Node> excludeBookies, |
| Predicate<BookieNode> predicate, |
| Ensemble<BookieNode> ensemble, |
| boolean fallbackToRandom) |
| throws BKNotEnoughBookiesException { |
| |
| List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values()); |
| Set<Node> fullExclusionBookiesList = new HashSet<Node>(excludeBookies); |
| for (BookieNode knownNode : knownNodes) { |
| if (excludeRacks.contains(knownNode.getNetworkLocation())) { |
| fullExclusionBookiesList.add(knownNode); |
| } |
| } |
| |
| try { |
| return selectRandomInternal(knownNodes, 1, fullExclusionBookiesList, predicate, ensemble).get(0); |
| } catch (BKNotEnoughBookiesException e) { |
| if (!fallbackToRandom) { |
| LOG.error( |
| "Failed to choose a bookie excluding Racks: {} " |
| + "Nodes: {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.", |
| excludeRacks, excludeBookies); |
| throw e; |
| } |
| |
| LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.", |
| excludeBookies); |
| // randomly choose one from whole cluster |
| return selectRandom(1, excludeBookies, predicate, ensemble).get(0); |
| } |
| } |
| |
| private WeightedRandomSelection<BookieNode> prepareForWeightedSelection(List<Node> leaves) { |
| // create a map of bookieNode->freeDiskSpace for this rack. The assumption is that |
| // the number of nodes in a rack is of the order of 40, so it shouldn't be too bad |
| // to build it every time during a ledger creation |
| Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>(); |
| for (Node n : leaves) { |
| if (!(n instanceof BookieNode)) { |
| continue; |
| } |
| BookieNode bookie = (BookieNode) n; |
| if (this.bookieInfoMap.containsKey(bookie)) { |
| rackMap.put(bookie, this.bookieInfoMap.get(bookie)); |
| } else { |
| rackMap.put(bookie, new BookieInfo()); |
| } |
| } |
| if (rackMap.size() == 0) { |
| return null; |
| } |
| |
| WeightedRandomSelection<BookieNode> wRSelection = new WeightedRandomSelectionImpl<BookieNode>( |
| maxWeightMultiple); |
| wRSelection.updateMap(rackMap); |
| return wRSelection; |
| } |
| |
| /** |
| * Choose random node under a given network path. |
| * |
| * @param netPath |
| * network path |
| * @param excludeBookies |
| * exclude bookies |
| * @param predicate |
| * predicate to check whether the target is a good target. |
| * @param ensemble |
| * ensemble structure |
| * @return chosen bookie. |
| */ |
| protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate, |
| Ensemble<BookieNode> ensemble) throws BKNotEnoughBookiesException { |
| WeightedRandomSelection<BookieNode> wRSelection = null; |
| List<Node> leaves = new ArrayList<Node>(topology.getLeaves(netPath)); |
| if (!this.isWeighted) { |
| Collections.shuffle(leaves); |
| } else { |
| if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) { |
| throw new BKNotEnoughBookiesException(); |
| } |
| wRSelection = prepareForWeightedSelection(leaves); |
| if (wRSelection == null) { |
| throw new BKNotEnoughBookiesException(); |
| } |
| } |
| |
| Iterator<Node> it = leaves.iterator(); |
| Set<Node> bookiesSeenSoFar = new HashSet<Node>(); |
| while (true) { |
| Node n; |
| if (isWeighted) { |
| if (bookiesSeenSoFar.size() == leaves.size()) { |
| // Don't loop infinitely. |
| break; |
| } |
| n = wRSelection.getNextRandom(); |
| bookiesSeenSoFar.add(n); |
| } else { |
| if (it.hasNext()) { |
| n = it.next(); |
| } else { |
| break; |
| } |
| } |
| if (excludeBookies.contains(n)) { |
| continue; |
| } |
| if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) { |
| continue; |
| } |
| BookieNode bn = (BookieNode) n; |
| // got a good candidate |
| if (ensemble.addNode(bn)) { |
| // add the candidate to exclude set |
| excludeBookies.add(bn); |
| } |
| return bn; |
| } |
| throw new BKNotEnoughBookiesException(); |
| } |
| |
| /** |
| * Choose a random node from whole cluster. |
| * |
| * @param numBookies |
| * number bookies to choose |
| * @param excludeBookies |
| * bookies set to exclude. |
| * @param ensemble |
| * ensemble to hold the bookie chosen. |
| * @return the bookie node chosen. |
| * @throws BKNotEnoughBookiesException |
| */ |
| protected List<BookieNode> selectRandom(int numBookies, |
| Set<Node> excludeBookies, |
| Predicate<BookieNode> predicate, |
| Ensemble<BookieNode> ensemble) |
| throws BKNotEnoughBookiesException { |
| return selectRandomInternal(null, numBookies, excludeBookies, predicate, ensemble); |
| } |
| |
| protected List<BookieNode> selectRandomInternal(List<BookieNode> bookiesToSelectFrom, |
| int numBookies, |
| Set<Node> excludeBookies, |
| Predicate<BookieNode> predicate, |
| Ensemble<BookieNode> ensemble) |
| throws BKNotEnoughBookiesException { |
| WeightedRandomSelection<BookieNode> wRSelection = null; |
| if (bookiesToSelectFrom == null) { |
| // If the list is null, we need to select from the entire knownBookies set |
| wRSelection = this.weightedSelection; |
| bookiesToSelectFrom = new ArrayList<BookieNode>(knownBookies.values()); |
| } |
| if (isWeighted) { |
| if (CollectionUtils.subtract(bookiesToSelectFrom, excludeBookies).size() < numBookies) { |
| throw new BKNotEnoughBookiesException(); |
| } |
| if (wRSelection == null) { |
| Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>(); |
| for (BookieNode n : bookiesToSelectFrom) { |
| if (excludeBookies.contains(n)) { |
| continue; |
| } |
| if (this.bookieInfoMap.containsKey(n)) { |
| rackMap.put(n, this.bookieInfoMap.get(n)); |
| } else { |
| rackMap.put(n, new BookieInfo()); |
| } |
| } |
| wRSelection = new WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple); |
| wRSelection.updateMap(rackMap); |
| } |
| } else { |
| Collections.shuffle(bookiesToSelectFrom); |
| } |
| |
| BookieNode bookie; |
| List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies); |
| Iterator<BookieNode> it = bookiesToSelectFrom.iterator(); |
| Set<BookieNode> bookiesSeenSoFar = new HashSet<BookieNode>(); |
| while (numBookies > 0) { |
| if (isWeighted) { |
| if (bookiesSeenSoFar.size() == bookiesToSelectFrom.size()) { |
| // If we have gone through the whole available list of bookies, |
| // and yet haven't been able to satisfy the ensemble request, bail out. |
| // We don't want to loop infinitely. |
| break; |
| } |
| bookie = wRSelection.getNextRandom(); |
| bookiesSeenSoFar.add(bookie); |
| } else { |
| if (it.hasNext()) { |
| bookie = it.next(); |
| } else { |
| break; |
| } |
| } |
| if (excludeBookies.contains(bookie)) { |
| continue; |
| } |
| |
| // When durability is being enforced; we must not violate the |
| // predicate even when selecting a random bookie; as durability |
| // guarantee is not best effort; correctness is implied by it |
| if (enforceDurability && !predicate.apply(bookie, ensemble)) { |
| continue; |
| } |
| |
| if (ensemble.addNode(bookie)) { |
| excludeBookies.add(bookie); |
| newBookies.add(bookie); |
| --numBookies; |
| } |
| } |
| if (numBookies == 0) { |
| return newBookies; |
| } |
| LOG.warn("Failed to find {} bookies : excludeBookies {}, allBookies {}.", |
| numBookies, excludeBookies, bookiesToSelectFrom); |
| |
| throw new BKNotEnoughBookiesException(); |
| } |
| |
| @Override |
| public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long entryId) { |
| if (reorderThresholdPendingRequests <= 0) { |
| // only put bookies on slowBookies list if reorderThresholdPendingRequests is *not* set (0); |
| // otherwise, rely on reordering of reads based on reorderThresholdPendingRequests |
| slowBookies.put(bookieSocketAddress, entryId); |
| } |
| } |
| |
| @Override |
| public DistributionSchedule.WriteSet reorderReadSequence( |
| List<BookieSocketAddress> ensemble, |
| BookiesHealthInfo bookiesHealthInfo, |
| DistributionSchedule.WriteSet writeSet) { |
| Map<Integer, String> writeSetWithRegion = new HashMap<>(); |
| for (int i = 0; i < writeSet.size(); i++) { |
| writeSetWithRegion.put(writeSet.get(i), ""); |
| } |
| return reorderReadSequenceWithRegion( |
| ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false, "", writeSet.size()); |
| } |
| |
| /** |
| * This function orders the read sequence with a given region. For region-unaware policies (e.g. |
| * RackAware), we pass in false for regionAware and an empty myRegion. When this happens, any |
| * remote list will stay empty. The ordering is as follows (the R* at the beginning of each list item |
| * is only present for region aware policies). |
| * 1. available (local) bookies |
| * 2. R* a remote bookie (based on remoteNodeInReorderSequence |
| * 3. R* remaining (local) bookies |
| * 4. R* remaining remote bookies |
| * 5. read only bookies |
| * 6. slow bookies |
| * 7. unavailable bookies |
| * |
| * @param ensemble |
| * ensemble of bookies |
| * @param writeSet |
| * write set |
| * @param writeSetWithRegion |
| * write set with region information |
| * @param bookiesHealthInfo |
| * heuristics about health of boookies |
| * @param regionAware |
| * whether or not a region-aware policy is used |
| * @param myRegion |
| * current region of policy |
| * @param remoteNodeInReorderSequence |
| * number of local bookies to try before trying a remote bookie |
| * @return ordering of bookies to send read to |
| */ |
| DistributionSchedule.WriteSet reorderReadSequenceWithRegion( |
| List<BookieSocketAddress> ensemble, |
| DistributionSchedule.WriteSet writeSet, |
| Map<Integer, String> writeSetWithRegion, |
| BookiesHealthInfo bookiesHealthInfo, |
| boolean regionAware, |
| String myRegion, |
| int remoteNodeInReorderSequence) { |
| boolean useRegionAware = regionAware && (!myRegion.equals(UNKNOWN_REGION)); |
| int ensembleSize = ensemble.size(); |
| |
| // For rack aware, If all the bookies in the write set are available, simply return the original write set, |
| // to avoid creating more lists |
| boolean isAnyBookieUnavailable = false; |
| |
| if (useRegionAware || reorderReadsRandom) { |
| isAnyBookieUnavailable = true; |
| } else { |
| for (int i = 0; i < ensemble.size(); i++) { |
| BookieSocketAddress bookieAddr = ensemble.get(i); |
| if ((!knownBookies.containsKey(bookieAddr) && !readOnlyBookies.contains(bookieAddr)) |
| || slowBookies.getIfPresent(bookieAddr) != null) { |
| // Found at least one bookie not available in the ensemble, or in slowBookies |
| isAnyBookieUnavailable = true; |
| break; |
| } |
| } |
| } |
| |
| boolean reordered = false; |
| if (reorderThresholdPendingRequests > 0) { |
| // if there are no slow or unavailable bookies, capture each bookie's number of |
| // pending request to reorder requests based on a threshold of pending requests |
| |
| // number of pending requests per bookie (same index as writeSet) |
| long[] pendingReqs = new long[writeSet.size()]; |
| int bestBookieIdx = -1; |
| |
| for (int i = 0; i < writeSet.size(); i++) { |
| pendingReqs[i] = bookiesHealthInfo.getBookiePendingRequests(ensemble.get(writeSet.get(i))); |
| if (bestBookieIdx < 0 || pendingReqs[i] < pendingReqs[bestBookieIdx]) { |
| bestBookieIdx = i; |
| } |
| } |
| |
| // reorder the writeSet if the currently first bookie in our writeSet has at |
| // least |
| // reorderThresholdPendingRequests more outstanding request than the best bookie |
| if (bestBookieIdx > 0 && pendingReqs[0] >= pendingReqs[bestBookieIdx] + reorderThresholdPendingRequests) { |
| // We're not reordering the entire write set, but only move the best bookie |
| // to the first place. Chances are good that this bookie will be fast enough |
| // to not trigger the speculativeReadTimeout. But even if it hits that timeout, |
| // things may have changed by then so much that whichever bookie we put second |
| // may actually not be the second-best choice any more. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("read set reordered from {} ({} pending) to {} ({} pending)", |
| ensemble.get(writeSet.get(0)), pendingReqs[0], ensemble.get(writeSet.get(bestBookieIdx)), |
| pendingReqs[bestBookieIdx]); |
| } |
| writeSet.moveAndShift(bestBookieIdx, 0); |
| reordered = true; |
| } |
| } |
| |
| if (!isAnyBookieUnavailable) { |
| if (reordered) { |
| readReorderedCounter.registerSuccessfulValue(1); |
| } |
| return writeSet; |
| } |
| |
| for (int i = 0; i < writeSet.size(); i++) { |
| int idx = writeSet.get(i); |
| BookieSocketAddress address = ensemble.get(idx); |
| String region = writeSetWithRegion.get(idx); |
| Long lastFailedEntryOnBookie = bookiesHealthInfo.getBookieFailureHistory(address); |
| if (null == knownBookies.get(address)) { |
| // there isn't too much differences between readonly bookies |
| // from unavailable bookies. since there |
| // is no write requests to them, so we shouldn't try reading |
| // from readonly bookie prior to writable bookies. |
| if ((null == readOnlyBookies) |
| || !readOnlyBookies.contains(address)) { |
| writeSet.set(i, idx | UNAVAIL_MASK); |
| } else { |
| if (slowBookies.getIfPresent(address) != null) { |
| long numPendingReqs = bookiesHealthInfo.getBookiePendingRequests(address); |
| // use slow bookies with less pending requests first |
| long slowIdx = numPendingReqs * ensembleSize + idx; |
| writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK); |
| } else { |
| writeSet.set(i, idx | READ_ONLY_MASK); |
| } |
| } |
| } else if (lastFailedEntryOnBookie < 0) { |
| if (slowBookies.getIfPresent(address) != null) { |
| long numPendingReqs = bookiesHealthInfo.getBookiePendingRequests(address); |
| long slowIdx = numPendingReqs * ensembleSize + idx; |
| writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK); |
| } else { |
| if (useRegionAware && !myRegion.equals(region)) { |
| writeSet.set(i, idx | REMOTE_MASK); |
| } else { |
| writeSet.set(i, idx | LOCAL_MASK); |
| } |
| } |
| } else { |
| // use bookies with earlier failed entryIds first |
| long failIdx = lastFailedEntryOnBookie * ensembleSize + idx; |
| if (useRegionAware && !myRegion.equals(region)) { |
| writeSet.set(i, (int) (failIdx & ~MASK_BITS) | REMOTE_FAIL_MASK); |
| } else { |
| writeSet.set(i, (int) (failIdx & ~MASK_BITS) | LOCAL_FAIL_MASK); |
| } |
| } |
| } |
| |
| // Add a mask to ensure the sort is stable, sort, |
| // and then remove mask. This maintains stability as |
| // long as there are fewer than 16 bookies in the write set. |
| for (int i = 0; i < writeSet.size(); i++) { |
| writeSet.set(i, writeSet.get(i) | ((i & 0xF) << 20)); |
| } |
| writeSet.sort(); |
| for (int i = 0; i < writeSet.size(); i++) { |
| writeSet.set(i, writeSet.get(i) & ~((0xF) << 20)); |
| } |
| |
| if (reorderReadsRandom) { |
| shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS); |
| shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS); |
| shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS); |
| shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS); |
| } |
| |
| // nodes within a region are ordered as follows |
| // (Random?) list of nodes that have no history of failure |
| // Nodes with Failure history are ordered in the reverse |
| // order of the most recent entry that generated an error |
| // The sort will have put them in correct order, |
| // so remove the bits that sort by age. |
| for (int i = 0; i < writeSet.size(); i++) { |
| int mask = writeSet.get(i) & MASK_BITS; |
| int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize; |
| if (mask == LOCAL_FAIL_MASK) { |
| writeSet.set(i, LOCAL_MASK | idx); |
| } else if (mask == REMOTE_FAIL_MASK) { |
| writeSet.set(i, REMOTE_MASK | idx); |
| } else if (mask == SLOW_MASK) { |
| writeSet.set(i, SLOW_MASK | idx); |
| } |
| } |
| |
| // Insert a node from the remote region at the specified location so |
| // we try more than one region within the max allowed latency |
| int firstRemote = -1; |
| for (int i = 0; i < writeSet.size(); i++) { |
| if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) { |
| firstRemote = i; |
| break; |
| } |
| } |
| if (firstRemote != -1) { |
| int i = 0; |
| for (; i < remoteNodeInReorderSequence |
| && i < writeSet.size(); i++) { |
| if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) { |
| break; |
| } |
| } |
| writeSet.moveAndShift(firstRemote, i); |
| } |
| |
| |
| // remove all masks |
| for (int i = 0; i < writeSet.size(); i++) { |
| writeSet.set(i, writeSet.get(i) & ~MASK_BITS); |
| } |
| readReorderedCounter.registerSuccessfulValue(1); |
| return writeSet; |
| } |
| |
| // this method should be called in readlock scope of 'rwlock' |
| @Override |
| public boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int writeQuorumSize, |
| int ackQuorumSize) { |
| int ensembleSize = ensembleList.size(); |
| int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum); |
| HashSet<String> racksInQuorum = new HashSet<String>(); |
| BookieSocketAddress bookie; |
| for (int i = 0; i < ensembleList.size(); i++) { |
| racksInQuorum.clear(); |
| for (int j = 0; j < writeQuorumSize; j++) { |
| bookie = ensembleList.get((i + j) % ensembleSize); |
| try { |
| racksInQuorum.add(knownBookies.get(bookie).getNetworkLocation()); |
| } catch (Exception e) { |
| /* |
| * any issue/exception in analyzing whether ensemble is |
| * strictly adhering to placement policy should be |
| * swallowed. |
| */ |
| LOG.warn("Received exception while trying to get network location of bookie: {}", bookie, e); |
| } |
| } |
| if ((racksInQuorum.size() < minNumRacksPerWriteQuorumForThisEnsemble) |
| || (enforceMinNumRacksPerWriteQuorum && racksInQuorum.contains(getDefaultRack()))) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieSocketAddress> ackedBookies, |
| int writeQuorumSize, |
| int ackQuorumSize) { |
| HashSet<String> rackCounter = new HashSet<>(); |
| int minWriteQuorumNumRacksPerWriteQuorum = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum); |
| |
| ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock(); |
| readLock.lock(); |
| try { |
| for (BookieSocketAddress bookie : ackedBookies) { |
| rackCounter.add(knownBookies.get(bookie).getNetworkLocation()); |
| } |
| |
| // Check to make sure that ensemble is writing to `minNumberOfRacks`'s number of racks at least. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("areAckedBookiesAdheringToPlacementPolicy returning {} because number of racks = {} and " |
| + "minNumRacksPerWriteQuorum = {}", |
| rackCounter.size() >= minNumRacksPerWriteQuorum, |
| rackCounter.size(), |
| minNumRacksPerWriteQuorum); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum; |
| } |
| } |