blob: f4dd9a907ad447084347c6f3e5e87c9886351ecf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
import static org.apache.bookkeeper.client.BookKeeperClientStats.CREATE_OP;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookKeeperServerStats;
import org.apache.bookkeeper.client.BKException.BKInterruptedException;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BKException.MetaStoreException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
/**
* This class is responsible for maintaining a consistent view of what bookies
* are available by reading ZooKeeper (and setting watches on the bookie nodes).
* When a bookie fails, the other parts of the code turn to this class to find a
* replacement.
*
*/
@StatsDoc(
name = WATCHER_SCOPE,
help = "Bookie watcher related stats"
)
@Slf4j
class BookieWatcherImpl implements BookieWatcher {
/**
 * Converts a failure seen while waiting on a registration-client future into the
 * {@link BKException} that is handed to the caller.
 *
 * <ul>
 *   <li>a {@link BKException} is logged and returned unchanged;</li>
 *   <li>an {@link InterruptedException} is logged and replaced by a {@link BKInterruptedException};</li>
 *   <li>any other throwable is wrapped in a {@link MetaStoreException} (nothing is logged here).</li>
 * </ul>
 */
private static final Function<Throwable, BKException> EXCEPTION_FUNC = failure -> {
    if (failure instanceof BKException) {
        log.error("Failed to get bookie list : ", failure);
        return (BKException) failure;
    }
    if (failure instanceof InterruptedException) {
        log.error("Interrupted reading bookie list : ", failure);
        return new BKInterruptedException();
    }
    return new MetaStoreException(failure);
};
// Client configuration; the constructor reads the bookie quarantine duration from it.
private final ClientConfiguration conf;
// Source of the bookie lists: queried by getBookies()/getReadOnlyBookies() and watched
// from initialBlockingBookieRead().
private final RegistrationClient registrationClient;
// Placement policy that is notified of cluster changes and asked to pick ensembles
// and replacement bookies.
private final EnsemblePlacementPolicy placementPolicy;
// Timer updated by newEnsemble().
@StatsDoc(
name = NEW_ENSEMBLE_TIME,
help = "operation stats of new ensembles",
parent = CREATE_OP
)
private final OpStatsLogger newEnsembleTimer;
// Timer updated by replaceBookie().
@StatsDoc(
name = REPLACE_BOOKIE_TIME,
help = "operation stats of replacing bookie in an ensemble"
)
private final OpStatsLogger replaceBookieTimer;
// Incremented by newEnsemble()/replaceBookie() whenever the placement result reports
// that it is not strictly adhering to the policy.
@StatsDoc(
name = ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
help = "total number of newEnsemble/replaceBookie operations failed to adhere"
+ " EnsemblePlacementPolicy"
)
private final Counter ensembleNotAdheringToPlacementPolicy;
// Bookies that will not be preferred to be chosen in a new ensemble.
// Entries expire a configured number of seconds after being written (see the constructor).
final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
// Latest writable / read-only bookie sets received by the watcher callbacks; both start empty.
// They are only reassigned inside the synchronized process*BookiesChanged methods.
private volatile Set<BookieSocketAddress> writableBookies = Collections.emptySet();
private volatile Set<BookieSocketAddress> readOnlyBookies = Collections.emptySet();
// Futures returned when the watches were first registered; null until
// initialBlockingBookieRead() runs, and only touched while holding the monitor on this.
private CompletableFuture<?> initialWritableBookiesFuture = null;
private CompletableFuture<?> initialReadonlyBookiesFuture = null;
/**
 * Builds a watcher around the given collaborators.
 *
 * <p>The quarantine cache is created here: entries expire
 * {@code conf.getBookieQuarantineTimeSeconds()} seconds after they are written, and every
 * removal from the cache is logged at info level.
 *
 * @param conf client configuration, supplies the quarantine duration
 * @param placementPolicy policy used to choose ensembles and replacement bookies
 * @param registrationClient client used to read and watch the bookie lists
 * @param statsLogger logger from which the timers and the counter of this class are obtained
 */
public BookieWatcherImpl(ClientConfiguration conf,
                         EnsemblePlacementPolicy placementPolicy,
                         RegistrationClient registrationClient,
                         StatsLogger statsLogger) {
    this.conf = conf;
    this.placementPolicy = placementPolicy;
    this.registrationClient = registrationClient;

    // Logs every entry that leaves the quarantine cache.
    RemovalListener<BookieSocketAddress, Boolean> releaseLogger =
            (RemovalNotification<BookieSocketAddress, Boolean> released) ->
                    log.info("Bookie {} is no longer quarantined", released.getKey());
    this.quarantinedBookies = CacheBuilder.newBuilder()
            .expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), TimeUnit.SECONDS)
            .removalListener(releaseLogger)
            .build();

    this.newEnsembleTimer = statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
    this.replaceBookieTimer = statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
    this.ensembleNotAdheringToPlacementPolicy = statsLogger.getCounter(
            BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
}
/**
 * Asks the registration client for the writable bookies and waits for the answer.
 *
 * @return the writable bookies reported by the registration client
 * @throws BKException if the lookup fails; when it was interrupted the thread's interrupt
 *         flag is set again before the exception is rethrown
 */
@Override
public Set<BookieSocketAddress> getBookies() throws BKException {
    final Set<BookieSocketAddress> writable;
    try {
        writable = FutureUtils.result(registrationClient.getWritableBookies(), EXCEPTION_FUNC).getValue();
    } catch (BKInterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw interrupted;
    }
    return writable;
}
/**
 * Asks the registration client for the read-only bookies and waits for the answer.
 *
 * @return the read-only bookies reported by the registration client
 * @throws BKException if the lookup fails; when it was interrupted the thread's interrupt
 *         flag is set again before the exception is rethrown
 */
@Override
public Set<BookieSocketAddress> getReadOnlyBookies() throws BKException {
    final Set<BookieSocketAddress> readOnly;
    try {
        readOnly = FutureUtils.result(registrationClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
    } catch (BKInterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw interrupted;
    }
    return readOnly;
}
/**
 * Records a new set of writable bookies and forwards it, together with the current
 * read-only set, to the placement policy.
 *
 * <p>This callback is already not executed in the zookeeper thread: the watcher is updated
 * outside the ZK callback thread to avoid a deadlock in case some other component is trying
 * to do a blocking ZK operation.
 *
 * <p>Bookie clients are deliberately NOT closed here (there used to be a
 * {@code closeClients(deadBookies)} call on the bookie client), because:
 * <ol type="a">
 *   <li>dead bookies are removed from the topology and so are not used in new ensembles;</li>
 *   <li>the read sequence is reordered based on znode availability, so most reads are not
 *       sent to them;</li>
 *   <li>closing only disconnects the channel without removing it from the pcbc map; if a
 *       bookie is really down, PCBC disconnects itself based on the netty callback, and
 *       disconnecting here introduces the side effect described in (d);</li>
 *   <li>closing the client hurts latency when the bookie is alive but merely flaky on its
 *       znode registration due to a zookeeper session expiry;</li>
 *   <li>to permanently remove a bookkeeper client, the cookies' list should be watched
 *       instead.</li>
 * </ol>
 *
 * @param newBookieAddrs the writable bookies just reported by the watcher
 */
private synchronized void processWritableBookiesChanged(Set<BookieSocketAddress> newBookieAddrs) {
    writableBookies = newBookieAddrs;
    placementPolicy.onClusterChanged(newBookieAddrs, this.readOnlyBookies);
}
/**
 * Records a new set of read-only bookies and forwards it, together with the current
 * writable set, to the placement policy.
 *
 * @param readOnlyBookies the read-only bookies just reported by the watcher
 *        (shadows the field of the same name inside this method)
 */
private synchronized void processReadOnlyBookiesChanged(Set<BookieSocketAddress> readOnlyBookies) {
    this.readOnlyBookies = readOnlyBookies;
    this.placementPolicy.onClusterChanged(this.writableBookies, readOnlyBookies);
}
/**
 * Blocks until bookies are read from zookeeper, used in the {@link BookKeeper} constructor.
 *
 * <p>The first call registers the writable and read-only watches and remembers the returned
 * futures; later calls wait on those same futures. A failure of the writable watch is
 * propagated, whereas a non-interrupt failure of the read-only watch is only logged.
 *
 * @throws BKException when failed to read bookies
 */
public void initialBlockingBookieRead() throws BKException {
    final CompletableFuture<?> writableWatch;
    final CompletableFuture<?> readOnlyWatch;
    synchronized (this) {
        if (initialReadonlyBookiesFuture != null) {
            // Watches were registered by an earlier call: wait on the same futures.
            writableWatch = initialWritableBookiesFuture;
            readOnlyWatch = initialReadonlyBookiesFuture;
        } else {
            assert initialWritableBookiesFuture == null;
            writableWatch = registrationClient.watchWritableBookies(
                    update -> processWritableBookiesChanged(update.getValue()));
            readOnlyWatch = registrationClient.watchReadOnlyBookies(
                    update -> processReadOnlyBookiesChanged(update.getValue()));
            // Publish both futures only after both watches were registered.
            initialWritableBookiesFuture = writableWatch;
            initialReadonlyBookiesFuture = readOnlyWatch;
        }
    }

    // The writable list is mandatory: any failure is propagated.
    try {
        FutureUtils.result(writableWatch, EXCEPTION_FUNC);
    } catch (BKInterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw interrupted;
    }

    // The read-only list is best effort: only an interruption is propagated.
    try {
        FutureUtils.result(readOnlyWatch, EXCEPTION_FUNC);
    } catch (BKInterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw interrupted;
    } catch (Exception readOnlyFailure) {
        log.error("Failed getReadOnlyBookies: ", readOnlyFailure);
    }
}
/**
 * Chooses a new ensemble through the placement policy, preferring bookies that are not
 * quarantined.
 *
 * <p>The first attempt excludes a snapshot of the quarantined bookies and, when it works,
 * is recorded as a successful event on the new-ensemble timer. If the policy throws
 * {@link BKNotEnoughBookiesException}, a second attempt is made without any exclusion and
 * the timer records a failed event. In both cases a result that is not strictly adhering to
 * the policy bumps the counter and is logged as a warning.
 *
 * @param ensembleSize ensemble size passed to the placement policy
 * @param writeQuorumSize write quorum size passed to the placement policy
 * @param ackQuorumSize ack quorum size passed to the placement policy
 * @param customMetadata custom metadata passed to the placement policy
 * @return the bookies chosen by the placement policy
 * @throws BKNotEnoughBookiesException if the second attempt cannot find enough bookies either
 */
@Override
public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
                                             int ackQuorumSize, Map<String, byte[]> customMetadata)
        throws BKNotEnoughBookiesException {
    final long startNanos = MathUtils.nowInNano();
    List<BookieSocketAddress> chosen;
    try {
        // we try to only get from the healthy bookies first
        Set<BookieSocketAddress> quarantined = quarantinedBookies.asMap().keySet();
        EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> healthyOnly =
                placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
                        customMetadata, new HashSet<>(quarantined));
        chosen = healthyOnly.getResult();
        if (!healthyOnly.isStrictlyAdheringToPolicy()) {
            ensembleNotAdheringToPlacementPolicy.inc();
            log.warn("New ensemble: {} is not adhering to Placement Policy. quarantinedBookies: {}",
                    chosen, quarantined);
        }
        newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startNanos, TimeUnit.NANOSECONDS);
    } catch (BKNotEnoughBookiesException notEnoughHealthy) {
        if (log.isDebugEnabled()) {
            log.debug("Not enough healthy bookies available, using quarantined bookies");
        }
        // Second attempt: nothing is excluded, so quarantined bookies may be picked.
        EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> anyBookie =
                placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
                        customMetadata, new HashSet<>());
        chosen = anyBookie.getResult();
        if (!anyBookie.isStrictlyAdheringToPolicy()) {
            ensembleNotAdheringToPlacementPolicy.inc();
            log.warn("New ensemble: {} is not adhering to Placement Policy", chosen);
        }
        newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - startNanos, TimeUnit.NANOSECONDS);
    }
    return chosen;
}
/**
 * Asks the placement policy for a bookie to replace the one at {@code bookieIdx} in
 * {@code existingBookies}, preferring bookies that are not quarantined.
 *
 * <p>The first attempt excludes {@code excludeBookies} plus the currently quarantined
 * bookies and, when it works, is recorded as a successful event on the replace-bookie
 * timer. If the policy throws {@link BKNotEnoughBookiesException}, a second attempt excludes
 * only {@code excludeBookies} and the timer records a failed event. In both cases a result
 * that is not strictly adhering to the policy bumps the counter and is logged as a warning.
 *
 * @param ensembleSize ensemble size passed to the placement policy
 * @param writeQuorumSize write quorum size passed to the placement policy
 * @param ackQuorumSize ack quorum size passed to the placement policy
 * @param customMetadata custom metadata passed to the placement policy
 * @param existingBookies the current ensemble
 * @param bookieIdx index in {@code existingBookies} of the bookie to replace
 * @param excludeBookies bookies that must not be chosen; this set is not modified
 * @return the replacement bookie chosen by the placement policy
 * @throws BKNotEnoughBookiesException if the second attempt cannot find a bookie either
 */
@Override
public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
                                         Map<String, byte[]> customMetadata,
                                         List<BookieSocketAddress> existingBookies, int bookieIdx,
                                         Set<BookieSocketAddress> excludeBookies)
        throws BKNotEnoughBookiesException {
    final long startNanos = MathUtils.nowInNano();
    final BookieSocketAddress toReplace = existingBookies.get(bookieIdx);
    BookieSocketAddress replacement;
    try {
        // we exclude the quarantined bookies also first
        Set<BookieSocketAddress> excludedPlusQuarantined = new HashSet<>(excludeBookies);
        Set<BookieSocketAddress> quarantined = quarantinedBookies.asMap().keySet();
        excludedPlusQuarantined.addAll(quarantined);
        EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> healthyOnly =
                placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
                        customMetadata, existingBookies, toReplace, excludedPlusQuarantined);
        replacement = healthyOnly.getResult();
        if (!healthyOnly.isStrictlyAdheringToPolicy()) {
            ensembleNotAdheringToPlacementPolicy.inc();
            log.warn(
                    "replaceBookie for bookie: {} in ensemble: {} is not adhering to placement policy and"
                            + " chose {}. excludedBookies {} and quarantinedBookies {}",
                    toReplace, existingBookies, replacement, excludeBookies, quarantined);
        }
        replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startNanos, TimeUnit.NANOSECONDS);
    } catch (BKNotEnoughBookiesException notEnoughHealthy) {
        if (log.isDebugEnabled()) {
            log.debug("Not enough healthy bookies available, using quarantined bookies");
        }
        // Second attempt: only the caller's exclusions apply, quarantined bookies are allowed.
        EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> anyBookie =
                placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
                        customMetadata, existingBookies, toReplace, excludeBookies);
        replacement = anyBookie.getResult();
        if (!anyBookie.isStrictlyAdheringToPolicy()) {
            ensembleNotAdheringToPlacementPolicy.inc();
            log.warn(
                    "replaceBookie for bookie: {} in ensemble: {} is not adhering to placement policy and"
                            + " chose {}. excludedBookies {}",
                    toReplace, existingBookies, replacement, excludeBookies);
        }
        replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - startNanos, TimeUnit.NANOSECONDS);
    }
    return replacement;
}
/**
 * Quarantine <i>bookie</i> so it will not be preferred to be chosen for new ensembles.
 *
 * <p>A bookie that is already quarantined is left untouched: its cache entry is not
 * rewritten and nothing is logged. The check and the insertion are done as one atomic
 * {@code putIfAbsent} on the cache's map view, so two threads quarantining the same bookie
 * at the same time cannot both insert it (a second insert would replace the entry and
 * trigger the cache's removal listener, which logs the bookie as no longer quarantined).
 *
 * @param bookie address of the bookie to quarantine
 */
@Override
public void quarantineBookie(BookieSocketAddress bookie) {
    // putIfAbsent returns null only for the caller that actually inserted the entry.
    if (quarantinedBookies.asMap().putIfAbsent(bookie, Boolean.TRUE) == null) {
        log.warn("Bookie {} has been quarantined because of read/write errors.", bookie);
    }
}
}