/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateAdvCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncDeleteCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncOpenCallback;
import org.apache.bookkeeper.client.api.BookKeeperBuilder;
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.meta.CleanupLedgerManager;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* BookKeeper client.
*
* <p>We assume there is one single writer to a ledger at any time.
*
* <p>There are four possible operations: start a new ledger, write to a ledger,
* read from a ledger and delete a ledger.
*
* <p>The exceptions resulting from synchronous calls and error code resulting from
* asynchronous calls can be found in the class {@link BKException}.
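*
* <p>A minimal usage sketch (the metadata service URI, digest type and password below are
* illustrative, not prescriptive):
*
* <pre>{@code
* ClientConfiguration conf = new ClientConfiguration()
*     .setMetadataServiceUri("zk+hierarchical://localhost:2181/ledgers");
* try (BookKeeper bk = BookKeeper.forConfig(conf).build()) {
*     LedgerHandle lh = bk.createLedger(3, 2, DigestType.CRC32C, "passwd".getBytes());
*     long entryId = lh.addEntry("hello".getBytes());
*     lh.close();
* }
* }</pre>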
*/
public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
private static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
final EventLoopGroup eventLoopGroup;
private final ByteBufAllocator allocator;
// The stats logger for this client.
private final StatsLogger statsLogger;
private final BookKeeperClientStats clientStats;
// whether the event loop group is one we created, or is owned by whoever
// instantiated us
boolean ownEventLoopGroup = false;
final BookieClient bookieClient;
final BookieWatcherImpl bookieWatcher;
final OrderedExecutor mainWorkerPool;
final OrderedScheduler scheduler;
final HashedWheelTimer requestTimer;
final boolean ownTimer;
final FeatureProvider featureProvider;
final ScheduledExecutorService bookieInfoScheduler;
final MetadataClientDriver metadataDriver;
// Ledger manager responsible for how to store ledger meta data
final LedgerManagerFactory ledgerManagerFactory;
final LedgerManager ledgerManager;
final LedgerIdGenerator ledgerIdGenerator;
// Ensemble Placement Policy
final EnsemblePlacementPolicy placementPolicy;
BookieInfoReader bookieInfoReader;
final ClientConfiguration conf;
final ClientInternalConf internalConf;
// Close State
boolean closed = false;
final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
/**
* BookKeeper Client Builder to build client instances.
*
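* <p>A hedged sketch of wiring in externally managed resources (the metadata service URI and the
* event loop group below are placeholders; an externally provided group is not shut down by the
* client):
*
* <pre>{@code
* ClientConfiguration conf = new ClientConfiguration()
*     .setMetadataServiceUri("zk+hierarchical://localhost:2181/ledgers");
* EventLoopGroup group = new NioEventLoopGroup();
* BookKeeper bk = BookKeeper.forConfig(conf)
*     .eventLoopGroup(group)
*     .statsLogger(NullStatsLogger.INSTANCE)
*     .build();
* }</pre>
*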
* @see BookKeeperBuilder
*/
public static class Builder {
final ClientConfiguration conf;
ZooKeeper zk = null;
EventLoopGroup eventLoopGroup = null;
ByteBufAllocator allocator = null;
StatsLogger statsLogger = NullStatsLogger.INSTANCE;
DNSToSwitchMapping dnsResolver = null;
HashedWheelTimer requestTimer = null;
FeatureProvider featureProvider = null;
Builder(ClientConfiguration conf) {
this.conf = conf;
}
/**
* Configure the bookkeeper client with a provided {@link EventLoopGroup}.
*
* @param f an external {@link EventLoopGroup} to be used by the bookkeeper client.
* @return client builder.
* @deprecated since 4.5, use {@link #eventLoopGroup(EventLoopGroup)}
* @see #eventLoopGroup(EventLoopGroup)
*/
@Deprecated
public Builder setEventLoopGroup(EventLoopGroup f) {
eventLoopGroup = f;
return this;
}
/**
* Configure the bookkeeper client with a provided {@link ZooKeeper} client.
*
* @param zk an external {@link ZooKeeper} client to be used by the bookkeeper client.
* @return client builder.
* @deprecated since 4.5, use {@link #zk(ZooKeeper)}
* @see #zk(ZooKeeper)
*/
@Deprecated
public Builder setZookeeper(ZooKeeper zk) {
this.zk = zk;
return this;
}
/**
* Configure the bookkeeper client with a provided {@link StatsLogger}.
*
* @param statsLogger a {@link StatsLogger} to be used by the bookkeeper client to collect stats generated
* by the client.
* @return client builder.
* @deprecated since 4.5, use {@link #statsLogger(StatsLogger)}
* @see #statsLogger(StatsLogger)
*/
@Deprecated
public Builder setStatsLogger(StatsLogger statsLogger) {
this.statsLogger = statsLogger;
return this;
}
/**
* Configure the bookkeeper client with a provided {@link EventLoopGroup}.
*
* @param f an external {@link EventLoopGroup} to be used by the bookkeeper client.
* @return client builder.
* @since 4.5
*/
public Builder eventLoopGroup(EventLoopGroup f) {
eventLoopGroup = f;
return this;
}
/**
* Configure the bookkeeper client with a provided {@link ByteBufAllocator}.
*
* @param allocator an external {@link ByteBufAllocator} to be used by the bookkeeper client.
* @return client builder.
* @since 4.9
*/
public Builder allocator(ByteBufAllocator allocator) {
this.allocator = allocator;
return this;
}
/**
* Configure the bookkeeper client with a provided {@link ZooKeeper} client.
*
* @param zk an external {@link ZooKeeper} client to be used by the bookkeeper client.
* @return client builder.
* @since 4.5
*/
@Deprecated
public Builder zk(ZooKeeper zk) {
this.zk = zk;
return this;
}
/**
* Configure the bookkeeper client with a provided {@link StatsLogger}.
*
* @param statsLogger a {@link StatsLogger} to be used by the bookkeeper client to collect stats generated
* by the client.
* @return client builder.
* @since 4.5
*/
public Builder statsLogger(StatsLogger statsLogger) {
this.statsLogger = statsLogger;
return this;
}
/**
* Configure the bookkeeper client to use the provided dns resolver {@link DNSToSwitchMapping}.
*
* @param dnsResolver dns resolver for placement policy to use for resolving network locations.
* @return client builder
* @since 4.5
*/
public Builder dnsResolver(DNSToSwitchMapping dnsResolver) {
this.dnsResolver = dnsResolver;
return this;
}
/**
* Configure the bookkeeper client to use a provided {@link HashedWheelTimer}.
*
* @param requestTimer request timer for client to manage timer related tasks.
* @return client builder
* @since 4.5
*/
public Builder requestTimer(HashedWheelTimer requestTimer) {
this.requestTimer = requestTimer;
return this;
}
/**
* Configure the bookkeeper client with a provided {@link FeatureProvider}.
*
* @param featureProvider feature provider to control the availability of client features.
* @return client builder
*/
public Builder featureProvider(FeatureProvider featureProvider) {
this.featureProvider = featureProvider;
return this;
}
public BookKeeper build() throws IOException, InterruptedException, BKException {
checkNotNull(statsLogger, "No stats logger provided");
return new BookKeeper(conf, zk, eventLoopGroup, allocator, statsLogger, dnsResolver, requestTimer,
featureProvider);
}
}
public static Builder forConfig(final ClientConfiguration conf) {
return new Builder(conf);
}
/**
* Create a bookkeeper client. A zookeeper client and a client event loop group
* will be instantiated as part of this constructor.
*
* @param servers
* A list of one or more servers on which zookeeper is running. The
* client assumes that the running bookies have been registered with
* zookeeper under the path
* {@link AbstractConfiguration#getZkAvailableBookiesPath()}
* @throws IOException
* @throws InterruptedException
*/
public BookKeeper(String servers) throws IOException, InterruptedException,
BKException {
this(new ClientConfiguration().setMetadataServiceUri("zk+null://" + servers + "/ledgers"));
}
/**
* Create a bookkeeper client using a configuration object.
* A zookeeper client and a client event loop group will be
* instantiated as part of this constructor.
*
* @param conf
* Client Configuration object
* @throws IOException
* @throws InterruptedException
*/
public BookKeeper(final ClientConfiguration conf)
throws IOException, InterruptedException, BKException {
this(conf, null, null, null, NullStatsLogger.INSTANCE,
null, null, null);
}
private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException, IOException {
checkNotNull(zk, "No zookeeper instance provided");
if (!zk.getState().isConnected()) {
LOG.error("Unconnected zookeeper handle passed to bookkeeper");
throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
}
return zk;
}
private static EventLoopGroup validateEventLoopGroup(EventLoopGroup eventLoopGroup)
throws NullPointerException {
checkNotNull(eventLoopGroup, "No Event Loop Group provided");
return eventLoopGroup;
}
/**
* Create a bookkeeper client but use the passed in zookeeper client instead
* of instantiating one.
*
* @param conf
* Client Configuration object
* {@link ClientConfiguration}
* @param zk
* Zookeeper client instance connected to the zookeeper with which
* the bookies have registered
* @throws IOException
* @throws InterruptedException
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
throws IOException, InterruptedException, BKException {
this(conf, validateZooKeeper(zk), null, null, NullStatsLogger.INSTANCE, null, null, null);
}
/**
* Create a bookkeeper client but use the passed in zookeeper client and
* client event loop group instead of instantiating those.
*
* @param conf
* Client Configuration Object
* {@link ClientConfiguration}
* @param zk
* Zookeeper client instance connected to the zookeeper with which
* the bookies have registered. The ZooKeeper client must be connected
* before it is passed to BookKeeper. Otherwise a KeeperException is thrown.
* @param eventLoopGroup
* An event loop group that will be used to create connections to the bookies
* @throws IOException
* @throws InterruptedException
* @throws BKException in the event of a bookkeeper connection error
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup)
throws IOException, InterruptedException, BKException {
this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), null, NullStatsLogger.INSTANCE,
null, null, null);
}
/**
* Constructor for use with the builder. Other constructors also use it.
*/
@SuppressWarnings("deprecation")
@VisibleForTesting
BookKeeper(ClientConfiguration conf,
ZooKeeper zkc,
EventLoopGroup eventLoopGroup,
ByteBufAllocator byteBufAllocator,
StatsLogger rootStatsLogger,
DNSToSwitchMapping dnsResolver,
HashedWheelTimer requestTimer,
FeatureProvider featureProvider)
throws IOException, InterruptedException, BKException {
this.conf = conf;
// initialize feature provider
if (null == featureProvider) {
this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
} else {
this.featureProvider = featureProvider;
}
this.internalConf = ClientInternalConf.fromConfigAndFeatureProvider(conf, this.featureProvider);
// initialize resources
this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
this.mainWorkerPool = OrderedExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
.statsLogger(rootStatsLogger)
.traceTaskExecution(conf.getEnableTaskExecutionStats())
.preserveMdcForTaskExecution(conf.getPreserveMdcForTaskExecution())
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
.enableBusyWait(conf.isBusyWaitEnabled())
.build();
// initialize stats logger
this.statsLogger = rootStatsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
this.clientStats = BookKeeperClientStats.newInstance(this.statsLogger);
// initialize metadata driver
try {
String metadataServiceUriStr = conf.getMetadataServiceUri();
if (null != metadataServiceUriStr) {
this.metadataDriver = MetadataDrivers.getClientDriver(URI.create(metadataServiceUriStr));
} else {
checkNotNull(zkc, "No external zookeeper provided when no metadata service uri is found");
this.metadataDriver = MetadataDrivers.getClientDriver("zk");
}
this.metadataDriver.initialize(
conf,
scheduler,
rootStatsLogger,
java.util.Optional.ofNullable(zkc));
} catch (ConfigurationException ce) {
LOG.error("Failed to initialize metadata client driver using invalid metadata service uri", ce);
throw new IOException("Failed to initialize metadata client driver", ce);
} catch (MetadataException me) {
LOG.error("Encountered metadata exceptions on initializing metadata client driver", me);
throw new IOException("Failed to initialize metadata client driver", me);
}
// initialize event loop group
if (null == eventLoopGroup) {
this.eventLoopGroup = EventLoopUtil.getClientEventLoopGroup(conf,
new DefaultThreadFactory("bookkeeper-io"));
this.ownEventLoopGroup = true;
} else {
this.eventLoopGroup = eventLoopGroup;
this.ownEventLoopGroup = false;
}
if (byteBufAllocator != null) {
this.allocator = byteBufAllocator;
} else {
this.allocator = ByteBufAllocatorBuilder.create()
.poolingPolicy(conf.getAllocatorPoolingPolicy())
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
.build();
}
// initialize bookie client
this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool,
scheduler, rootStatsLogger);
if (null == requestTimer) {
this.requestTimer = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
conf.getTimeoutTimerNumTicks());
this.ownTimer = true;
} else {
this.requestTimer = requestTimer;
this.ownTimer = false;
}
// initialize the ensemble placement
this.placementPolicy = initializeEnsemblePlacementPolicy(conf,
dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger);
this.bookieWatcher = new BookieWatcherImpl(
conf, this.placementPolicy, metadataDriver.getRegistrationClient(),
this.statsLogger.scope(WATCHER_SCOPE));
if (conf.getDiskWeightBasedPlacementEnabled()) {
LOG.info("Weighted ledger placement enabled");
ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder()
.setNameFormat("BKClientMetaDataPollScheduler-%d");
this.bookieInfoScheduler = Executors.newSingleThreadScheduledExecutor(tFBuilder.build());
this.bookieInfoReader = new BookieInfoReader(this, conf, this.bookieInfoScheduler);
this.bookieWatcher.initialBlockingBookieRead();
this.bookieInfoReader.start();
} else {
LOG.info("Weighted ledger placement is not enabled");
this.bookieInfoScheduler = null;
this.bookieInfoReader = new BookieInfoReader(this, conf, null);
this.bookieWatcher.initialBlockingBookieRead();
}
// initialize ledger manager
try {
this.ledgerManagerFactory =
this.metadataDriver.getLedgerManagerFactory();
} catch (MetadataException e) {
throw new IOException("Failed to initialize ledger manager factory", e);
}
this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
scheduleBookieHealthCheckIfEnabled(conf);
}
/**
* Allows extending BookKeeper for mocking in unit tests.
*/
@VisibleForTesting
BookKeeper() {
conf = new ClientConfiguration();
internalConf = ClientInternalConf.fromConfig(conf);
statsLogger = NullStatsLogger.INSTANCE;
clientStats = BookKeeperClientStats.newInstance(statsLogger);
scheduler = null;
requestTimer = null;
metadataDriver = null;
placementPolicy = null;
ownTimer = false;
mainWorkerPool = null;
ledgerManagerFactory = null;
ledgerManager = null;
ledgerIdGenerator = null;
featureProvider = null;
eventLoopGroup = null;
bookieWatcher = null;
bookieInfoScheduler = null;
bookieClient = null;
allocator = UnpooledByteBufAllocator.DEFAULT;
}
private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
DNSToSwitchMapping dnsResolver,
HashedWheelTimer timer,
FeatureProvider featureProvider,
StatsLogger statsLogger)
throws IOException {
try {
Class<? extends EnsemblePlacementPolicy> policyCls = conf.getEnsemblePlacementPolicy();
return ReflectionUtils.newInstance(policyCls).initialize(conf, java.util.Optional.ofNullable(dnsResolver),
timer, featureProvider, statsLogger);
} catch (ConfigurationException e) {
throw new IOException("Failed to initialize ensemble placement policy : ", e);
}
}
int getReturnRc(int rc) {
return getReturnRc(bookieClient, rc);
}
static int getReturnRc(BookieClient bookieClient, int rc) {
if (BKException.Code.OK == rc) {
return rc;
} else {
if (bookieClient.isClosed()) {
return BKException.Code.ClientClosedException;
} else {
return rc;
}
}
}
void scheduleBookieHealthCheckIfEnabled(ClientConfiguration conf) {
if (conf.isBookieHealthCheckEnabled()) {
scheduler.scheduleAtFixedRate(new SafeRunnable() {
@Override
public void safeRun() {
checkForFaultyBookies();
}
}, conf.getBookieHealthCheckIntervalSeconds(), conf.getBookieHealthCheckIntervalSeconds(),
TimeUnit.SECONDS);
}
}
void checkForFaultyBookies() {
List<BookieSocketAddress> faultyBookies = bookieClient.getFaultyBookies();
for (BookieSocketAddress faultyBookie : faultyBookies) {
bookieWatcher.quarantineBookie(faultyBookie);
}
}
/**
* Returns the ledger manager used by this client to access ledger metadata.
*/
@VisibleForTesting
public LedgerManager getLedgerManager() {
return ledgerManager;
}
@VisibleForTesting
LedgerManager getUnderlyingLedgerManager() {
return ((CleanupLedgerManager) ledgerManager).getUnderlying();
}
@VisibleForTesting
LedgerIdGenerator getLedgerIdGenerator() {
return ledgerIdGenerator;
}
@VisibleForTesting
ReentrantReadWriteLock getCloseLock() {
return closeLock;
}
@VisibleForTesting
boolean isClosed() {
return closed;
}
@VisibleForTesting
BookieWatcher getBookieWatcher() {
return bookieWatcher;
}
public OrderedExecutor getMainWorkerPool() {
return mainWorkerPool;
}
@VisibleForTesting
OrderedScheduler getScheduler() {
return scheduler;
}
@VisibleForTesting
EnsemblePlacementPolicy getPlacementPolicy() {
return placementPolicy;
}
@VisibleForTesting
public MetadataClientDriver getMetadataClientDriver() {
return metadataDriver;
}
/**
* There are 3 digest types that can be used for verification. CRC32 is
* cheap to compute but does not protect against byzantine bookies (i.e., a
* bookie might report fake bytes and a matching CRC32). The MAC code is more
* expensive to compute, but is protected by a password, i.e., a bookie can't
* report fake bytes with a matching MAC unless it knows the password.
* CRC32C, which uses SSE processor instructions, has better performance than CRC32.
*
* <p>This is the legacy DigestType, kept for backward compatibility. A new DigestType
* must be added here, in client.api.DigestType, and in DigestType in DataFormats.proto.
*
* <p>If the digest type is set/passed in as DUMMY, a dummy digest is added/checked.
* The DUMMY digest is mostly for test purposes or for use cases where the digest
* is considered an unnecessary overhead.
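*
* <p>For example, converting between the API-level and legacy digest types (illustrative only):
*
* <pre>{@code
* DigestType t = DigestType.fromApiDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32C);
* DataFormats.LedgerMetadataFormat.DigestType proto = DigestType.toProtoDigestType(t);
* }</pre>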
*/
public enum DigestType {
MAC, CRC32, CRC32C, DUMMY;
public static DigestType fromApiDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
switch (digestType) {
case MAC:
return DigestType.MAC;
case CRC32:
return DigestType.CRC32;
case CRC32C:
return DigestType.CRC32C;
case DUMMY:
return DigestType.DUMMY;
default:
throw new IllegalArgumentException("Unable to convert digest type " + digestType);
}
}
public static DataFormats.LedgerMetadataFormat.DigestType toProtoDigestType(DigestType digestType) {
switch (digestType) {
case MAC:
return DataFormats.LedgerMetadataFormat.DigestType.HMAC;
case CRC32:
return DataFormats.LedgerMetadataFormat.DigestType.CRC32;
case CRC32C:
return DataFormats.LedgerMetadataFormat.DigestType.CRC32C;
case DUMMY:
return DataFormats.LedgerMetadataFormat.DigestType.DUMMY;
default:
throw new IllegalArgumentException("Unable to convert digest type " + digestType);
}
}
public org.apache.bookkeeper.client.api.DigestType toApiDigestType() {
switch (this) {
case MAC:
return org.apache.bookkeeper.client.api.DigestType.MAC;
case CRC32:
return org.apache.bookkeeper.client.api.DigestType.CRC32;
case CRC32C:
return org.apache.bookkeeper.client.api.DigestType.CRC32C;
case DUMMY:
return org.apache.bookkeeper.client.api.DigestType.DUMMY;
default:
throw new IllegalArgumentException("Unable to convert digest type " + this);
}
}
}
ZooKeeper getZkHandle() {
return ((ZKMetadataClientDriver) metadataDriver).getZk();
}
protected ClientConfiguration getConf() {
return conf;
}
StatsLogger getStatsLogger() {
return statsLogger;
}
/**
* Get the BookieClient, currently used for doing bookie recovery.
*
* @return BookieClient for the BookKeeper instance.
*/
BookieClient getBookieClient() {
return bookieClient;
}
/**
* Retrieves BookieInfo from all the bookies in the cluster. It sends requests
* to all the bookies in parallel and returns the info from the bookies that responded.
* If there was an error in reading from any bookie, nothing will be returned for
* that bookie in the map.
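*
* <p>A hedged example of inspecting the returned map (the getter names follow {@link BookieInfo}):
*
* <pre>{@code
* Map<BookieSocketAddress, BookieInfo> infos = bk.getBookieInfo();
* infos.forEach((addr, info) ->
*     System.out.println(addr + " free=" + info.getFreeDiskSpace()
*         + " total=" + info.getTotalDiskSpace()));
* }</pre>
*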
* @return a map of BookieSocketAddress to its BookieInfo
* @throws BKException
* @throws InterruptedException
*/
public Map<BookieSocketAddress, BookieInfo> getBookieInfo() throws BKException, InterruptedException {
return bookieInfoReader.getBookieInfo();
}
/**
* Creates a new ledger asynchronously. To create a ledger, we need to specify
* the ensemble size, the quorum size, the digest type, a password, a callback
* implementation, and an optional control object. The ensemble size is how
* many bookies the entries should be striped among and the quorum size is the
* degree of replication of each entry. The digest type is either a MAC or a
* CRC. Note that the CRC option is not able to protect a client against a
* bookie that replaces an entry. The password is used not only to
* authenticate access to a ledger, but also to verify entries in ledgers.
*
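* <p>A hedged example of invoking the asynchronous call with a lambda callback (the digest type
* and password are illustrative):
*
* <pre>{@code
* bk.asyncCreateLedger(3, 2, DigestType.CRC32, "passwd".getBytes(),
*     (rc, lh, ctx) -> {
*         if (rc == BKException.Code.OK) {
*             // ledger created successfully; lh is ready for writes
*         }
*     }, null);
* }</pre>
*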
* @param ensSize
* number of bookies over which to stripe entries
* @param writeQuorumSize
* number of bookies each entry will be written to. each of these bookies
* must acknowledge the entry before the call is completed.
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @param cb
* createCallback implementation
* @param ctx
* optional control object
*/
public void asyncCreateLedger(final int ensSize,
final int writeQuorumSize,
final DigestType digestType,
final byte[] passwd, final CreateCallback cb, final Object ctx) {
asyncCreateLedger(ensSize, writeQuorumSize, writeQuorumSize,
digestType, passwd, cb, ctx, Collections.emptyMap());
}
/**
* Creates a new ledger asynchronously. Ledgers created with this call have
* a separate write quorum and ack quorum size. The write quorum must be at least as large as
* the ack quorum.
*
* <p>Separating the write and the ack quorum allows the BookKeeper client to continue
* writing when a bookie has failed but the failure has not yet been detected. Detecting
* a bookie has failed can take a number of seconds, as configured by the read timeout
* {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
* that bookie will be removed from the ensemble.
*
* <p>The other parameters match those of {@link #asyncCreateLedger(int, int, DigestType, byte[],
* AsyncCallback.CreateCallback, Object)}
*
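* <p>For instance, an ensemble of 5 bookies with a write quorum of 3 and an ack quorum of 2
* (the sizes and password below are illustrative):
*
* <pre>{@code
* bk.asyncCreateLedger(5, 3, 2, DigestType.CRC32C, "passwd".getBytes(),
*     (rc, lh, ctx) -> {
*         // rc == BKException.Code.OK means lh is ready for writes
*     }, null, Collections.emptyMap());
* }</pre>
*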
* @param ensSize
* number of bookies over which to stripe entries
* @param writeQuorumSize
* number of bookies each entry will be written to
* @param ackQuorumSize
* number of bookies which must acknowledge an entry before the call is completed
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @param cb
* createCallback implementation
* @param ctx
* optional control object
* @param customMetadata
* optional customMetadata that holds user specified metadata
*/
public void asyncCreateLedger(final int ensSize, final int writeQuorumSize, final int ackQuorumSize,
final DigestType digestType, final byte[] passwd,
final CreateCallback cb, final Object ctx, final Map<String, byte[]> customMetadata) {
if (writeQuorumSize < ackQuorumSize) {
throw new IllegalArgumentException("Write quorum must not be smaller than ack quorum");
}
closeLock.readLock().lock();
try {
if (closed) {
cb.createComplete(BKException.Code.ClientClosedException, null, ctx);
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata, WriteFlag.NONE, clientStats)
.initiate();
} finally {
closeLock.readLock().unlock();
}
}
/**
* Creates a new ledger with a default ensemble size of 3 and a quorum size of 2.
*
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @return a handle to the newly created ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle createLedger(DigestType digestType, byte[] passwd)
throws BKException, InterruptedException {
return createLedger(3, 2, digestType, passwd);
}
/**
* Synchronous call to create ledger. Parameters match those of
* {@link #asyncCreateLedger(int, int, DigestType, byte[],
* AsyncCallback.CreateCallback, Object)}
*
* @param ensSize
* @param qSize
* @param digestType
* @param passwd
* @return a handle to the newly created ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle createLedger(int ensSize, int qSize,
DigestType digestType, byte[] passwd)
throws InterruptedException, BKException {
return createLedger(ensSize, qSize, qSize, digestType, passwd, Collections.emptyMap());
}
/**
* Synchronous call to create ledger. Parameters match those of
* {@link #asyncCreateLedger(int, int, DigestType, byte[],
* AsyncCallback.CreateCallback, Object)}
*
* @param ensSize
* @param writeQuorumSize
* @param ackQuorumSize
* @param digestType
* @param passwd
* @return a handle to the newly created ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
DigestType digestType, byte[] passwd)
throws InterruptedException, BKException {
return createLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, Collections.emptyMap());
}
/**
* Synchronous call to create ledger. Parameters match those of asyncCreateLedger
*
* @param ensSize
* @param writeQuorumSize
* @param ackQuorumSize
* @param digestType
* @param passwd
* @param customMetadata
* @return a handle to the newly created ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
DigestType digestType, byte[] passwd, final Map<String, byte[]> customMetadata)
throws InterruptedException, BKException {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
SyncCreateCallback result = new SyncCreateCallback(future);
/*
* Calls asynchronous version
*/
asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
result, null, customMetadata);
LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
if (lh == null) {
LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
throw BKException.create(BKException.Code.UnexpectedConditionException);
}
return lh;
}
/**
* Synchronous call to create ledger.
* Creates a new ledger and returns a {@link LedgerHandleAdv} which can accept entryId.
* Parameters must match those of asyncCreateLedgerAdv.
*
* @param ensSize
* @param writeQuorumSize
* @param ackQuorumSize
* @param digestType
* @param passwd
*
* @return a handle to the newly created ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize,
DigestType digestType, byte[] passwd)
throws InterruptedException, BKException {
return createLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize,
digestType, passwd, Collections.emptyMap());
}
/**
* Synchronous call to create ledger.
* Creates a new ledger and returns a {@link LedgerHandleAdv} which can accept entryId.
* Parameters must match those of asyncCreateLedgerAdv.
*
* @param ensSize
* @param writeQuorumSize
* @param ackQuorumSize
* @param digestType
* @param passwd
* @param customMetadata
* @return a handle to the newly created ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize,
DigestType digestType, byte[] passwd, final Map<String, byte[]> customMetadata)
throws InterruptedException, BKException {
CompletableFuture<LedgerHandleAdv> future = new CompletableFuture<>();
SyncCreateAdvCallback result = new SyncCreateAdvCallback(future);
/*
* Calls asynchronous version
*/
asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
result, null, customMetadata);
LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
if (lh == null) {
LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
throw BKException.create(BKException.Code.UnexpectedConditionException);
}
return lh;
}
/**
* Creates a new ledger asynchronously; the callback receives a {@link LedgerHandleAdv}
* which can accept entryId. Ledgers created with this call can have
* a separate write quorum and ack quorum size. The write quorum must be at least as large as
* the ack quorum.
*
* <p>Separating the write and the ack quorum allows the BookKeeper client to continue
* writing when a bookie has failed but the failure has not yet been detected. Detecting
* a bookie has failed can take a number of seconds, as configured by the read timeout
* {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
* that bookie will be removed from the ensemble.
*
* <p>The other parameters match those of {@link #asyncCreateLedger(int, int, DigestType, byte[],
* AsyncCallback.CreateCallback, Object)}
*
* @param ensSize
* number of bookies over which to stripe entries
* @param writeQuorumSize
* number of bookies each entry will be written to
* @param ackQuorumSize
* number of bookies which must acknowledge an entry before the call is completed
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @param cb
* createCallback implementation
* @param ctx
* optional control object
* @param customMetadata
* optional customMetadata that holds user specified metadata
*/
public void asyncCreateLedgerAdv(final int ensSize, final int writeQuorumSize, final int ackQuorumSize,
final DigestType digestType, final byte[] passwd, final CreateCallback cb, final Object ctx,
final Map<String, byte[]> customMetadata) {
if (writeQuorumSize < ackQuorumSize) {
throw new IllegalArgumentException("Write quorum must not be smaller than ack quorum");
}
closeLock.readLock().lock();
try {
if (closed) {
cb.createComplete(BKException.Code.ClientClosedException, null, ctx);
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata, WriteFlag.NONE, clientStats)
.initiateAdv(-1L);
} finally {
closeLock.readLock().unlock();
}
}
/**
* Synchronously creates a new ledger using the interface which accepts a ledgerId as input.
* This method returns {@link LedgerHandleAdv} which can accept entryId.
* Parameters must match those of the asyncCreateLedgerAdv variant that accepts a ledgerId.
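*
* <p>A hedged sketch of the adv flow (the ledger id, entry ids, and password are illustrative):
*
* <pre>{@code
* LedgerHandle lh = bk.createLedgerAdv(42L, 3, 3, 2,
*     DigestType.CRC32C, "passwd".getBytes(), Collections.emptyMap());
* lh.addEntry(0L, "first".getBytes());   // the caller supplies the entry id
* lh.addEntry(1L, "second".getBytes());
* lh.close();
* }</pre>
*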
* @param ledgerId
* @param ensSize
* @param writeQuorumSize
* @param ackQuorumSize
* @param digestType
* @param passwd
* @param customMetadata
* @return a handle to the newly created ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle createLedgerAdv(final long ledgerId,
int ensSize,
int writeQuorumSize,
int ackQuorumSize,
DigestType digestType,
byte[] passwd,
final Map<String, byte[]> customMetadata)
throws InterruptedException, BKException {
CompletableFuture<LedgerHandleAdv> future = new CompletableFuture<>();
SyncCreateAdvCallback result = new SyncCreateAdvCallback(future);
/*
* Calls asynchronous version
*/
asyncCreateLedgerAdv(ledgerId, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
result, null, customMetadata);
LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
if (lh == null) {
LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
throw BKException.create(BKException.Code.UnexpectedConditionException);
} else if (ledgerId != lh.getId()) {
LOG.error("Unexpected condition : Expected ledgerId: {} but got: {}", ledgerId, lh.getId());
throw BKException.create(BKException.Code.UnexpectedConditionException);
}
LOG.info("Ensemble: {} for ledger: {}", lh.getLedgerMetadata().getEnsembleAt(0L), lh.getId());
return lh;
}
/**
* Asynchronously creates a new ledger using the interface which accepts a ledgerId as input.
* The callback receives a {@link LedgerHandleAdv} which can accept entryId.
* Ledgers created with this call can have
* a separate write quorum and ack quorum size. The write quorum must be at least as large as
* the ack quorum.
*
* <p>Separating the write and the ack quorum allows the BookKeeper client to continue
* writing when a bookie has failed but the failure has not yet been detected. Detecting
* a bookie has failed can take a number of seconds, as configured by the read timeout
* {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
* that bookie will be removed from the ensemble.
*
* <p>The other parameters match those of asyncCreateLedger</p>
*
* @param ledgerId
* ledger Id to use for the newly created ledger
* @param ensSize
* number of bookies over which to stripe entries
* @param writeQuorumSize
* number of bookies each entry will be written to
* @param ackQuorumSize
* number of bookies which must acknowledge an entry before the call is completed
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @param cb
* createCallback implementation
* @param ctx
* optional control object
* @param customMetadata
* optional customMetadata that holds user specified metadata
*/
public void asyncCreateLedgerAdv(final long ledgerId,
final int ensSize,
final int writeQuorumSize,
final int ackQuorumSize,
final DigestType digestType,
final byte[] passwd,
final CreateCallback cb,
final Object ctx,
final Map<String, byte[]> customMetadata) {
if (writeQuorumSize < ackQuorumSize) {
throw new IllegalArgumentException("Write quorum must not be smaller than ack quorum");
}
closeLock.readLock().lock();
try {
if (closed) {
cb.createComplete(BKException.Code.ClientClosedException, null, ctx);
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
ackQuorumSize, digestType, passwd, cb, ctx,
customMetadata, WriteFlag.NONE, clientStats)
.initiateAdv(ledgerId);
} finally {
closeLock.readLock().unlock();
}
}
/**
* Open existing ledger asynchronously for reading.
*
* <p>Opening a ledger with this method invokes fencing and recovery on the ledger
* if the ledger has not been closed. Fencing will block all other clients from
* writing to the ledger. Recovery will make sure that the ledger is closed
* before reading from it.
*
* <p>Recovery also makes sure that any entries which reached one bookie, but not a
* quorum, will be replicated to a quorum of bookies. This occurs in cases where
* the writer of a ledger crashes after sending a write request to one bookie but
* before being able to send it to the rest of the bookies in the quorum.
*
* <p>If the ledger is already closed, neither fencing nor recovery will be applied.
*
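* <p>A hedged usage sketch (ledgerId refers to an existing ledger; digest type and password are
* illustrative):
*
* <pre>{@code
* bk.asyncOpenLedger(ledgerId, DigestType.CRC32, "passwd".getBytes(),
*     (rc, lh, ctx) -> {
*         if (rc == BKException.Code.OK) {
*             // the ledger is now fenced and closed; entries up to
*             // lh.getLastAddConfirmed() can be read safely
*         }
*     }, null);
* }</pre>
*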
* @see LedgerHandle#asyncClose
*
* @param lId
* ledger identifier
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @param cb
* openCallback implementation
* @param ctx
* optional control object
*/
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
if (closed) {
cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
return;
}
new LedgerOpenOp(BookKeeper.this, clientStats,
lId, digestType, passwd, cb, ctx).initiate();
} finally {
closeLock.readLock().unlock();
}
}
/**
* Open existing ledger asynchronously for reading, without attempting to
* recover the ledger if it is not yet closed. Use this method carefully:
* if the writer has crashed and there is no external mechanism to detect that
* failure and open the ledger safely (invoking the recovery procedure), the
* ledger will remain unsealed forever.
*
* <p>Opening a ledger without recovery does not fence the ledger. As such, other
* clients can continue to write to the ledger.
*
* <p>The ledger handle passed to the callback is read only. It will not be possible
* to add entries to the ledger. Any attempt to add entries will throw an
* exception.
*
* <p>Reads from the returned ledger will be able to read entries up until
* the lastConfirmedEntry at the point in time at which the ledger was opened.
* If an attempt is made to read beyond the ledger handle's LAC, an attempt is made
* to get the latest LAC from bookies or metadata, and if the entry_id of the read request
* is less than or equal to the new LAC, read will be allowed to proceed.
*
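* <p>A hedged sketch of tailing an unsealed ledger, shown with the synchronous counterpart
* {@link #openLedgerNoRecovery(long, DigestType, byte[])} (digest type and password are
* illustrative):
*
* <pre>{@code
* LedgerHandle reader = bk.openLedgerNoRecovery(ledgerId, DigestType.CRC32, "passwd".getBytes());
* long lac = reader.readLastConfirmed();
* if (lac >= 0) {
*     Enumeration<LedgerEntry> entries = reader.readEntries(0, lac);
* }
* }</pre>
*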
* @param lId
* ledger identifier
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @param cb
* openCallback implementation
* @param ctx
* optional control object
*/
public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
if (closed) {
cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
return;
}
new LedgerOpenOp(BookKeeper.this, clientStats,
lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
} finally {
closeLock.readLock().unlock();
}
}
/**
* Synchronous open ledger call.
*
* @see #asyncOpenLedger
* @param lId
* ledger identifier
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @return a handle to the open ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd)
throws BKException, InterruptedException {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
SyncOpenCallback result = new SyncOpenCallback(future);
/*
* Calls async open ledger
*/
asyncOpenLedger(lId, digestType, passwd, result, null);
return SyncCallbackUtils.waitForResult(future);
}
/**
* Synchronous, unsafe open ledger call.
*
* @see #asyncOpenLedgerNoRecovery
* @param lId
* ledger identifier
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @return a handle to the open ledger
* @throws InterruptedException
* @throws BKException
*/
public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd)
throws BKException, InterruptedException {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
SyncOpenCallback result = new SyncOpenCallback(future);
/*
* Calls async open ledger
*/
asyncOpenLedgerNoRecovery(lId, digestType, passwd,
result, null);
return SyncCallbackUtils.waitForResult(future);
}
/**
* Deletes a ledger asynchronously.
*
* @param lId
* ledger Id
* @param cb
* deleteCallback implementation
* @param ctx
* optional control object
*/
public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
if (closed) {
cb.deleteComplete(BKException.Code.ClientClosedException, ctx);
return;
}
new LedgerDeleteOp(BookKeeper.this, clientStats, lId, cb, ctx).initiate();
} finally {
closeLock.readLock().unlock();
}
}
/**
* Synchronous call to delete a ledger. Parameters match those of
* {@link #asyncDeleteLedger(long, AsyncCallback.DeleteCallback, Object)}
*
* @param lId
* ledgerId
* @throws InterruptedException
* @throws BKException
*/
public void deleteLedger(long lId) throws InterruptedException, BKException {
CompletableFuture<Void> future = new CompletableFuture<>();
SyncDeleteCallback result = new SyncDeleteCallback(future);
// Call asynchronous version
asyncDeleteLedger(lId, result, null);
SyncCallbackUtils.waitForResult(future);
}
/**
* Check asynchronously whether the ledger with identifier <i>lId</i>
* has been closed.
*
* @param lId ledger identifier
* @param cb callback method
*/
public void asyncIsClosed(long lId, final IsClosedCallback cb, final Object ctx){
ledgerManager.readLedgerMetadata(lId).whenComplete((metadata, exception) -> {
if (exception == null) {
cb.isClosedComplete(BKException.Code.OK, metadata.getValue().isClosed(), ctx);
} else {
cb.isClosedComplete(BKException.getExceptionCode(exception), false, ctx);
}
});
}
/**
* Check whether the ledger with identifier <i>lId</i>
* has been closed.
*
* @param lId ledger identifier
* @return boolean true if ledger has been closed
* @throws BKException
*/
public boolean isClosed(long lId)
throws BKException, InterruptedException {
final class Result {
int rc;
boolean isClosed;
final CountDownLatch notifier = new CountDownLatch(1);
}
final Result result = new Result();
final IsClosedCallback cb = new IsClosedCallback(){
@Override
public void isClosedComplete(int rc, boolean isClosed, Object ctx){
result.isClosed = isClosed;
result.rc = rc;
result.notifier.countDown();
}
};
/*
* Call asynchronous version of isClosed
*/
asyncIsClosed(lId, cb, null);
/*
* Wait for callback
*/
result.notifier.await();
if (result.rc != BKException.Code.OK) {
throw BKException.create(result.rc);
}
return result.isClosed;
}
/**
* Shuts down client.
*
*/
@Override
public void close() throws BKException, InterruptedException {
closeLock.writeLock().lock();
try {
if (closed) {
return;
}
closed = true;
} finally {
closeLock.writeLock().unlock();
}
// Close the bookie client so all pending bookie requests fail
// and any incoming bookie requests are rejected.
bookieClient.close();
try {
// Close the ledger manager so all pending metadata requests fail
// and any incoming metadata requests are rejected.
ledgerManager.close();
ledgerIdGenerator.close();
} catch (IOException ie) {
LOG.error("Failed to close ledger manager : ", ie);
}
// Close the scheduler
scheduler.shutdown();
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The scheduler did not shutdown cleanly");
}
mainWorkerPool.shutdown();
if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The mainWorkerPool did not shutdown cleanly");
}
if (this.bookieInfoScheduler != null) {
this.bookieInfoScheduler.shutdown();
if (!bookieInfoScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The bookieInfoScheduler did not shutdown cleanly");
}
}
if (ownTimer) {
requestTimer.stop();
}
if (ownEventLoopGroup) {
eventLoopGroup.shutdownGracefully();
}
this.metadataDriver.close();
}
@Override
public CreateBuilder newCreateLedgerOp() {
return new LedgerCreateOp.CreateBuilderImpl(this);
}
@Override
public OpenBuilder newOpenLedgerOp() {
return new LedgerOpenOp.OpenBuilderImpl(this);
}
@Override
public DeleteBuilder newDeleteLedgerOp() {
return new LedgerDeleteOp.DeleteBuilderImpl(this);
}
private final ClientContext clientCtx = new ClientContext() {
@Override
public ClientInternalConf getConf() {
return internalConf;
}
@Override
public LedgerManager getLedgerManager() {
return BookKeeper.this.getLedgerManager();
}
@Override
public BookieWatcher getBookieWatcher() {
return BookKeeper.this.getBookieWatcher();
}
@Override
public EnsemblePlacementPolicy getPlacementPolicy() {
return BookKeeper.this.getPlacementPolicy();
}
@Override
public BookieClient getBookieClient() {
return BookKeeper.this.getBookieClient();
}
@Override
public OrderedExecutor getMainWorkerPool() {
return BookKeeper.this.getMainWorkerPool();
}
@Override
public OrderedScheduler getScheduler() {
return BookKeeper.this.getScheduler();
}
@Override
public BookKeeperClientStats getClientStats() {
return clientStats;
}
@Override
public boolean isClientClosed() {
return BookKeeper.this.isClosed();
}
@Override
public ByteBufAllocator getByteBufAllocator() {
return allocator;
}
};
ClientContext getClientCtx() {
return clientCtx;
}
}