/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
import com.twitter.distributedlog.bk.QuorumConfigProvider;
import com.twitter.distributedlog.bk.SimpleLedgerAllocator;
import com.twitter.distributedlog.callback.LogSegmentListener;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.subscription.SubscriptionStateStore;
import com.twitter.distributedlog.subscription.SubscriptionsStore;
import com.twitter.distributedlog.subscription.ZKSubscriptionStateStore;
import com.twitter.distributedlog.subscription.ZKSubscriptionsStore;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.MonitoredFuturePool;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.PermitLimiter;
import com.twitter.distributedlog.util.PermitManager;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.ExceptionalFunction;
import com.twitter.util.ExceptionalFunction0;
import com.twitter.util.ExecutorServiceFuturePool;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FuturePool;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* <h3>Metrics</h3>
* <ul>
* <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`.
* See {@link BKAsyncLogWriter} for detailed stats.
* <li> `async_reader/*`: all asynchronous reader related metrics are exposed under scope `async_reader`.
* See {@link BKAsyncLogReaderDLSN} for detailed stats.
* <li> `writer_future_pool/*`: metrics about the future pools used by writers are exposed under
* scope `writer_future_pool`. See {@link MonitoredFuturePool} for detailed stats.
* <li> `reader_future_pool/*`: metrics about the future pools used by readers are exposed under
* scope `reader_future_pool`. See {@link MonitoredFuturePool} for detailed stats.
* <li> `lock/*`: metrics about the locks used by writers. See {@link DistributedLock} for detailed
* stats.
* <li> `read_lock/*`: metrics about the locks used by readers. See {@link DistributedLock} for
* detailed stats.
* <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details.
* <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details.
* <li> `readahead_worker/*`: metrics about readahead workers used by readers. See {@link BKLogReadHandler}
* for details.
* </ul>
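*
* <p>Illustrative sketch (not part of the original source; assumes a {@code StatsProvider}
* instance {@code statsProvider}): the scopes above nest under whatever root scope the
* caller supplies to the manager:
* <pre>{@code
* StatsLogger root = statsProvider.getStatsLogger("dl");
* // writer metrics then surface under dl/log_writer/*,
* // reader metrics under dl/async_reader/*, and so on
* }</pre>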
*/
class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedLogManager {
static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
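/**
* Synchronously create the metadata for log <i>streamName</i> under <i>uri</i>.
*/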
static void createLog(DistributedLogConfiguration conf, ZooKeeperClient zkc, URI uri, String streamName)
throws IOException, InterruptedException {
Future<ZKLogMetadataForWriter> createFuture = ZKLogMetadataForWriter.of(
uri, streamName, conf.getUnpartitionedStreamName(), zkc.get(), zkc.getDefaultACL(), true, true);
FutureUtils.result(createFuture);
}
static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
new Function<LogRecordWithDLSN, Long>() {
@Override
public Long apply(LogRecordWithDLSN record) {
return record.getTransactionId();
}
};
static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION =
new Function<LogRecordWithDLSN, DLSN>() {
@Override
public DLSN apply(LogRecordWithDLSN record) {
return record.getDlsn();
}
};
private final String clientId;
private final int regionId;
private final String streamIdentifier;
private final DistributedLogConfiguration conf;
private final DynamicDistributedLogConfiguration dynConf;
private Promise<Void> closePromise;
private final OrderedScheduler scheduler;
private final OrderedScheduler readAheadScheduler;
private boolean ownExecutor;
private final FeatureProvider featureProvider;
private final StatsLogger statsLogger;
private final StatsLogger perLogStatsLogger;
private final AlertStatsLogger alertStatsLogger;
// lock factory
private SessionLockFactory lockFactory = null;
// log segment metadata stores
private final LogSegmentMetadataStore writerMetadataStore;
private final LogSegmentMetadataStore readerMetadataStore;
// bookkeeper clients
// NOTE: The actual bookkeeper client is initialized lazily when it is referenced by
// {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to
// keep builders and their client wrappers here, as they will be used when
// instantiating readers or writers.
private final BookKeeperClientBuilder writerBKCBuilder;
private final BookKeeperClient writerBKC;
private final boolean ownWriterBKC;
private final BookKeeperClientBuilder readerBKCBuilder;
private final BookKeeperClient readerBKC;
private final boolean ownReaderBKC;
//
// Writer Related Variables
//
private final LedgerAllocator ledgerAllocator;
private final PermitLimiter writeLimiter;
// Log Segment Rolling Manager to control rolling speed
private final PermitManager logSegmentRollingPermitManager;
private OrderedScheduler lockStateExecutor = null;
//
// Reader Related Variables
//
// read handler for listener.
private BKLogReadHandler readHandlerForListener = null;
private FuturePool readerFuturePool = null;
private final PendingReaders pendingReaders;
private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
/**
* Create a DLM for testing.
*
* @param name log name
* @param conf distributedlog configuration
* @param uri uri location for the log
* @param writerZKCBuilder zookeeper builder for writers
* @param readerZKCBuilder zookeeper builder for readers
* @param zkcForWriterBKC zookeeper client for bookkeeper shared by writers
* @param zkcForReaderBKC zookeeper client for bookkeeper shared by readers
* @param writerBKCBuilder bookkeeper builder for writers
* @param readerBKCBuilder bookkeeper builder for readers
* @param featureProvider provider to offer features
* @param writeLimiter write limiter
* @param statsLogger stats logger to receive stats
* @throws IOException
*/
BKDistributedLogManager(String name,
DistributedLogConfiguration conf,
URI uri,
ZooKeeperClientBuilder writerZKCBuilder,
ZooKeeperClientBuilder readerZKCBuilder,
ZooKeeperClient zkcForWriterBKC,
ZooKeeperClient zkcForReaderBKC,
BookKeeperClientBuilder writerBKCBuilder,
BookKeeperClientBuilder readerBKCBuilder,
FeatureProvider featureProvider,
PermitLimiter writeLimiter,
StatsLogger statsLogger) throws IOException {
this(name,
conf,
ConfUtils.getConstDynConf(conf),
uri,
writerZKCBuilder,
readerZKCBuilder,
zkcForWriterBKC,
zkcForReaderBKC,
writerBKCBuilder,
readerBKCBuilder,
null,
null,
null,
OrderedScheduler.newBuilder().name("BKDL-" + name).corePoolSize(1).build(),
null,
null,
null,
null,
new ReadAheadExceptionsLogger(statsLogger),
DistributedLogConstants.UNKNOWN_CLIENT_ID,
DistributedLogConstants.LOCAL_REGION_ID,
null,
writeLimiter,
PermitManager.UNLIMITED_PERMIT_MANAGER,
featureProvider,
statsLogger,
NullStatsLogger.INSTANCE);
this.ownExecutor = true;
}
/**
* Create a {@link DistributedLogManager} with supplied resources.
*
* @param name log name
* @param conf distributedlog configuration
* @param uri uri location for the log
* @param writerZKCBuilder zookeeper builder for writers
* @param readerZKCBuilder zookeeper builder for readers
* @param zkcForWriterBKC zookeeper client for bookkeeper shared by writers
* @param zkcForReaderBKC zookeeper client for bookkeeper shared by readers
* @param writerBKCBuilder bookkeeper builder for writers
* @param readerBKCBuilder bookkeeper builder for readers
* @param lockFactory distributed lock factory
* @param writerMetadataStore writer metadata store
* @param readerMetadataStore reader metadata store
* @param scheduler ordered scheduler used by readers and writers
* @param readAheadScheduler readAhead scheduler used by readers
* @param lockStateExecutor ordered scheduler used by locks to execute lock actions
* @param channelFactory client socket channel factory to build bookkeeper clients
* @param requestTimer request timer to build bookkeeper clients
* @param readAheadExceptionsLogger stats logger to record readahead exceptions
* @param clientId client id used to initiate the locks
* @param regionId region id that is encoded as part of log segment metadata
* to indicate in which region the log segment is created
* @param ledgerAllocator ledger allocator to allocate ledgers
* @param featureProvider provider to offer features
* @param writeLimiter write limiter
* @param statsLogger stats logger to receive stats
* @param perLogStatsLogger stats logger to receive per log stats
* @throws IOException
*/
BKDistributedLogManager(String name,
DistributedLogConfiguration conf,
DynamicDistributedLogConfiguration dynConf,
URI uri,
ZooKeeperClientBuilder writerZKCBuilder,
ZooKeeperClientBuilder readerZKCBuilder,
ZooKeeperClient zkcForWriterBKC,
ZooKeeperClient zkcForReaderBKC,
BookKeeperClientBuilder writerBKCBuilder,
BookKeeperClientBuilder readerBKCBuilder,
SessionLockFactory lockFactory,
LogSegmentMetadataStore writerMetadataStore,
LogSegmentMetadataStore readerMetadataStore,
OrderedScheduler scheduler,
OrderedScheduler readAheadScheduler,
OrderedScheduler lockStateExecutor,
ClientSocketChannelFactory channelFactory,
HashedWheelTimer requestTimer,
ReadAheadExceptionsLogger readAheadExceptionsLogger,
String clientId,
Integer regionId,
LedgerAllocator ledgerAllocator,
PermitLimiter writeLimiter,
PermitManager logSegmentRollingPermitManager,
FeatureProvider featureProvider,
StatsLogger statsLogger,
StatsLogger perLogStatsLogger) throws IOException {
super(name, conf, uri, writerZKCBuilder, readerZKCBuilder, statsLogger);
Preconditions.checkNotNull(readAheadExceptionsLogger, "No ReadAhead Stats Logger Provided.");
this.conf = conf;
this.dynConf = dynConf;
this.scheduler = scheduler;
this.lockFactory = lockFactory;
this.lockStateExecutor = lockStateExecutor;
this.readAheadScheduler = null == readAheadScheduler ? scheduler : readAheadScheduler;
this.statsLogger = statsLogger;
this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
this.ownExecutor = false;
this.pendingReaders = new PendingReaders(scheduler);
this.regionId = regionId;
this.clientId = clientId;
this.streamIdentifier = conf.getUnpartitionedStreamName();
this.ledgerAllocator = ledgerAllocator;
this.writeLimiter = writeLimiter;
this.logSegmentRollingPermitManager = logSegmentRollingPermitManager;
if (null == writerMetadataStore) {
this.writerMetadataStore = new ZKLogSegmentMetadataStore(conf, writerZKC, scheduler);
} else {
this.writerMetadataStore = writerMetadataStore;
}
if (null == readerMetadataStore) {
this.readerMetadataStore = new ZKLogSegmentMetadataStore(conf, readerZKC, scheduler);
} else {
this.readerMetadataStore = readerMetadataStore;
}
// create the bkc for writers
if (null == writerBKCBuilder) {
// resolve uri
BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, uri);
BKDLConfig.propagateConfiguration(bkdlConfig, conf);
this.writerBKCBuilder = BookKeeperClientBuilder.newBuilder()
.dlConfig(conf)
.name(String.format("bk:%s:dlm_writer_shared", name))
.ledgersPath(bkdlConfig.getBkLedgersPath())
.channelFactory(channelFactory)
.requestTimer(requestTimer)
.statsLogger(statsLogger);
if (null == zkcForWriterBKC) {
this.writerBKCBuilder.zkServers(bkdlConfig.getBkZkServersForWriter());
} else {
this.writerBKCBuilder.zkc(zkcForWriterBKC);
}
this.ownWriterBKC = true;
} else {
this.writerBKCBuilder = writerBKCBuilder;
this.ownWriterBKC = false;
}
this.writerBKC = this.writerBKCBuilder.build();
// create the bkc for readers
if (null == readerBKCBuilder) {
// resolve uri
BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, uri);
BKDLConfig.propagateConfiguration(bkdlConfig, conf);
if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) {
this.readerBKCBuilder = this.writerBKCBuilder;
this.ownReaderBKC = false;
} else {
this.readerBKCBuilder = BookKeeperClientBuilder.newBuilder()
.dlConfig(conf)
.name(String.format("bk:%s:dlm_reader_shared", name))
.ledgersPath(bkdlConfig.getBkLedgersPath())
.channelFactory(channelFactory)
.requestTimer(requestTimer)
.statsLogger(statsLogger);
if (null == zkcForReaderBKC) {
this.readerBKCBuilder.zkServers(bkdlConfig.getBkZkServersForReader());
} else {
this.readerBKCBuilder.zkc(zkcForReaderBKC);
}
this.ownReaderBKC = true;
}
} else {
this.readerBKCBuilder = readerBKCBuilder;
this.ownReaderBKC = false;
}
this.readerBKC = this.readerBKCBuilder.build();
// Feature Provider
this.featureProvider = featureProvider;
// Stats
this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert");
this.readAheadExceptionsLogger = readAheadExceptionsLogger;
}
private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
if (createIfNull && null == lockStateExecutor && ownExecutor) {
lockStateExecutor = OrderedScheduler.newBuilder()
.corePoolSize(1).name("BKDL-LockState").build();
}
return lockStateExecutor;
}
private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
if (createIfNull && null == lockFactory) {
lockFactory = new ZKSessionLockFactory(
writerZKC,
clientId,
getLockStateExecutor(createIfNull),
conf.getZKNumRetries(),
conf.getLockTimeoutMilliSeconds(),
conf.getZKRetryBackoffStartMillis(),
statsLogger);
}
return lockFactory;
}
DistributedLogConfiguration getConf() {
return conf;
}
OrderedScheduler getScheduler() {
return scheduler;
}
@VisibleForTesting
BookKeeperClient getWriterBKC() {
return this.writerBKC;
}
@VisibleForTesting
BookKeeperClient getReaderBKC() {
return this.readerBKC;
}
@VisibleForTesting
FuturePool getReaderFuturePool() {
return this.readerFuturePool;
}
@VisibleForTesting
FeatureProvider getFeatureProvider() {
return this.featureProvider;
}
private synchronized BKLogReadHandler getReadHandlerForListener(boolean create) {
if (null == readHandlerForListener && create) {
readHandlerForListener = createReadHandler();
readHandlerForListener.scheduleGetLedgersTask(true, true);
}
return readHandlerForListener;
}
@Override
public List<LogSegmentMetadata> getLogSegments() throws IOException {
return FutureUtils.result(getLogSegmentsAsync());
}
protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
final BKLogReadHandler readHandler = createReadHandler();
return readHandler.asyncGetFullLedgerList(true, false).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
readHandler.asyncClose();
return BoxedUnit.UNIT;
}
});
}
@Override
public void registerListener(LogSegmentListener listener) throws IOException {
BKLogReadHandler readHandler = getReadHandlerForListener(true);
readHandler.registerListener(listener);
}
@Override
public synchronized void unregisterListener(LogSegmentListener listener) {
if (null != readHandlerForListener) {
readHandlerForListener.unregisterListener(listener);
}
}
public void checkClosedOrInError(String operation) throws AlreadyClosedException {
if (null != closePromise) {
throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
}
if (null != writerBKC) {
writerBKC.checkClosedOrInError();
}
if (null != readerBKC) {
readerBKC.checkClosedOrInError();
}
}
// Create Read Handler
synchronized BKLogReadHandler createReadHandler() {
Optional<String> subscriberId = Optional.absent();
return createReadHandler(subscriberId, false);
}
synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId) {
return createReadHandler(subscriberId, false);
}
synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
boolean isHandleForReading) {
return createReadHandler(
subscriberId,
getLockStateExecutor(true),
null,
true, /* deserialize record set */
isHandleForReading);
}
synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
OrderedScheduler lockExecutor,
AsyncNotification notification,
boolean deserializeRecordSet,
boolean isHandleForReading) {
ZKLogMetadataForReader logMetadata = ZKLogMetadataForReader.of(uri, name, streamIdentifier);
return new BKLogReadHandler(
logMetadata,
subscriberId,
conf,
dynConf,
readerZKCBuilder,
readerBKCBuilder,
readerMetadataStore,
scheduler,
lockExecutor,
readAheadScheduler,
alertStatsLogger,
readAheadExceptionsLogger,
statsLogger,
perLogStatsLogger,
clientId,
notification,
isHandleForReading,
deserializeRecordSet);
}
// Create Ledger Allocator
LedgerAllocator createLedgerAllocator(ZKLogMetadataForWriter logMetadata) throws IOException {
LedgerAllocator ledgerAllocatorDelegator;
if (!dynConf.getEnableLedgerAllocatorPool()) {
QuorumConfigProvider quorumConfigProvider =
new DynamicQuorumConfigProvider(dynConf);
LedgerAllocator allocator = new SimpleLedgerAllocator(
logMetadata.getAllocationPath(),
logMetadata.getAllocationData(),
quorumConfigProvider,
writerZKC,
writerBKC);
ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, true);
} else {
ledgerAllocatorDelegator = ledgerAllocator;
}
return ledgerAllocatorDelegator;
}
// Create Write Handler
public BKLogWriteHandler createWriteHandler(boolean lockHandler)
throws IOException {
return FutureUtils.result(asyncCreateWriteHandler(lockHandler));
}
Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) {
final ZooKeeper zk;
try {
zk = writerZKC.get();
} catch (InterruptedException e) {
LOG.error("Failed to initialize zookeeper client : ", e);
return Future.exception(new DLInterruptedException("Failed to initialize zookeeper client", e));
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
return Future.exception(FutureUtils.zkException(e, uri.getPath()));
}
boolean ownAllocator = null == ledgerAllocator;
// Fetching Log Metadata
Future<ZKLogMetadataForWriter> metadataFuture =
ZKLogMetadataForWriter.of(uri, name, streamIdentifier,
zk, writerZKC.getDefaultACL(),
ownAllocator, conf.getCreateStreamIfNotExists() || ownAllocator);
return metadataFuture.flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() {
@Override
public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter logMetadata) {
Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>();
createWriteHandler(logMetadata, lockHandler, createPromise);
return createPromise;
}
});
}
private void createWriteHandler(ZKLogMetadataForWriter logMetadata,
boolean lockHandler,
final Promise<BKLogWriteHandler> createPromise) {
OrderedScheduler lockStateExecutor = getLockStateExecutor(true);
// Build the locks
DistributedLock lock = new DistributedLock(
lockStateExecutor,
getLockFactory(true),
logMetadata.getLockPath(),
conf.getLockTimeoutMilliSeconds(),
statsLogger);
// Build the ledger allocator
LedgerAllocator allocator;
try {
allocator = createLedgerAllocator(logMetadata);
} catch (IOException e) {
FutureUtils.setException(createPromise, e);
return;
}
// Make sure writer handler created before resources are initialized
final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
logMetadata,
conf,
writerZKCBuilder,
writerBKCBuilder,
writerMetadataStore,
scheduler,
allocator,
statsLogger,
perLogStatsLogger,
alertStatsLogger,
clientId,
regionId,
writeLimiter,
featureProvider,
dynConf,
lock);
PermitManager manager = getLogSegmentRollingPermitManager();
if (manager instanceof Watcher) {
writeHandler.register((Watcher) manager);
}
if (lockHandler) {
writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() {
@Override
public void onSuccess(DistributedLock lock) {
FutureUtils.setValue(createPromise, writeHandler);
}
@Override
public void onFailure(final Throwable cause) {
writeHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
FutureUtils.setException(createPromise, cause);
return BoxedUnit.UNIT;
}
});
}
});
} else {
FutureUtils.setValue(createPromise, writeHandler);
}
}
PermitManager getLogSegmentRollingPermitManager() {
return logSegmentRollingPermitManager;
}
<T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
initializeFuturePool(false);
return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() {
@Override
public BKLogReadHandler applyE() throws Throwable {
return getReadHandlerForListener(true);
}
}).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
@Override
public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable {
return func.apply(readHandler);
}
});
}
/**
* Check if an end of stream marker was added to the stream.
* A stream with an end of stream marker cannot be appended to.
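*
* <p>Illustrative check before appending (a sketch, not part of the original source;
* assumes an open manager {@code dlm}):
* <pre>{@code
* if (!dlm.isEndOfStreamMarked()) {
*     // the stream is still writable; safe to open a writer
* }
* }</pre>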
*
* @return true if the marker was added to the stream, false otherwise
*/
@Override
public boolean isEndOfStreamMarked() throws IOException {
checkClosedOrInError("isEndOfStreamMarked");
long lastTxId = FutureUtils.result(getLastLogRecordAsyncInternal(false, true)).getTransactionId();
return lastTxId == DistributedLogConstants.MAX_TXID;
}
/**
* Begin appending to the end of the log stream, which is being treated as a sequence of bytes.
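*
* <p>A minimal usage sketch (illustrative, not part of the original source; assumes an
* open manager {@code dlm}):
* <pre>{@code
* AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
* writer.write("hello".getBytes()); // append bytes at the current stream position
* writer.close();
* }</pre>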
*
* @return the writer interface to generate log records
*/
public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
long position;
try {
position = FutureUtils.result(getLastLogRecordAsyncInternal(true, false)).getTransactionId();
if (DistributedLogConstants.INVALID_TXID == position ||
DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID == position) {
position = 0;
}
} catch (LogEmptyException ex) {
position = 0;
} catch (LogNotFoundException ex) {
position = 0;
}
return new AppendOnlyStreamWriter(startAsyncLogSegmentNonPartitioned(), position);
}
/**
* Get a reader to read a log stream as a sequence of bytes.
*
* @return the reader interface to read the log stream as a sequence of bytes
*/
public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
return new AppendOnlyStreamReader(this);
}
/**
* Begin writing to the log stream identified by the name.
*
* @return the writer interface to generate log records
*/
@Override
public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
checkClosedOrInError("startLogSegmentNonPartitioned");
BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this);
boolean success = false;
try {
writer.createAndCacheWriteHandler();
BKLogWriteHandler writeHandler = writer.getWriteHandler();
FutureUtils.result(writeHandler.lockHandler());
success = true;
return writer;
} finally {
if (!success) {
writer.abort();
}
}
}
/**
* Begin writing to the log stream identified by the name.
*
* @return the writer interface to generate log records
*/
@Override
public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
}
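/**
* Open an async log writer asynchronously; any incomplete log segments are recovered first.
*
* <p>Illustrative usage (a sketch, not part of the original source; assumes an open
* manager {@code dlm}):
* <pre>{@code
* AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
* Future<DLSN> dlsn = writer.write(new LogRecord(1L, "payload".getBytes()));
* }</pre>
*/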
@Override
public Future<AsyncLogWriter> openAsyncLogWriter() {
try {
checkClosedOrInError("startLogSegmentNonPartitioned");
} catch (AlreadyClosedException e) {
return Future.exception(e);
}
Future<BKLogWriteHandler> createWriteHandleFuture;
synchronized (this) {
// 1. create the locked write handler
createWriteHandleFuture = asyncCreateWriteHandler(true);
}
return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
@Override
public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) {
final BKAsyncLogWriter writer;
synchronized (BKDistributedLogManager.this) {
// 2. create the writer with the handler
writer = new BKAsyncLogWriter(
conf,
dynConf,
BKDistributedLogManager.this,
writeHandler,
featureProvider,
statsLogger);
}
// 3. recover the incomplete log segments
return writeHandler.recoverIncompleteLogSegments()
.map(new AbstractFunction1<Long, AsyncLogWriter>() {
@Override
public AsyncLogWriter apply(Long lastTxId) {
// 4. update last tx id if successfully recovered
writer.setLastTxId(lastTxId);
return writer;
}
}).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
@Override
public BoxedUnit apply(Throwable cause) {
// 5. close the writer if recovery failed
writer.asyncAbort();
return BoxedUnit.UNIT;
}
});
}
});
}
@Override
public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
@Override
public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
return getDLSNNotLessThanTxId(fromTxnId, segments);
}
});
}
private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
final List<LogSegmentMetadata> segments) {
if (segments.isEmpty()) {
return getLastDLSNAsync();
}
final int segmentIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId);
if (segmentIdx < 0) {
return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
}
final LedgerHandleCache handleCache =
LedgerHandleCache.newBuilder().bkc(readerBKC).conf(conf).build();
return getDLSNNotLessThanTxIdInSegment(
fromTxnId,
segmentIdx,
segments,
handleCache
).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId,
final int segmentIdx,
final List<LogSegmentMetadata> segments,
final LedgerHandleCache handleCache) {
final LogSegmentMetadata segment = segments.get(segmentIdx);
return ReadUtils.getLogRecordNotLessThanTxId(
name,
segment,
fromTxnId,
scheduler,
handleCache,
Math.max(2, dynConf.getReadAheadBatchSize())
).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() {
@Override
public Future<DLSN> apply(Optional<LogRecordWithDLSN> foundRecord) {
if (foundRecord.isPresent()) {
return Future.value(foundRecord.get().getDlsn());
}
if ((segments.size() - 1) == segmentIdx) {
return getLastLogRecordAsync().map(new AbstractFunction1<LogRecordWithDLSN, DLSN>() {
@Override
public DLSN apply(LogRecordWithDLSN record) {
if (record.getTransactionId() >= fromTxnId) {
return record.getDlsn();
}
return record.getDlsn().getNextDLSN();
}
});
} else {
return getDLSNNotLessThanTxIdInSegment(
fromTxnId,
segmentIdx + 1,
segments,
handleCache);
}
}
});
}
/**
* Get the input stream starting with fromTxnId for the specified log.
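*
* <p>Illustrative usage (a sketch, not part of the original source; assumes an open
* manager {@code dlm}):
* <pre>{@code
* LogReader reader = dlm.getInputStream(1L);
* LogRecordWithDLSN record = reader.readNext(true); // non-blocking read; may return null
* }</pre>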
*
* @param fromTxnId - the first transaction id we want to read
* @return the stream starting with transaction fromTxnId
* @throws IOException if a stream cannot be found.
*/
@Override
public LogReader getInputStream(long fromTxnId)
throws IOException {
return getInputStreamInternal(fromTxnId);
}
@Override
public LogReader getInputStream(DLSN fromDLSN) throws IOException {
return getInputStreamInternal(fromDLSN, Optional.<Long>absent());
}
@Override
public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException {
return FutureUtils.result(openAsyncLogReader(fromTxnId));
}
/**
* Open a log reader positioned by transaction id <code>fromTxnId</code>.
*
* <p>
* - retrieve the log segments for the stream
* - if the log segment list is empty, position by the last dlsn
* - otherwise, find the first log segment containing records whose transaction ids are not less than
* the provided transaction id <code>fromTxnId</code>
* - if the records in all log segments have transaction ids greater than <code>fromTxnId</code>, position
* on the first record
* - otherwise, search that log segment to find the matching log record
* - if the log record is found, position the reader by the found record's dlsn
* - otherwise, position by the last dlsn
* </p>
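*
* <p>Illustrative usage (a sketch, not part of the original source; assumes an open
* manager {@code dlm}):
* <pre>{@code
* Future<AsyncLogReader> readerFuture = dlm.openAsyncLogReader(42L);
* AsyncLogReader reader = FutureUtils.result(readerFuture);
* // reader is positioned at the first record with txid >= 42 (or at the last dlsn)
* }</pre>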
*
* @see DLUtils#findLogSegmentNotLessThanTxnId(List, long)
* @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, long, ExecutorService, LedgerHandleCache, int)
* @param fromTxnId
* transaction id to start reading from
* @return future representing the open result.
*/
@Override
public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) {
final Promise<DLSN> dlsnPromise = new Promise<DLSN>();
getDLSNNotLessThanTxId(fromTxnId).addEventListener(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN dlsn) {
dlsnPromise.setValue(dlsn);
}
@Override
public void onFailure(Throwable cause) {
if (cause instanceof LogEmptyException) {
dlsnPromise.setValue(DLSN.InitialDLSN);
} else {
dlsnPromise.setException(cause);
}
}
});
return dlsnPromise.flatMap(new AbstractFunction1<DLSN, Future<AsyncLogReader>>() {
@Override
public Future<AsyncLogReader> apply(DLSN dlsn) {
return openAsyncLogReader(dlsn);
}
});
}
@Override
public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException {
return FutureUtils.result(openAsyncLogReader(fromDLSN));
}
@Override
public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
Optional<String> subscriberId = Optional.absent();
AsyncLogReader reader = new BKAsyncLogReaderDLSN(
this,
scheduler,
getLockStateExecutor(true),
fromDLSN,
subscriberId,
false,
dynConf.getDeserializeRecordSetOnReads(),
statsLogger);
return Future.value(reader);
}
/**
* Note the lock here is a sort of elective exclusive lock, i.e. acquiring this lock only
* prevents other lock acquirers from reading from the stream. Normal readers (and writers)
* will not be blocked.
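*
* <p>Illustrative usage (a sketch, not part of the original source; assumes an open
* manager {@code dlm}):
* <pre>{@code
* Future<AsyncLogReader> readerFuture = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
* // the future is satisfied only after the lock has been acquired
* }</pre>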
*/
@Override
public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) {
Optional<String> subscriberId = Optional.absent();
return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId);
}
@Override
public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN, final String subscriberId) {
return getAsyncLogReaderWithLock(Optional.of(fromDLSN), Optional.of(subscriberId));
}
@Override
public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) {
Optional<DLSN> fromDLSN = Optional.absent();
return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId));
}
protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN,
final Optional<String> subscriberId) {
if (!fromDLSN.isPresent() && !subscriberId.isPresent()) {
return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
}
final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN(
BKDistributedLogManager.this,
scheduler,
getLockStateExecutor(true),
fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
subscriberId,
false,
dynConf.getDeserializeRecordSetOnReads(),
statsLogger);
pendingReaders.add(reader);
final Future<Void> lockFuture = reader.lockStream();
final Promise<AsyncLogReader> createPromise = new Promise<AsyncLogReader>(
new Function<Throwable, BoxedUnit>() {
@Override
public BoxedUnit apply(Throwable cause) {
// cancel the lock when the creation future is cancelled
lockFuture.cancel();
return BoxedUnit.UNIT;
}
});
// lock the stream - fetch the last commit position on success
lockFuture.flatMap(new Function<Void, Future<AsyncLogReader>>() {
@Override
public Future<AsyncLogReader> apply(Void complete) {
if (fromDLSN.isPresent()) {
return Future.value((AsyncLogReader) reader);
}
LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.",
subscriberId.get(), name);
// we acquired lock
final SubscriptionStateStore stateStore = getSubscriptionStateStore(subscriberId.get());
return stateStore.getLastCommitPosition().map(new ExceptionalFunction<DLSN, AsyncLogReader>() {
@Override
public AsyncLogReader applyE(DLSN lastCommitPosition) throws UnexpectedException {
LOG.info("Reader {} @ {} positioned to last commit position {}.",
new Object[] { subscriberId.get(), name, lastCommitPosition });
reader.setStartDLSN(lastCommitPosition);
return reader;
}
});
}
}).addEventListener(new FutureEventListener<AsyncLogReader>() {
@Override
public void onSuccess(AsyncLogReader r) {
pendingReaders.remove(reader);
FutureUtils.setValue(createPromise, r);
}
@Override
public void onFailure(final Throwable cause) {
pendingReaders.remove(reader);
reader.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
FutureUtils.setException(createPromise, cause);
return BoxedUnit.UNIT;
}
});
}
});
return createPromise;
}
/**
* Get the input stream starting with fromTxnId for the specified log
*
* @param fromTxnId
* transaction id to start reading from
* @return log reader
* @throws IOException
*/
LogReader getInputStreamInternal(long fromTxnId)
throws IOException {
DLSN fromDLSN;
try {
fromDLSN = FutureUtils.result(getDLSNNotLessThanTxId(fromTxnId));
} catch (LogEmptyException lee) {
fromDLSN = DLSN.InitialDLSN;
}
return getInputStreamInternal(fromDLSN, Optional.of(fromTxnId));
}
LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId)
throws IOException {
LOG.info("Create async reader starting from {}", fromDLSN);
checkClosedOrInError("getInputStream");
Optional<String> subscriberId = Optional.absent();
BKAsyncLogReaderDLSN asyncReader = new BKAsyncLogReaderDLSN(
this,
scheduler,
getLockStateExecutor(true),
fromDLSN,
subscriberId,
true,
dynConf.getDeserializeRecordSetOnReads(),
statsLogger);
return new BKSyncLogReaderDLSN(conf, asyncReader, scheduler, fromTxnId);
}
/**
* Get the last log record in the stream
*
* @return the last log record in the stream
* @throws java.io.IOException if a stream cannot be found.
*/
@Override
public LogRecordWithDLSN getLastLogRecord() throws IOException {
checkClosedOrInError("getLastLogRecord");
return FutureUtils.result(getLastLogRecordAsync());
}
@Override
public long getFirstTxId() throws IOException {
checkClosedOrInError("getFirstTxId");
return FutureUtils.result(getFirstRecordAsyncInternal()).getTransactionId();
}
@Override
public long getLastTxId() throws IOException {
checkClosedOrInError("getLastTxId");
return FutureUtils.result(getLastTxIdAsync());
}
@Override
public DLSN getLastDLSN() throws IOException {
checkClosedOrInError("getLastDLSN");
return FutureUtils.result(getLastLogRecordAsyncInternal(false, false)).getDlsn();
}
/**
* Get the latest log record in the log.
*
* @return latest log record
*/
@Override
public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
return getLastLogRecordAsyncInternal(false, false);
}
private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover,
final boolean includeEndOfStream) {
return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
@Override
public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
return ledgerHandler.getLastLogRecordAsync(recover, includeEndOfStream);
}
});
}
/**
* Get the latest transaction id in the log.
*
* @return latest transaction id
*/
@Override
public Future<Long> getLastTxIdAsync() {
return getLastLogRecordAsyncInternal(false, false)
.map(RECORD_2_TXID_FUNCTION);
}
/**
* Get the first DLSN in the log.
*
* @return first dlsn in the stream
*/
@Override
public Future<DLSN> getFirstDLSNAsync() {
return getFirstRecordAsyncInternal().map(RECORD_2_DLSN_FUNCTION);
}
private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
@Override
public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
return ledgerHandler.asyncGetFirstLogRecord();
}
});
}
/**
* Get the latest DLSN in the log.
*
* @return latest dlsn
*/
@Override
public Future<DLSN> getLastDLSNAsync() {
return getLastLogRecordAsyncInternal(false, false)
.map(RECORD_2_DLSN_FUNCTION);
}
/**
* Get the number of log records in the active portion of the log.
* Any log segments that have already been truncated will not be included.
*
* @return number of log records
* @throws IOException
*/
@Override
public long getLogRecordCount() throws IOException {
checkClosedOrInError("getLogRecordCount");
return FutureUtils.result(getLogRecordCountAsync(DLSN.InitialDLSN));
}
/**
* Get the number of log records in the active portion of the log asynchronously.
* Any log segments that have already been truncated will not be included.
*
* @param beginDLSN dlsn to start counting log records from
* @return future number of log records
* @throws IOException
*/
@Override
public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN) {
return processReaderOperation(new Function<BKLogReadHandler, Future<Long>>() {
@Override
public Future<Long> apply(BKLogReadHandler ledgerHandler) {
return ledgerHandler.asyncGetLogRecordCount(beginDLSN);
}
});
}
@Override
public void recover() throws IOException {
recoverInternal(conf.getUnpartitionedStreamName());
}
/**
* Recover a specified stream within the log container.
* The writer implicitly recovers a stream when it resumes writing.
* This allows applications to recover a stream explicitly, so
* that an application may read a fully recovered log before resuming
* writes.
*
* @throws IOException if the recovery fails
*/
private void recoverInternal(String streamIdentifier) throws IOException {
checkClosedOrInError("recoverInternal");
BKLogWriteHandler ledgerHandler = createWriteHandler(true);
try {
FutureUtils.result(ledgerHandler.recoverIncompleteLogSegments());
} finally {
Utils.closeQuietly(ledgerHandler);
}
}
/**
* Delete all the partitions of the specified log
*
* @throws IOException if the deletion fails
*/
@Override
public void delete() throws IOException {
BKLogWriteHandler ledgerHandler = createWriteHandler(true);
try {
ledgerHandler.deleteLog();
} finally {
Utils.closeQuietly(ledgerHandler);
}
// Delete the ZK path associated with the log stream
String zkPath = getZKPath();
// Safety check when we are using the shared zookeeper
if (zkPath.toLowerCase().contains("distributedlog")) {
try {
LOG.info("Delete the path associated with the log {}, ZK Path {}", name, zkPath);
ZKUtil.deleteRecursive(writerZKC.get(), zkPath);
} catch (InterruptedException ie) {
LOG.error("Interrupted while accessing ZK", ie);
throw new DLInterruptedException("Error initializing zk", ie);
} catch (KeeperException ke) {
LOG.error("Error accessing entry in zookeeper", ke);
throw new IOException("Error initializing zk", ke);
}
} else {
LOG.warn("Skip deletion of unrecognized ZK Path {}", zkPath);
}
}
/**
* The DistributedLogManager may archive/purge any logs for transaction ids
* less than <i>minTxIdToKeep</i>.
* This is to be used only when the client explicitly manages deletion. If
* the cleanup policy is based on a sliding time window, then this method need
* not be called.
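*
* <p>Illustrative call (a sketch, not part of the original source; assumes an open
* manager {@code dlm}):
* <pre>{@code
* dlm.purgeLogsOlderThan(1000L); // log segments entirely below txid 1000 may be purged
* }</pre>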
*
* @param minTxIdToKeep the earliest txid that must be retained
* @throws IOException if purging fails
*/
@Override
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
Preconditions.checkArgument(minTxIdToKeep > 0, "Invalid transaction id " + minTxIdToKeep);
checkClosedOrInError("purgeLogSegmentsOlderThan");
BKLogWriteHandler ledgerHandler = createWriteHandler(true);
try {
LOG.info("Purging logs for {} older than {}", ledgerHandler.getFullyQualifiedName(), minTxIdToKeep);
FutureUtils.result(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep));
} finally {
Utils.closeQuietly(ledgerHandler);
}
}
static class PendingReaders implements AsyncCloseable {
final ExecutorService executorService;
final Set<AsyncLogReader> readers = new HashSet<AsyncLogReader>();
PendingReaders(ExecutorService executorService) {
this.executorService = executorService;
}
public synchronized void remove(AsyncLogReader reader) {
readers.remove(reader);
}
public synchronized void add(AsyncLogReader reader) {
readers.add(reader);
}
@Override
public Future<Void> asyncClose() {
return Utils.closeSequence(executorService, true, readers.toArray(new AsyncLogReader[readers.size()]))
.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
@Override
public BoxedUnit apply(Void value) {
readers.clear();
return BoxedUnit.UNIT;
}
});
}
};
/**
* Close the distributed log manager, freeing any resources it may hold.
*/
@Override
public Future<Void> asyncClose() {
Promise<Void> closeFuture;
BKLogReadHandler readHandlerToClose;
synchronized (this) {
if (null != closePromise) {
return closePromise;
}
closeFuture = closePromise = new Promise<Void>();
readHandlerToClose = readHandlerForListener;
}
// NOTE: the resources {scheduler, writerBKC, readerBKC} are mostly owned by the namespace instance,
// so closing them here is not a blocking call, except in tests.
AsyncCloseable resourcesCloseable = new AsyncCloseable() {
@Override
public Future<Void> asyncClose() {
int schedTimeout = conf.getSchedulerShutdownTimeoutMs();
// Clean up executor state.
if (ownExecutor) {
SchedulerUtils.shutdownScheduler(scheduler, schedTimeout, TimeUnit.MILLISECONDS);
LOG.info("Stopped BKDL executor service for {}.", name);
if (scheduler != readAheadScheduler) {
SchedulerUtils.shutdownScheduler(readAheadScheduler, schedTimeout, TimeUnit.MILLISECONDS);
LOG.info("Stopped BKDL ReadAhead Executor Service for {}.", name);
}
SchedulerUtils.shutdownScheduler(getLockStateExecutor(false), schedTimeout, TimeUnit.MILLISECONDS);
LOG.info("Stopped BKDL Lock State Executor for {}.", name);
}
if (ownWriterBKC) {
writerBKC.close();
}
if (ownReaderBKC) {
readerBKC.close();
}
return Future.Void();
}
};
Future<Void> closeResult = Utils.closeSequence(null, true,
readHandlerToClose,
pendingReaders,
resourcesCloseable,
new AsyncCloseable() {
@Override
public Future<Void> asyncClose() {
return BKDistributedLogManager.super.asyncClose();
}
});
closeResult.proxyTo(closeFuture);
return closeFuture;
}
@Override
public void close() throws IOException {
FutureUtils.result(asyncClose());
}
public boolean scheduleTask(Runnable task) {
try {
scheduler.submit(task);
return true;
} catch (RejectedExecutionException ree) {
LOG.error("Task {} is rejected : ", task, ree);
return false;
}
}
private FuturePool buildFuturePool(ExecutorService executorService,
StatsLogger statsLogger) {
FuturePool futurePool = new ExecutorServiceFuturePool(executorService);
return new MonitoredFuturePool(
futurePool,
statsLogger,
conf.getEnableTaskExecutionStats(),
conf.getTaskExecutionWarnTimeMicros());
}
private void initializeFuturePool(boolean ordered) {
// when this manager owns the executor, the scheduler is a single threaded pool
if (null == readerFuturePool) {
readerFuturePool = buildFuturePool(
scheduler, statsLogger.scope("reader_future_pool"));
}
}
@Override
public String toString() {
return String.format("DLM:%s:%s", getZKPath(), getStreamName());
}
public void raiseAlert(String msg, Object... args) {
alertStatsLogger.raise(msg, args);
}
/**
* Get the subscription state storage provided by the distributed log manager
*
* @param subscriberId - Application specific Id associated with the subscriber
* @return Subscription state store
*/
@Override
@Deprecated
public SubscriptionStateStore getSubscriptionStateStore(String subscriberId) {
return getSubscriptionStateStoreInternal(conf.getUnpartitionedStreamName(), subscriberId);
}
/**
* Get the subscription state storage provided by the distributed log manager
*
* @param streamIdentifier - Identifier associated with the stream
* @param subscriberId - Application specific Id associated with the subscriber
* @return Subscription state store
*/
private SubscriptionStateStore getSubscriptionStateStoreInternal(String streamIdentifier, String subscriberId) {
return new ZKSubscriptionStateStore(writerZKC,
ZKLogMetadataForReader.getSubscriberPath(uri, name, streamIdentifier, subscriberId));
}
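/**
* Get the subscriptions store for this log.
*
* <p>Illustrative usage (a sketch, not part of the original source; assumes an open
* manager {@code dlm}):
* <pre>{@code
* SubscriptionsStore store = dlm.getSubscriptionsStore();
* Future<DLSN> lastCommitted = store.getLastCommitPosition("subscriber-1");
* }</pre>
*/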
@Override
public SubscriptionsStore getSubscriptionsStore() {
return getSubscriptionsStoreInternal(conf.getUnpartitionedStreamName());
}
/**
* Get the subscription state storage provided by the distributed log manager
*
* @param streamIdentifier - Identifier associated with the stream
* @return Subscriptions store
*/
private SubscriptionsStore getSubscriptionsStoreInternal(String streamIdentifier) {
return new ZKSubscriptionsStore(writerZKC,
ZKLogMetadataForReader.getSubscribersPath(uri, name, streamIdentifier));
}
}