DL-117: Stream metadata store
This change is to abstract the zookeeper operations into a stream metadata store, so we can replace zookeeper with other metadata store easily.
So the metadata operations in distributedlog now are managed by 3 classes:
- LogMetadataStore : it is the namespace metadata store : it manages the location (uri) mapping for streams and handle namespace operations.
- LogStreamMetadataStore: it is the stream metadata store : it manages the metadata for a single stream, such as managing read/write lock, retriving/creating stream metadata, deleting metadata and such.
- LogSegmentMetadataStore: it is the segment metadata store : it manages the log segment metadata for individual log segment.
LogMetadataStore and LogSegmentMetadataStore are already there. This change focus on LogStreamMetadataStore
Changed:
* abstract all the zookeeper metadata operation in log handlers to LogStreamMetadataStore
* remove disabling max tx id santify check, as maxTxId update is part of the metadata update transaction
Not changed:
the name of ZKLogMetadataForReader and ZKLogMetadataForWriter are not changed. I will send out a change to rename these two classes as they are not related to zookeeper anymore.
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index cf792e3..2ca064c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -30,7 +30,6 @@
import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
@@ -210,7 +209,6 @@
BKAsyncLogReaderDLSN(BKDistributedLogManager bkdlm,
ScheduledExecutorService executorService,
- OrderedScheduler lockStateExecutor,
DLSN startDLSN,
Optional<String> subscriberId,
boolean returnEndOfStreamRecord,
@@ -219,7 +217,7 @@
this.bkDistributedLogManager = bkdlm;
this.executorService = executorService;
this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
- lockStateExecutor, this, deserializeRecordSet, true);
+ this, deserializeRecordSet, true);
LOG.debug("Starting async reader at {}", startDLSN);
this.startDLSN = startDLSN;
this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -414,7 +412,7 @@
final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
if (!readAheadStarted) {
- bkLedgerManager.checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+ bkLedgerManager.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
try {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index ac37f3a..0a34caa 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -29,25 +29,22 @@
import com.twitter.distributedlog.callback.LogSegmentListener;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
import com.twitter.distributedlog.function.GetVersionedValueFunction;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.lock.NopDistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.subscription.SubscriptionStateStore;
@@ -75,10 +72,6 @@
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
@@ -93,7 +86,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -120,13 +112,6 @@
class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedLogManager {
static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
- static void createLog(DistributedLogConfiguration conf, ZooKeeperClient zkc, URI uri, String streamName)
- throws IOException, InterruptedException {
- Future<ZKLogMetadataForWriter> createFuture = ZKLogMetadataForWriter.of(
- uri, streamName, conf.getUnpartitionedStreamName(), zkc.get(), zkc.getDefaultACL(), true, true);
- FutureUtils.result(createFuture);
- }
-
static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
new Function<LogRecordWithDLSN, Long>() {
@Override
@@ -158,12 +143,10 @@
private final StatsLogger perLogStatsLogger;
private final AlertStatsLogger alertStatsLogger;
- // lock factory
- private SessionLockFactory lockFactory = null;
-
- // log segment metadata stores
- private final LogSegmentMetadataStore writerMetadataStore;
- private final LogSegmentMetadataStore readerMetadataStore;
+ // log stream metadata stores
+ private final LogStreamMetadataStore writerMetadataStore;
+ private final LogStreamMetadataStore readerMetadataStore;
+ // log segment metadata cache
private final LogSegmentMetadataCache logSegmentMetadataCache;
// bookkeeper clients
@@ -183,9 +166,6 @@
//
private final LedgerAllocator ledgerAllocator;
private final PermitLimiter writeLimiter;
- // Log Segment Rolling Manager to control rolling speed
- private final PermitManager logSegmentRollingPermitManager;
- private OrderedScheduler lockStateExecutor = null;
//
// Reader Related Variables
@@ -237,19 +217,16 @@
readerBKCBuilder,
null,
null,
- null,
new LogSegmentMetadataCache(conf, Ticker.systemTicker()),
OrderedScheduler.newBuilder().name("BKDL-" + name).corePoolSize(1).build(),
null,
null,
null,
- null,
new ReadAheadExceptionsLogger(statsLogger),
DistributedLogConstants.UNKNOWN_CLIENT_ID,
DistributedLogConstants.LOCAL_REGION_ID,
null,
writeLimiter,
- PermitManager.UNLIMITED_PERMIT_MANAGER,
featureProvider,
statsLogger,
NullStatsLogger.INSTANCE);
@@ -268,12 +245,10 @@
* @param zkcForReaderBKC zookeeper builder for bookkeeper shared by readers
* @param writerBKCBuilder bookkeeper builder for writers
* @param readerBKCBuilder bookkeeper builder for readers
- * @param lockFactory distributed lock factory
* @param writerMetadataStore writer metadata store
* @param readerMetadataStore reader metadata store
* @param scheduler ordered scheduled used by readers and writers
* @param readAheadScheduler readAhead scheduler used by readers
- * @param lockStateExecutor ordered scheduled used by locks to execute lock actions
* @param channelFactory client socket channel factory to build bookkeeper clients
* @param requestTimer request timer to build bookkeeper clients
* @param readAheadExceptionsLogger stats logger to record readahead exceptions
@@ -297,13 +272,11 @@
ZooKeeperClient zkcForReaderBKC,
BookKeeperClientBuilder writerBKCBuilder,
BookKeeperClientBuilder readerBKCBuilder,
- SessionLockFactory lockFactory,
- LogSegmentMetadataStore writerMetadataStore,
- LogSegmentMetadataStore readerMetadataStore,
+ LogStreamMetadataStore writerMetadataStore,
+ LogStreamMetadataStore readerMetadataStore,
LogSegmentMetadataCache logSegmentMetadataCache,
OrderedScheduler scheduler,
OrderedScheduler readAheadScheduler,
- OrderedScheduler lockStateExecutor,
ClientSocketChannelFactory channelFactory,
HashedWheelTimer requestTimer,
ReadAheadExceptionsLogger readAheadExceptionsLogger,
@@ -311,7 +284,6 @@
Integer regionId,
LedgerAllocator ledgerAllocator,
PermitLimiter writeLimiter,
- PermitManager logSegmentRollingPermitManager,
FeatureProvider featureProvider,
StatsLogger statsLogger,
StatsLogger perLogStatsLogger) throws IOException {
@@ -320,8 +292,6 @@
this.conf = conf;
this.dynConf = dynConf;
this.scheduler = scheduler;
- this.lockFactory = lockFactory;
- this.lockStateExecutor = lockStateExecutor;
this.readAheadScheduler = null == readAheadScheduler ? scheduler : readAheadScheduler;
this.statsLogger = statsLogger;
this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
@@ -332,15 +302,24 @@
this.streamIdentifier = conf.getUnpartitionedStreamName();
this.ledgerAllocator = ledgerAllocator;
this.writeLimiter = writeLimiter;
- this.logSegmentRollingPermitManager = logSegmentRollingPermitManager;
if (null == writerMetadataStore) {
- this.writerMetadataStore = new ZKLogSegmentMetadataStore(conf, writerZKC, scheduler);
+ this.writerMetadataStore = new ZKLogStreamMetadataStore(
+ clientId,
+ conf,
+ writerZKC,
+ scheduler,
+ statsLogger);
} else {
this.writerMetadataStore = writerMetadataStore;
}
if (null == readerMetadataStore) {
- this.readerMetadataStore = new ZKLogSegmentMetadataStore(conf, readerZKC, scheduler);
+ this.readerMetadataStore = new ZKLogStreamMetadataStore(
+ clientId,
+ conf,
+ readerZKC,
+ scheduler,
+ statsLogger);
} else {
this.readerMetadataStore = readerMetadataStore;
}
@@ -407,26 +386,13 @@
this.readAheadExceptionsLogger = readAheadExceptionsLogger;
}
- synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
- if (createIfNull && null == lockStateExecutor && ownExecutor) {
- lockStateExecutor = OrderedScheduler.newBuilder()
- .corePoolSize(1).name("BKDL-LockState").build();
- }
- return lockStateExecutor;
+ @VisibleForTesting
+ LogStreamMetadataStore getWriterMetadataStore() {
+ return writerMetadataStore;
}
- private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
- if (createIfNull && null == lockFactory) {
- lockFactory = new ZKSessionLockFactory(
- writerZKC,
- clientId,
- getLockStateExecutor(createIfNull),
- conf.getZKNumRetries(),
- conf.getLockTimeoutMilliSeconds(),
- conf.getZKRetryBackoffStartMillis(),
- statsLogger);
- }
- return lockFactory;
+ URI getUri() {
+ return uri;
}
DistributedLogConfiguration getConf() {
@@ -457,12 +423,16 @@
return this.featureProvider;
}
- private synchronized BKLogReadHandler getReadHandlerForListener(boolean create) {
+ private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
+ boolean create, LogSegmentListener listener) {
if (null == readHandlerForListener && create) {
readHandlerForListener = createReadHandler();
- // start fetch the log segments
+ readHandlerForListener.registerListener(listener);
+ // start fetch the log segments after created the listener
readHandlerForListener.asyncStartFetchLogSegments();
+ return readHandlerForListener;
}
+ readHandlerForListener.registerListener(listener);
return readHandlerForListener;
}
@@ -483,8 +453,7 @@
@Override
public void registerListener(LogSegmentListener listener) throws IOException {
- BKLogReadHandler readHandler = getReadHandlerForListener(true);
- readHandler.registerListener(listener);
+ getReadHandlerAndRegisterListener(true, listener);
}
@Override
@@ -523,14 +492,12 @@
boolean isHandleForReading) {
return createReadHandler(
subscriberId,
- getLockStateExecutor(true),
null,
true, /* deserialize record set */
isHandleForReading);
}
synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
- OrderedScheduler lockExecutor,
AsyncNotification notification,
boolean deserializeRecordSet,
boolean isHandleForReading) {
@@ -540,12 +507,10 @@
subscriberId,
conf,
dynConf,
- readerZKCBuilder,
readerBKCBuilder,
readerMetadataStore,
logSegmentMetadataCache,
scheduler,
- lockExecutor,
readAheadScheduler,
alertStatsLogger,
readAheadExceptionsLogger,
@@ -585,24 +550,15 @@
}
Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) {
- final ZooKeeper zk;
- try {
- zk = writerZKC.get();
- } catch (InterruptedException e) {
- LOG.error("Failed to initialize zookeeper client : ", e);
- return Future.exception(new DLInterruptedException("Failed to initialize zookeeper client", e));
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- return Future.exception(FutureUtils.zkException(e, uri.getPath()));
- }
-
boolean ownAllocator = null == ledgerAllocator;
- // Fetching Log Metadata
- Future<ZKLogMetadataForWriter> metadataFuture =
- ZKLogMetadataForWriter.of(uri, name, streamIdentifier,
- zk, writerZKC.getDefaultACL(),
- ownAllocator, conf.getCreateStreamIfNotExists() || ownAllocator);
- return metadataFuture.flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() {
+ // Fetching Log Metadata (create if not exists)
+ return writerMetadataStore.getLog(
+ uri,
+ name,
+ ownAllocator,
+ conf.getCreateStreamIfNotExists() || ownAllocator
+ ).flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() {
@Override
public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter logMetadata) {
Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>();
@@ -615,16 +571,10 @@
private void createWriteHandler(ZKLogMetadataForWriter logMetadata,
boolean lockHandler,
final Promise<BKLogWriteHandler> createPromise) {
- OrderedScheduler lockStateExecutor = getLockStateExecutor(true);
// Build the locks
DistributedLock lock;
if (conf.isWriteLockEnabled()) {
- lock = new ZKDistributedLock(
- lockStateExecutor,
- getLockFactory(true),
- logMetadata.getLockPath(),
- conf.getLockTimeoutMilliSeconds(),
- statsLogger);
+ lock = writerMetadataStore.createWriteLock(logMetadata);
} else {
lock = NopDistributedLock.INSTANCE;
}
@@ -641,7 +591,6 @@
final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
logMetadata,
conf,
- writerZKCBuilder,
writerBKCBuilder,
writerMetadataStore,
logSegmentMetadataCache,
@@ -656,10 +605,6 @@
featureProvider,
dynConf,
lock);
- PermitManager manager = getLogSegmentRollingPermitManager();
- if (manager instanceof Watcher) {
- writeHandler.register((Watcher) manager);
- }
if (lockHandler) {
writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() {
@Override
@@ -684,7 +629,7 @@
}
PermitManager getLogSegmentRollingPermitManager() {
- return logSegmentRollingPermitManager;
+ return writerMetadataStore.getPermitManager();
}
<T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
@@ -692,7 +637,7 @@
return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() {
@Override
public BKLogReadHandler applyE() throws Throwable {
- return getReadHandlerForListener(true);
+ return getReadHandlerAndRegisterListener(true, null);
}
}).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
@Override
@@ -982,7 +927,6 @@
AsyncLogReader reader = new BKAsyncLogReaderDLSN(
this,
scheduler,
- getLockStateExecutor(true),
fromDLSN,
subscriberId,
false,
@@ -1022,7 +966,6 @@
final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN(
BKDistributedLogManager.this,
scheduler,
- getLockStateExecutor(true),
fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
subscriberId,
false,
@@ -1266,33 +1209,9 @@
*/
@Override
public void delete() throws IOException {
- BKLogWriteHandler ledgerHandler = createWriteHandler(true);
- try {
- ledgerHandler.deleteLog();
- } finally {
- Utils.closeQuietly(ledgerHandler);
- }
-
- // Delete the ZK path associated with the log stream
- String zkPath = getZKPath();
- // Safety check when we are using the shared zookeeper
- if (zkPath.toLowerCase().contains("distributedlog")) {
- try {
- LOG.info("Delete the path associated with the log {}, ZK Path {}", name, zkPath);
- ZKUtil.deleteRecursive(writerZKC.get(), zkPath);
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while accessing ZK", ie);
- throw new DLInterruptedException("Error initializing zk", ie);
- } catch (KeeperException ke) {
- LOG.error("Error accessing entry in zookeeper", ke);
- throw new IOException("Error initializing zk", ke);
- }
- } else {
- LOG.warn("Skip deletion of unrecognized ZK Path {}", zkPath);
- }
+ FutureUtils.result(writerMetadataStore.deleteLog(uri, getStreamName()));
}
-
/**
* The DistributedLogManager may archive/purge any logs for transactionId
* less than or equal to minImageTxId.
@@ -1377,9 +1296,6 @@
SchedulerUtils.shutdownScheduler(readAheadScheduler, schedTimeout, TimeUnit.MILLISECONDS);
LOG.info("Stopped BKDL ReadAhead Executor Service for {}.", name);
}
-
- SchedulerUtils.shutdownScheduler(getLockStateExecutor(false), schedTimeout, TimeUnit.MILLISECONDS);
- LOG.info("Stopped BKDL Lock State Executor for {}.", name);
}
if (ownWriterBKC) {
writerBKC.close();
@@ -1410,16 +1326,6 @@
FutureUtils.result(asyncClose());
}
- public boolean scheduleTask(Runnable task) {
- try {
- scheduler.submit(task);
- return true;
- } catch (RejectedExecutionException ree) {
- LOG.error("Task {} is rejected : ", task, ree);
- return false;
- }
- }
-
private FuturePool buildFuturePool(ExecutorService executorService,
StatsLogger statsLogger) {
FuturePool futurePool = new ExecutorServiceFuturePool(executorService);
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index f8d347a..2c9fe44 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -21,7 +21,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption;
@@ -33,30 +32,24 @@
import com.twitter.distributedlog.callback.NamespaceListener;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.feature.CoreFeatureKeys;
import com.twitter.distributedlog.impl.ZKLogMetadataStore;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
import com.twitter.distributedlog.metadata.LogMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.LimitedPermitManager;
import com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.PermitLimiter;
-import com.twitter.distributedlog.util.PermitManager;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.distributedlog.util.SimplePermitLimiter;
import com.twitter.distributedlog.util.Utils;
@@ -272,7 +265,6 @@
private final BKDLConfig bkdlConfig;
private final OrderedScheduler scheduler;
private final OrderedScheduler readAheadExecutor;
- private final OrderedScheduler lockStateExecutor;
private final ClientSocketChannelFactory channelFactory;
private final HashedWheelTimer requestTimer;
// zookeeper clients
@@ -300,16 +292,12 @@
private final LedgerAllocator allocator;
// access control manager
private AccessControlManager accessControlManager;
- // log segment rolling permit manager
- private final PermitManager logSegmentRollingPermitManager;
// log metadata store
private final LogMetadataStore metadataStore;
// log segment metadata store
private final LogSegmentMetadataCache logSegmentMetadataCache;
- private final LogSegmentMetadataStore writerSegmentMetadataStore;
- private final LogSegmentMetadataStore readerSegmentMetadataStore;
- // lock factory
- private final SessionLockFactory lockFactory;
+ private final LogStreamMetadataStore writerStreamMetadataStore;
+ private final LogStreamMetadataStore readerStreamMetadataStore;
// feature provider
private final FeatureProvider featureProvider;
@@ -371,15 +359,7 @@
this.readAheadExecutor = this.scheduler;
LOG.info("Used shared executor for readahead.");
}
- StatsLogger lockStateStatsLogger = statsLogger.scope("factory").scope("lock_scheduler");
- this.lockStateExecutor = OrderedScheduler.newBuilder()
- .name("DLM-LockState")
- .corePoolSize(conf.getNumLockStateThreads())
- .statsLogger(lockStateStatsLogger)
- .perExecutorStatsLogger(lockStateStatsLogger)
- .traceTaskExecution(conf.getEnableTaskExecutionStats())
- .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
- .build();
+
this.channelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
@@ -427,9 +407,6 @@
}
this.readerBKC = this.sharedReaderBKCBuilder.build();
- this.logSegmentRollingPermitManager = new LimitedPermitManager(
- conf.getLogSegmentRollingConcurrency(), 1, TimeUnit.MINUTES, scheduler);
-
if (conf.getGlobalOutstandingWriteLimit() < 0) {
this.writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
} else {
@@ -458,15 +435,6 @@
} else {
allocator = null;
}
- // Build the lock factory
- this.lockFactory = new ZKSessionLockFactory(
- sharedWriterZKCForDL,
- clientId,
- lockStateExecutor,
- conf.getZKNumRetries(),
- conf.getLockTimeoutMilliSeconds(),
- conf.getZKRetryBackoffStartMillis(),
- statsLogger);
// Stats Loggers
this.readAheadExceptionsLogger = new ReadAheadExceptionsLogger(statsLogger);
@@ -478,11 +446,22 @@
this.metadataStore = new ZKLogMetadataStore(conf, namespace, sharedReaderZKCForDL, scheduler);
}
- // create log segment metadata store
- this.writerSegmentMetadataStore =
- new ZKLogSegmentMetadataStore(conf, sharedWriterZKCForDL, scheduler);
- this.readerSegmentMetadataStore =
- new ZKLogSegmentMetadataStore(conf, sharedReaderZKCForDL, scheduler);
+ // create log stream metadata store
+ this.writerStreamMetadataStore =
+ new ZKLogStreamMetadataStore(
+ clientId,
+ conf,
+ sharedWriterZKCForDL,
+ scheduler,
+ statsLogger);
+ this.readerStreamMetadataStore =
+ new ZKLogStreamMetadataStore(
+ clientId,
+ conf,
+ sharedReaderZKCForDL,
+ scheduler,
+ statsLogger);
+ // create a log segment metadata cache
this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker());
LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, regionId = {}, federated = {}.",
@@ -499,7 +478,7 @@
checkState();
validateName(logName);
URI uri = FutureUtils.result(metadataStore.createLog(logName));
- createUnpartitionedStreams(conf, uri, Lists.newArrayList(logName));
+ FutureUtils.result(writerStreamMetadataStore.getLog(uri, logName, true, true));
}
@Override
@@ -556,7 +535,16 @@
throws IOException, IllegalArgumentException {
checkState();
Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
- return uri.isPresent() && checkIfLogExists(conf, uri.get(), logName);
+ if (uri.isPresent()) {
+ try {
+ FutureUtils.result(writerStreamMetadataStore.logExists(uri.get(), logName));
+ return true;
+ } catch (LogNotFoundException lnfe) {
+ return false;
+ }
+ } else {
+ return false;
+ }
}
@Override
@@ -701,8 +689,8 @@
}
@VisibleForTesting
- public LogSegmentMetadataStore getWriterSegmentMetadataStore() {
- return writerSegmentMetadataStore;
+ public LogStreamMetadataStore getWriterStreamMetadataStore() {
+ return writerStreamMetadataStore;
}
@VisibleForTesting
@@ -883,10 +871,8 @@
}
LedgerAllocator dlmLedgerAlloctor = null;
- PermitManager dlmLogSegmentRollingPermitManager = PermitManager.UNLIMITED_PERMIT_MANAGER;
if (ClientSharingOption.SharedClients == clientSharingOption) {
dlmLedgerAlloctor = this.allocator;
- dlmLogSegmentRollingPermitManager = this.logSegmentRollingPermitManager;
}
// if there's a specified perStreamStatsLogger, user it, otherwise use the default one.
StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger);
@@ -902,13 +888,11 @@
readerZKCForBK, /* ZKC for BookKeeper for DL Readers */
writerBKCBuilder, /* BookKeeper Builder for DL Writers */
readerBKCBuilder, /* BookKeeper Builder for DL Readers */
- lockFactory, /* Lock Factory */
- writerSegmentMetadataStore, /* Log Segment Metadata Store for DL Writers */
- readerSegmentMetadataStore, /* Log Segment Metadata Store for DL Readers */
+ writerStreamMetadataStore, /* Log Segment Metadata Store for DL Writers */
+ readerStreamMetadataStore, /* Log Segment Metadata Store for DL Readers */
logSegmentMetadataCache, /* Log Segment Metadata Cache */
scheduler, /* DL scheduler */
readAheadExecutor, /* Read Aheader Executor */
- lockStateExecutor, /* Lock State Executor */
channelFactory, /* Netty Channel Factory */
requestTimer, /* Request Timer */
readAheadExceptionsLogger, /* ReadAhead Exceptions Logger */
@@ -916,7 +900,6 @@
regionId, /* Region Id */
dlmLedgerAlloctor, /* Ledger Allocator */
writeLimiter, /* Write Limiter */
- dlmLogSegmentRollingPermitManager, /* Log segment rolling limiter */
featureProvider.scope("dl"), /* Feature Provider */
statsLogger, /* Stats Logger */
perLogStatsLogger /* Per Log Stats Logger */
@@ -961,25 +944,6 @@
validateName(nameOfStream);
}
- private static boolean checkIfLogExists(DistributedLogConfiguration conf, URI uri, String name)
- throws IOException, IllegalArgumentException {
- validateInput(conf, uri, name);
- final String logRootPath = uri.getPath() + String.format("/%s", name);
- return withZooKeeperClient(new ZooKeeperClientHandler<Boolean>() {
- @Override
- public Boolean handle(ZooKeeperClient zkc) throws IOException {
- // check existence after syncing
- try {
- return null != Utils.sync(zkc, logRootPath).exists(logRootPath, false);
- } catch (KeeperException e) {
- throw new ZKException("Error on checking if log " + logRootPath + " exists", e.code());
- } catch (InterruptedException e) {
- throw new DLInterruptedException("Interrupted on checking if log " + logRootPath + " exists", e);
- }
- }
- }, conf, uri);
- }
-
public static Map<String, byte[]> enumerateLogsWithMetadataInNamespace(final DistributedLogConfiguration conf, final URI uri)
throws IOException, IllegalArgumentException {
return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() {
@@ -1025,27 +989,6 @@
return result;
}
- private static void createUnpartitionedStreams(
- final DistributedLogConfiguration conf,
- final URI uri,
- final List<String> streamNames)
- throws IOException, IllegalArgumentException {
- withZooKeeperClient(new ZooKeeperClientHandler<Void>() {
- @Override
- public Void handle(ZooKeeperClient zkc) throws IOException {
- for (String s : streamNames) {
- try {
- BKDistributedLogManager.createLog(conf, zkc, uri, s);
- } catch (InterruptedException e) {
- LOG.error("Interrupted on creating unpartitioned stream {} : ", s, e);
- return null;
- }
- }
- return null;
- }
- }, conf, uri);
- }
-
private void checkState() throws IOException {
if (closed.get()) {
LOG.error("BKDistributedLogNamespace {} is already closed", namespace);
@@ -1079,13 +1022,11 @@
LOG.info("Ledger Allocator stopped.");
}
- // Unregister gauge to avoid GC spiral
- this.logSegmentRollingPermitManager.close();
this.writeLimiter.close();
// Shutdown log segment metadata stores
- Utils.close(writerSegmentMetadataStore);
- Utils.close(readerSegmentMetadataStore);
+ Utils.close(writerStreamMetadataStore);
+ Utils.close(readerStreamMetadataStore);
// Shutdown the schedulers
SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(),
@@ -1113,7 +1054,5 @@
LOG.info("Release external resources used by channel factory.");
requestTimer.stop();
LOG.info("Stopped request timer");
- SchedulerUtils.shutdownScheduler(lockStateExecutor, 5000, TimeUnit.MILLISECONDS);
- LOG.info("Stopped lock state executor");
}
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 2a6e85b..4f138f2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -20,12 +20,9 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogEmptyException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
import com.twitter.distributedlog.io.AsyncAbortable;
import com.twitter.distributedlog.io.AsyncCloseable;
@@ -33,6 +30,7 @@
import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.util.Function;
@@ -45,10 +43,6 @@
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction0;
@@ -95,8 +89,8 @@
protected final ZKLogMetadata logMetadata;
protected final DistributedLogConfiguration conf;
- protected final ZooKeeperClient zooKeeperClient;
protected final BookKeeperClient bookKeeperClient;
+ protected final LogStreamMetadataStore streamMetadataStore;
protected final LogSegmentMetadataStore metadataStore;
protected final LogSegmentMetadataCache metadataCache;
protected final int firstNumEntriesPerReadLastRecordScan;
@@ -112,8 +106,6 @@
// Maintain the list of log segments per stream
protected final PerStreamLogSegmentCache logSegmentCache;
-
-
// trace
protected final long metadataLatencyWarnThresholdMillis;
@@ -130,15 +122,13 @@
*/
BKLogHandler(ZKLogMetadata metadata,
DistributedLogConfiguration conf,
- ZooKeeperClientBuilder zkcBuilder,
BookKeeperClientBuilder bkcBuilder,
- LogSegmentMetadataStore metadataStore,
+ LogStreamMetadataStore streamMetadataStore,
LogSegmentMetadataCache metadataCache,
OrderedScheduler scheduler,
StatsLogger statsLogger,
AlertStatsLogger alertStatsLogger,
String lockClientId) {
- Preconditions.checkNotNull(zkcBuilder);
Preconditions.checkNotNull(bkcBuilder);
this.logMetadata = metadata;
this.conf = conf;
@@ -148,13 +138,11 @@
this.logSegmentCache = new PerStreamLogSegmentCache(
metadata.getLogName(),
conf.isLogSegmentSequenceNumberValidationEnabled());
-
firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan();
maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan();
- this.zooKeeperClient = zkcBuilder.build();
- LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
this.bookKeeperClient = bkcBuilder.build();
- this.metadataStore = metadataStore;
+ this.streamMetadataStore = streamMetadataStore;
+ this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore();
this.metadataCache = metadataCache;
this.lockClientId = lockClientId;
@@ -188,7 +176,8 @@
public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
- checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+ streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+ .addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
readLogSegmentsFromStore(
@@ -234,7 +223,8 @@
public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) {
final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
- checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+ streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+ .addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
readLogSegmentsFromStore(
@@ -381,8 +371,8 @@
* @return the count of records present in the range
*/
public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
-
- return checkLogStreamExistsAsync().flatMap(new Function<Void, Future<Long>>() {
+ return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+ .flatMap(new Function<Void, Future<Long>>() {
public Future<Long> apply(Void done) {
return readLogSegmentsFromStore(
@@ -417,48 +407,6 @@
return sum;
}
- Future<Void> checkLogStreamExistsAsync() {
- final Promise<Void> promise = new Promise<Void>();
- try {
- final ZooKeeper zk = zooKeeperClient.get();
- zk.sync(logMetadata.getLogSegmentsPath(), new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int syncRc, String path, Object syncCtx) {
- if (KeeperException.Code.NONODE.intValue() == syncRc) {
- promise.setException(new LogNotFoundException(
- String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
- return;
- } else if (KeeperException.Code.OK.intValue() != syncRc){
- promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
- KeeperException.create(KeeperException.Code.get(syncRc))));
- return;
- }
- zk.exists(logMetadata.getLogSegmentsPath(), false, new AsyncCallback.StatCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- if (KeeperException.Code.OK.intValue() == rc) {
- promise.setValue(null);
- } else if (KeeperException.Code.NONODE.intValue() == rc) {
- promise.setException(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
- } else {
- promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
- KeeperException.create(KeeperException.Code.get(rc))));
- }
- }
- }, null);
- }
- }, null);
-
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
- promise.setException(new DLInterruptedException("Interrupted while checking "
- + logMetadata.getLogSegmentsPath(), ie));
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
- }
- return promise;
- }
-
@Override
public Future<Void> asyncAbort() {
return asyncClose();
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 30a96ff..1963172 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -31,8 +31,6 @@
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.LockCancelledException;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
@@ -40,12 +38,9 @@
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
import com.twitter.distributedlog.readahead.ReadAheadWorker;
import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
@@ -53,7 +48,6 @@
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
@@ -67,14 +61,13 @@
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Function0;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
+import javax.annotation.Nullable;
+
/**
* Log Handler for Readers.
* <h3>Metrics</h3>
@@ -111,7 +104,7 @@
* becoming idle.
* </ul>
* <h4>Read Lock</h4>
- * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock}
+ * All read lock related stats are exposed under scope `read_lock`.
* for detail stats.
*/
class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
@@ -126,10 +119,7 @@
protected ReadAheadWorker readAheadWorker = null;
private final boolean isHandleForReading;
- private final SessionLockFactory lockFactory;
- private final OrderedScheduler lockStateExecutor;
private final Optional<String> subscriberId;
- private final String readLockPath;
private DistributedLock readLock;
private Future<Void> lockAcquireFuture;
@@ -156,12 +146,10 @@
Optional<String> subscriberId,
DistributedLogConfiguration conf,
DynamicDistributedLogConfiguration dynConf,
- ZooKeeperClientBuilder zkcBuilder,
BookKeeperClientBuilder bkcBuilder,
- LogSegmentMetadataStore metadataStore,
+ LogStreamMetadataStore streamMetadataStore,
LogSegmentMetadataCache metadataCache,
OrderedScheduler scheduler,
- OrderedScheduler lockStateExecutor,
OrderedScheduler readAheadExecutor,
AlertStatsLogger alertStatsLogger,
ReadAheadExceptionsLogger readAheadExceptionsLogger,
@@ -173,9 +161,8 @@
boolean deserializeRecordSet) {
super(logMetadata,
conf,
- zkcBuilder,
bkcBuilder,
- metadataStore,
+ streamMetadataStore,
metadataCache,
scheduler,
statsLogger,
@@ -206,23 +193,12 @@
Ticker.systemTicker());
this.subscriberId = subscriberId;
- this.readLockPath = logMetadata.getReadLockPath(subscriberId);
- this.lockStateExecutor = lockStateExecutor;
- this.lockFactory = new ZKSessionLockFactory(
- zooKeeperClient,
- getLockClientId(),
- lockStateExecutor,
- conf.getZKNumRetries(),
- conf.getLockTimeoutMilliSeconds(),
- conf.getZKRetryBackoffStartMillis(),
- statsLogger.scope("read_lock"));
-
this.isHandleForReading = isHandleForReading;
}
@VisibleForTesting
String getReadLockPath() {
- return readLockPath;
+ return logMetadataForReader.getReadLockPath(subscriberId);
}
<T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) {
@@ -234,38 +210,24 @@
});
}
+ Future<Void> checkLogStreamExists() {
+ return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName());
+ }
+
/**
* Elective stream lock--readers are not required to acquire the lock before using the stream.
*/
synchronized Future<Void> lockStream() {
if (null == lockAcquireFuture) {
- final Function0<DistributedLock> lockFunction = new ExceptionalFunction0<DistributedLock>() {
- @Override
- public DistributedLock applyE() throws IOException {
- // Unfortunately this has a blocking call which we should not execute on the
- // ZK completion thread
- BKLogReadHandler.this.readLock = new ZKDistributedLock(
- lockStateExecutor,
- lockFactory,
- readLockPath,
- conf.getLockTimeoutMilliSeconds(),
- statsLogger.scope("read_lock"));
-
- LOG.info("acquiring readlock {} at {}", getLockClientId(), readLockPath);
- return BKLogReadHandler.this.readLock;
- }
- };
- lockAcquireFuture = ensureReadLockPathExist().flatMap(new ExceptionalFunction<Void, Future<Void>>() {
- @Override
- public Future<Void> applyE(Void in) throws Throwable {
- return scheduler.apply(lockFunction).flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
+ lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId)
+ .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
@Override
- public Future<Void> applyE(DistributedLock lock) throws IOException {
+ public Future<Void> applyE(DistributedLock lock) throws Throwable {
+ BKLogReadHandler.this.readLock = lock;
+ LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath());
return acquireLockOnExecutorThread(lock);
}
});
- }
- });
}
return lockAcquireFuture;
}
@@ -292,14 +254,14 @@
acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() {
@Override
public void onSuccess(DistributedLock lock) {
- LOG.info("acquired readlock {} at {}", getLockClientId(), readLockPath);
+ LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath());
satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null));
}
@Override
public void onFailure(Throwable cause) {
LOG.info("failed to acquire readlock {} at {}",
- new Object[]{getLockClientId(), readLockPath, cause});
+ new Object[]{ getLockClientId(), getReadLockPath(), cause });
satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause));
}
});
@@ -438,46 +400,6 @@
return handleCache;
}
- private Future<Void> ensureReadLockPathExist() {
- final Promise<Void> promise = new Promise<Void>();
- promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable t) {
- FutureUtils.setException(promise, new LockCancelledException(readLockPath, "Could not ensure read lock path", t));
- return null;
- }
- });
- Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
- Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
- new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
- new org.apache.zookeeper.AsyncCallback.StringCallback() {
- @Override
- public void processResult(final int rc, final String path, Object ctx, String name) {
- scheduler.submit(new Runnable() {
- @Override
- public void run() {
- if (KeeperException.Code.NONODE.intValue() == rc) {
- FutureUtils.setException(promise, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
- } else if (KeeperException.Code.OK.intValue() == rc) {
- FutureUtils.setValue(promise, null);
- LOG.trace("Created path {}.", path);
- } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- FutureUtils.setValue(promise, null);
- LOG.trace("Path {} is already existed.", path);
- } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
- FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
- } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
- FutureUtils.setException(promise, new DLInterruptedException(path));
- } else {
- FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
- }
- }
- });
- }
- }, null);
- return promise;
- }
-
public Entry.Reader getNextReadAheadEntry() throws IOException {
return readAheadCache.getNextReadAheadEntry();
}
@@ -560,12 +482,16 @@
// Listener for log segments
//
- protected void registerListener(LogSegmentListener listener) {
- listeners.add(listener);
+ protected void registerListener(@Nullable LogSegmentListener listener) {
+ if (null != listener) {
+ listeners.add(listener);
+ }
}
- protected void unregisterListener(LogSegmentListener listener) {
- listeners.remove(listener);
+ protected void unregisterListener(@Nullable LogSegmentListener listener) {
+ if (null != listener) {
+ listeners.remove(listener);
+ }
}
protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 5d3be7d..f2e30ce 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -23,22 +23,21 @@
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.EndOfStreamException;
import com.twitter.distributedlog.exceptions.LockingException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.function.GetLastTxIdFunction;
import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.logsegment.RollingPolicy;
import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy;
import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
import com.twitter.distributedlog.metadata.MetadataUpdater;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.util.DLUtils;
@@ -49,9 +48,6 @@
import com.twitter.distributedlog.util.Transaction;
import com.twitter.distributedlog.util.PermitLimiter;
import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.zk.ZKOp;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.distributedlog.zk.ZKVersionedSetOp;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
@@ -60,19 +56,11 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
@@ -84,7 +72,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
-import static com.google.common.base.Charsets.UTF_8;
import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
/**
@@ -102,11 +89,11 @@
class BKLogWriteHandler extends BKLogHandler {
static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
+ protected final ZKLogMetadataForWriter logMetadataForWriter;
protected final DistributedLock lock;
protected final LedgerAllocator ledgerAllocator;
protected final MaxTxId maxTxId;
protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
- protected final boolean sanityCheckTxnId;
protected final boolean validateLogSegmentSequenceNumber;
protected final int regionId;
protected final RollingPolicy rollingPolicy;
@@ -164,9 +151,8 @@
*/
BKLogWriteHandler(ZKLogMetadataForWriter logMetadata,
DistributedLogConfiguration conf,
- ZooKeeperClientBuilder zkcBuilder,
BookKeeperClientBuilder bkcBuilder,
- LogSegmentMetadataStore metadataStore,
+ LogStreamMetadataStore streamMetadataStore,
LogSegmentMetadataCache metadataCache,
OrderedScheduler scheduler,
LedgerAllocator allocator,
@@ -181,14 +167,14 @@
DistributedLock lock /** owned by handler **/) {
super(logMetadata,
conf,
- zkcBuilder,
bkcBuilder,
- metadataStore,
+ streamMetadataStore,
metadataCache,
scheduler,
statsLogger,
alertStatsLogger,
clientId);
+ this.logMetadataForWriter = logMetadata;
this.perLogStatsLogger = perLogStatsLogger;
this.writeLimiter = writeLimiter;
this.featureProvider = featureProvider;
@@ -202,15 +188,13 @@
} else {
this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
}
- this.sanityCheckTxnId = conf.getSanityCheckTxnID();
this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled();
// Construct the max sequence no
maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData());
inprogressLSSNs = new LinkedList<Long>();
// Construct the max txn id.
- maxTxId = new MaxTxId(zooKeeperClient, logMetadata.getMaxTxIdPath(),
- conf.getSanityCheckTxnID(), logMetadata.getMaxTxIdData());
+ maxTxId = new MaxTxId(logMetadata.getMaxTxIdData());
// Schedule fetching log segment list in background before we access it.
// We don't need to watch the log segment list changes for writer, as it manages log segment list.
@@ -291,13 +275,12 @@
}
// Transactional operations for MaxLogSegmentSequenceNo
- void storeMaxSequenceNumber(final Transaction txn,
+ void storeMaxSequenceNumber(final Transaction<Object> txn,
final MaxLogSegmentSequenceNo maxSeqNo,
final long seqNo,
final boolean isInprogress) {
- byte[] data = DLUtils.serializeLogSegmentSequenceNumber(seqNo);
- Op zkOp = Op.setData(logMetadata.getLogSegmentsPath(), data, maxSeqNo.getZkVersion());
- txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
+ metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo),
+ new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version version) {
if (validateLogSegmentSequenceNumber) {
@@ -309,69 +292,60 @@
}
}
}
- maxSeqNo.update((ZkVersion) version, seqNo);
+ maxSeqNo.update(version, seqNo);
}
@Override
public void onAbort(Throwable t) {
// no-op
}
- }));
+ });
}
// Transactional operations for MaxTxId
- void storeMaxTxId(final ZKTransaction txn,
+ void storeMaxTxId(final Transaction<Object> txn,
final MaxTxId maxTxId,
final long txId) {
- byte[] data = maxTxId.couldStore(txId);
- if (null != data) {
- Op zkOp = Op.setData(maxTxId.getZkPath(), data, -1);
- txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
- @Override
- public void onCommit(Version version) {
- maxTxId.setMaxTxId(txId);
- }
+ metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId),
+ new Transaction.OpListener<Version>() {
+ @Override
+ public void onCommit(Version version) {
+ maxTxId.update(version, txId);
+ }
- @Override
- public void onAbort(Throwable t) {
-
- }
- }));
- }
+ @Override
+ public void onAbort(Throwable t) {
+ // no-op
+ }
+ });
}
// Transactional operations for logsegment
- void writeLogSegment(final ZKTransaction txn,
- final List<ACL> acl,
- final String inprogressSegmentName,
- final LogSegmentMetadata metadata,
- final String path) {
- byte[] finalisedData = metadata.getFinalisedData().getBytes(UTF_8);
- Op zkOp = Op.create(path, finalisedData, acl, CreateMode.PERSISTENT);
- txn.addOp(new ZKOp(zkOp) {
+ void writeLogSegment(final Transaction<Object> txn,
+ final LogSegmentMetadata metadata) {
+ metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
@Override
- protected void commitOpResult(OpResult opResult) {
- addLogSegmentToCache(inprogressSegmentName, metadata);
+ public void onCommit(Void r) {
+ addLogSegmentToCache(metadata.getSegmentName(), metadata);
}
@Override
- protected void abortOpResult(Throwable t, OpResult opResult) {
+ public void onAbort(Throwable t) {
// no-op
}
});
}
- void deleteLogSegment(final ZKTransaction txn,
- final String logSegmentName,
- final String logSegmentPath) {
- Op zkOp = Op.delete(logSegmentPath, -1);
- txn.addOp(new ZKOp(zkOp) {
+ void deleteLogSegment(final Transaction<Object> txn,
+ final LogSegmentMetadata metadata) {
+ metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
@Override
- protected void commitOpResult(OpResult opResult) {
- removeLogSegmentFromCache(logSegmentName);
+ public void onCommit(Void r) {
+ removeLogSegmentFromCache(metadata.getSegmentName());
}
+
@Override
- protected void abortOpResult(Throwable t, OpResult opResult) {
+ public void onAbort(Throwable t) {
// no-op
}
});
@@ -405,10 +379,6 @@
}
}
- void register(Watcher watcher) {
- this.zooKeeperClient.register(watcher);
- }
-
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
@@ -539,19 +509,17 @@
FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId));
return;
}
- if (this.sanityCheckTxnId) {
- long highestTxIdWritten = maxTxId.get();
- if (txId < highestTxIdWritten) {
- if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
- LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
- FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
- return;
- }
- else {
- LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
- FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
- return;
- }
+
+ long highestTxIdWritten = maxTxId.get();
+ if (txId < highestTxIdWritten) {
+ if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
+ LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
+ FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
+ return;
+ } else {
+ LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
+ FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
+ return;
}
}
@@ -564,7 +532,7 @@
}
// start the transaction from zookeeper
- final ZKTransaction txn = new ZKTransaction(zooKeeperClient);
+ final Transaction<Object> txn = streamMetadataStore.newTransaction();
// failpoint injected before creating ledger
try {
@@ -617,7 +585,7 @@
// once the ledger handle is obtained from allocator, this function should guarantee
// either the transaction is executed or aborted. Otherwise, the ledger handle will
// just leak from the allocation pool - hence cause "No Ledger Allocator"
- private void createInprogressLogSegment(ZKTransaction txn,
+ private void createInprogressLogSegment(Transaction<Object> txn,
final long txId,
final LedgerHandle lh,
boolean bestEffort,
@@ -634,7 +602,6 @@
return;
}
- final String inprogressZnodeName = inprogressZNodeName(lh.getId(), txId, logSegmentSeqNo);
final String inprogressZnodePath = inprogressZNode(lh.getId(), txId, logSegmentSeqNo);
final LogSegmentMetadata l =
new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath,
@@ -645,12 +612,7 @@
.build();
// Create an inprogress segment
- writeLogSegment(
- txn,
- zooKeeperClient.getDefaultACL(),
- inprogressZnodeName,
- l,
- inprogressZnodePath);
+ writeLogSegment(txn, l);
// Try storing max sequence number.
LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo);
@@ -667,7 +629,7 @@
try {
FutureUtils.setValue(promise, new BKLogSegmentWriter(
getFullyQualifiedName(),
- inprogressZnodeName,
+ l.getSegmentName(),
conf,
conf.getDLLedgerMetadataLayoutVersion(),
new BKLogSegmentEntryWriter(lh),
@@ -888,7 +850,6 @@
}
LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
- final String inprogressZnodePath = inprogressZNode(inprogressZnodeName);
LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);
// validate log segment
@@ -936,7 +897,7 @@
// ignore the case that a new inprogress log segment is pre-allocated
// before completing current inprogress one
LOG.info("Try storing max sequence number {} in completing {}.",
- new Object[] { logSegmentSeqNo, inprogressZnodePath });
+ new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
} else {
LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
@@ -949,7 +910,6 @@
}
// Prepare the completion
- final String nameForCompletedLedger = completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo);
final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
long startSequenceId;
try {
@@ -970,17 +930,12 @@
setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());
// prepare the transaction
- ZKTransaction txn = new ZKTransaction(zooKeeperClient);
+ Transaction<Object> txn = streamMetadataStore.newTransaction();
// create completed log segment
- writeLogSegment(
- txn,
- zooKeeperClient.getDefaultACL(),
- nameForCompletedLedger,
- completedLogSegment,
- pathForCompletedLedger);
+ writeLogSegment(txn, completedLogSegment);
// delete inprogress log segment
- deleteLogSegment(txn, inprogressZnodeName, inprogressZnodePath);
+ deleteLogSegment(txn, inprogressLogSegment);
// store max sequence number
storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
// update max txn id.
@@ -991,7 +946,8 @@
@Override
public void onSuccess(Void value) {
LOG.info("Completed {} to {} for {} : {}",
- new Object[] { inprogressZnodeName, nameForCompletedLedger, getFullyQualifiedName(), completedLogSegment });
+ new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
+ getFullyQualifiedName(), completedLogSegment });
FutureUtils.setValue(promise, completedLogSegment);
}
@@ -1072,27 +1028,6 @@
}
- public void deleteLog() throws IOException {
- lock.checkOwnershipAndReacquire();
- FutureUtils.result(purgeLogSegmentsOlderThanTxnId(-1));
-
- try {
- Utils.closeQuietly(lock);
- zooKeeperClient.get().exists(logMetadata.getLogSegmentsPath(), false);
- zooKeeperClient.get().exists(logMetadata.getMaxTxIdPath(), false);
- if (logMetadata.getLogRootPath().toLowerCase().contains("distributedlog")) {
- ZKUtil.deleteRecursive(zooKeeperClient.get(), logMetadata.getLogRootPath());
- } else {
- LOG.warn("Skip deletion of unrecognized ZK Path {}", logMetadata.getLogRootPath());
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while deleting log znodes", ie);
- throw new DLInterruptedException("Interrupted while deleting " + logMetadata.getLogRootPath(), ie);
- } catch (KeeperException ke) {
- LOG.error("Error deleting" + logMetadata.getLogRootPath() + " in zookeeper", ke);
- }
- }
-
Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
if (DLSN.InvalidDLSN == dlsn) {
List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
@@ -1321,33 +1256,29 @@
private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
final Promise<LogSegmentMetadata> promise) {
Transaction<Object> deleteTxn = metadataStore.transaction();
- metadataStore.deleteLogSegment(deleteTxn, segmentMetadata);
- deleteTxn.execute().addEventListener(new FutureEventListener<Void>() {
+ metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onCommit(Void r) {
// purge log segment
removeLogSegmentFromCache(segmentMetadata.getZNodeName());
promise.setValue(segmentMetadata);
}
@Override
- public void onFailure(Throwable cause) {
- if (cause instanceof ZKException) {
- ZKException zke = (ZKException) cause;
- if (KeeperException.Code.NONODE == zke.getKeeperExceptionCode()) {
- LOG.error("No log segment {} found for {}.",
- segmentMetadata, getFullyQualifiedName());
- // purge log segment
- removeLogSegmentFromCache(segmentMetadata.getZNodeName());
- promise.setValue(segmentMetadata);
- return;
- }
+ public void onAbort(Throwable t) {
+ if (t instanceof LogSegmentNotFoundException) {
+ // purge log segment
+ removeLogSegmentFromCache(segmentMetadata.getZNodeName());
+ promise.setValue(segmentMetadata);
+ return;
+ } else {
+ LOG.error("Couldn't purge {} for {}: with error {}",
+ new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
+ promise.setException(t);
}
- LOG.error("Couldn't purge {} for {}: with error {}",
- new Object[]{ segmentMetadata, getFullyQualifiedName(), cause });
- promise.setException(cause);
}
});
+ deleteTxn.execute();
}
@Override
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index f4ca45e..0f6db75 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -62,7 +62,6 @@
StatsLogger statsLogger) {
this.readHandler = bkdlm.createReadHandler(
Optional.<String>absent(),
- bkdlm.getLockStateExecutor(true),
this,
conf.getDeserializeRecordSetOnReads(),
true);
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index 46a056b..6f37a59 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -1557,6 +1557,7 @@
*
* @return true if should check txn id with max txn id, otherwise false.
*/
+ @Deprecated
public boolean getSanityCheckTxnID() {
return getBoolean(BKDL_MAXID_SANITYCHECK, BKDL_MAXID_SANITYCHECK_DEFAULT);
}
@@ -1569,6 +1570,7 @@
* @return configuration.
* @see #getSanityCheckTxnID()
*/
+ @Deprecated
public DistributedLogConfiguration setSanityCheckTxnID(boolean enabled) {
setProperty(BKDL_MAXID_SANITYCHECK, enabled);
return this;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
index 80cf350..9bfaaba 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
@@ -17,26 +17,15 @@
*/
package com.twitter.distributedlog;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.util.DLUtils;
-import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
/**
* Utility class for storing and reading max ledger sequence number
*/
class MaxLogSegmentSequenceNo {
- static final Logger LOG = LoggerFactory.getLogger(MaxLogSegmentSequenceNo.class);
-
Version version;
long maxSeqNo;
@@ -55,24 +44,20 @@
if (null != logSegmentsData && null != logSegmentsData.getVersion()) {
version = logSegmentsData.getVersion();
} else {
- version = new ZkVersion(-1);
+ throw new IllegalStateException("Invalid MaxLogSegmentSequenceNo found - " + logSegmentsData);
}
}
}
- synchronized int getZkVersion() {
- return ((ZkVersion) version).getZnodeVersion();
+ synchronized Version getVersion() {
+ return version;
}
synchronized long getSequenceNumber() {
return maxSeqNo;
}
- synchronized MaxLogSegmentSequenceNo update(int zkVersion, long logSegmentSeqNo) {
- return update(new ZkVersion(zkVersion), logSegmentSeqNo);
- }
-
- synchronized MaxLogSegmentSequenceNo update(ZkVersion version, long logSegmentSeqNo) {
+ synchronized MaxLogSegmentSequenceNo update(Version version, long logSegmentSeqNo) {
if (version.compare(this.version) == Version.Occurred.AFTER) {
this.version = version;
this.maxSeqNo = logSegmentSeqNo;
@@ -80,21 +65,8 @@
return this;
}
- synchronized void store(ZooKeeperClient zkc, String path, long logSegmentSeqNo) throws IOException {
- try {
- Stat stat = zkc.get().setData(path,
- DLUtils.serializeLogSegmentSequenceNumber(logSegmentSeqNo), getZkVersion());
- update(stat.getVersion(), logSegmentSeqNo);
- } catch (KeeperException ke) {
- throw new ZKException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
- + path + " : ", ke);
- } catch (ZooKeeperClient.ZooKeeperConnectionException zce) {
- throw new IOException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
- + path + " : ", zce);
- } catch (InterruptedException e) {
- throw new DLInterruptedException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
- + path + " : ", e);
- }
+ public synchronized Versioned<Long> getVersionedData(long seqNo) {
+ return new Versioned<Long>(seqNo, version);
}
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
index c446a8b..ed7218e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
@@ -18,13 +18,11 @@
package com.twitter.distributedlog;
import com.twitter.distributedlog.util.DLUtils;
+import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/**
* Utility class for storing and reading
* the max seen txid in zookeeper
@@ -32,73 +30,43 @@
class MaxTxId {
static final Logger LOG = LoggerFactory.getLogger(MaxTxId.class);
- private final ZooKeeperClient zkc;
- private final String path;
- private final boolean enabled;
-
+ private Version version;
private long currentMax;
- MaxTxId(ZooKeeperClient zkc, String path, boolean enabled,
- Versioned<byte[]> maxTxIdData) {
- this.zkc = zkc;
- this.path = path;
- this.enabled = enabled && null != maxTxIdData && null != maxTxIdData.getVersion()
- && null != maxTxIdData.getValue();
- if (this.enabled) {
+ MaxTxId(Versioned<byte[]> maxTxIdData) {
+ if (null != maxTxIdData
+ && null != maxTxIdData.getValue()
+ && null != maxTxIdData.getVersion()) {
+ this.version = maxTxIdData.getVersion();
try {
this.currentMax = DLUtils.deserializeTransactionId(maxTxIdData.getValue());
} catch (NumberFormatException e) {
- LOG.warn("Invalid txn id stored in {}", path, e);
- this.currentMax = 0L;
+ LOG.warn("Invalid txn id stored in {}", e);
+ this.currentMax = DistributedLogConstants.INVALID_TXID;
}
} else {
- this.currentMax = -1L;
+ this.currentMax = DistributedLogConstants.INVALID_TXID;
+ if (null != maxTxIdData && null != maxTxIdData.getVersion()) {
+ this.version = maxTxIdData.getVersion();
+ } else {
+ throw new IllegalStateException("Invalid MaxTxId found - " + maxTxIdData);
+ }
}
}
- String getZkPath() {
- return path;
- }
-
- synchronized void setMaxTxId(long txId) {
- if (enabled && this.currentMax < txId) {
+ synchronized void update(Version version, long txId) {
+ if (version.compare(this.version) == Version.Occurred.AFTER) {
+ this.version = version;
this.currentMax = txId;
}
}
- synchronized byte[] couldStore(long maxTxId) {
- if (enabled && currentMax < maxTxId) {
- return DLUtils.serializeTransactionId(maxTxId);
- } else {
- return null;
- }
- }
-
- /**
- * Store the highest TxID encountered so far so that we
- * can enforce the monotonically non-decreasing property
- * This is best effort as this enforcement is only done
- *
- * @param maxTxId - the maximum transaction id seen so far
- * @throws IOException
- */
- synchronized void store(long maxTxId) throws IOException {
- if (enabled && currentMax < maxTxId) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Setting maxTxId to " + maxTxId);
- }
- String txidStr = Long.toString(maxTxId);
- try {
- zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1);
- currentMax = maxTxId;
- } catch (Exception e) {
- LOG.error("Error writing new MaxTxId value {}", maxTxId, e);
- }
- }
- }
-
synchronized long get() {
return currentMax;
}
+ public Versioned<Long> getVersionedData(long txId) {
+ return new Versioned<Long>(Math.max(txId, currentMax), version);
+ }
+
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index f0d2797..1b831ea 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -23,12 +23,16 @@
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Transaction;
+import com.twitter.distributedlog.util.Transaction.OpListener;
import com.twitter.distributedlog.zk.DefaultZKOp;
import com.twitter.distributedlog.zk.ZKOp;
import com.twitter.distributedlog.zk.ZKTransaction;
@@ -48,10 +52,8 @@
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -220,30 +222,28 @@
@Override
public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
- String path,
+ ZKLogMetadata logMetadata,
Versioned<Long> lssn,
Transaction.OpListener<Version> listener) {
Version version = lssn.getVersion();
assert(version instanceof ZkVersion);
-
ZkVersion zkVersion = (ZkVersion) version;
byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue());
- Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion());
+ Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, zkVersion.getZnodeVersion());
ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
txn.addOp(zkOp);
}
@Override
public void storeMaxTxnId(Transaction<Object> txn,
- String path,
+ ZKLogMetadataForWriter logMetadata,
Versioned<Long> transactionId,
Transaction.OpListener<Version> listener) {
Version version = transactionId.getVersion();
assert(version instanceof ZkVersion);
-
ZkVersion zkVersion = (ZkVersion) version;
byte[] data = DLUtils.serializeTransactionId(transactionId.getValue());
- Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion());
+ Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, zkVersion.getZnodeVersion());
ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
txn.addOp(zkOp);
}
@@ -256,29 +256,66 @@
}
@Override
- public void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
+ public void createLogSegment(Transaction<Object> txn,
+ LogSegmentMetadata segment,
+ OpListener<Void> listener) {
byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
Op createOp = Op.create(
segment.getZkPath(),
finalisedData,
zkc.getDefaultACL(),
CreateMode.PERSISTENT);
- txn.addOp(DefaultZKOp.of(createOp));
+ txn.addOp(DefaultZKOp.of(createOp, listener));
}
@Override
- public void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
+ public void deleteLogSegment(Transaction<Object> txn,
+ final LogSegmentMetadata segment,
+ final OpListener<Void> listener) {
Op deleteOp = Op.delete(
segment.getZkPath(),
-1);
- txn.addOp(DefaultZKOp.of(deleteOp));
+ logger.info("Delete segment : {}", segment);
+ txn.addOp(DefaultZKOp.of(deleteOp, new OpListener<Void>() {
+ @Override
+ public void onCommit(Void r) {
+ if (null != listener) {
+ listener.onCommit(r);
+ }
+ }
+
+ @Override
+ public void onAbort(Throwable t) {
+ logger.info("Aborted transaction on deleting segment {}", segment);
+ KeeperException.Code kc;
+ if (t instanceof KeeperException) {
+ kc = ((KeeperException) t).code();
+ } else if (t instanceof ZKException) {
+ kc = ((ZKException) t).getKeeperExceptionCode();
+ } else {
+ abortListener(t);
+ return;
+ }
+ if (KeeperException.Code.NONODE == kc) {
+ abortListener(new LogSegmentNotFoundException(segment.getZkPath()));
+ return;
+ }
+ abortListener(t);
+ }
+
+ private void abortListener(Throwable t) {
+ if (null != listener) {
+ listener.onAbort(t);
+ }
+ }
+ }));
}
@Override
public void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
Op updateOp = Op.setData(segment.getZkPath(), finalisedData, -1);
- txn.addOp(DefaultZKOp.of(updateOp));
+ txn.addOp(DefaultZKOp.of(updateOp, null));
}
// reads
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
index 078c040..37beb16 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
@@ -29,6 +29,17 @@
}
/**
+ * Get the top stream path for a given log.
+ *
+ * @param uri namespace to store the log
+ * @param logName name of the log
+ * @return top stream path
+ */
+ public static String getLogStreamPath(URI uri, String logName) {
+ return String.format("%s/%s", uri.getPath(), logName);
+ }
+
+ /**
* Get the log root path for a given log.
*
* @param uri
@@ -59,14 +70,14 @@
}
protected static final int LAYOUT_VERSION = -1;
- protected final static String LOGSEGMENTS_PATH = "/ledgers";
- protected final static String VERSION_PATH = "/version";
+ public final static String LOGSEGMENTS_PATH = "/ledgers";
+ public final static String VERSION_PATH = "/version";
// writer znodes
- protected final static String MAX_TXID_PATH = "/maxtxid";
- protected final static String LOCK_PATH = "/lock";
- protected final static String ALLOCATION_PATH = "/allocation";
+ public final static String MAX_TXID_PATH = "/maxtxid";
+ public final static String LOCK_PATH = "/lock";
+ public final static String ALLOCATION_PATH = "/allocation";
// reader znodes
- protected final static String READ_LOCK_PATH = "/readLock";
+ public final static String READ_LOCK_PATH = "/readLock";
protected final URI uri;
protected final String logName;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
index 1de712f..9a1548c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
@@ -17,312 +17,15 @@
*/
package com.twitter.distributedlog.impl.metadata;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.exceptions.LogExistsException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import java.io.File;
import java.net.URI;
-import java.util.List;
/**
* Log Metadata for writer
*/
public class ZKLogMetadataForWriter extends ZKLogMetadata {
- static final Logger LOG = LoggerFactory.getLogger(ZKLogMetadataForWriter.class);
-
- static class MetadataIndex {
- static final int LOG_ROOT_PARENT = 0;
- static final int LOG_ROOT = 1;
- static final int MAX_TXID = 2;
- static final int VERSION = 3;
- static final int LOCK = 4;
- static final int READ_LOCK = 5;
- static final int LOGSEGMENTS = 6;
- static final int ALLOCATION = 7;
- }
-
- static int bytesToInt(byte[] b) {
- assert b.length >= 4;
- return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
- }
-
- static byte[] intToBytes(int i) {
- return new byte[]{
- (byte) (i >> 24),
- (byte) (i >> 16),
- (byte) (i >> 8),
- (byte) (i)};
- }
-
- static boolean pathExists(Versioned<byte[]> metadata) {
- return null != metadata.getValue() && null != metadata.getVersion();
- }
-
- static void ensureMetadataExist(Versioned<byte[]> metadata) {
- Preconditions.checkNotNull(metadata.getValue());
- Preconditions.checkNotNull(metadata.getVersion());
- }
-
- public static Future<ZKLogMetadataForWriter> of(
- final URI uri,
- final String logName,
- final String logIdentifier,
- final ZooKeeper zk,
- final List<ACL> acl,
- final boolean ownAllocator,
- final boolean createIfNotExists) {
- final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier);
- try {
- PathUtils.validatePath(logRootPath);
- } catch (IllegalArgumentException e) {
- LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
- return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
- }
-
- return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
- .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
- @Override
- public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
- Promise<List<Versioned<byte[]>>> promise =
- new Promise<List<Versioned<byte[]>>>();
- createMissingMetadata(zk, logRootPath, metadatas, acl,
- ownAllocator, createIfNotExists, promise);
- return promise;
- }
- }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() {
- @Override
- public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
- return processLogMetadatas(uri, logName, logIdentifier, metadatas, ownAllocator);
- }
- });
- }
-
- static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
- String logRootPath,
- boolean ownAllocator) {
- // Note re. persistent lock state initialization: the read lock persistent state (path) is
- // initialized here but only used in the read handler. The reason is its more convenient and
- // less error prone to manage all stream structure in one place.
- final String logRootParentPath = new File(logRootPath).getParent();
- final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
- final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
- final String lockPath = logRootPath + LOCK_PATH;
- final String readLockPath = logRootPath + READ_LOCK_PATH;
- final String versionPath = logRootPath + VERSION_PATH;
- final String allocationPath = logRootPath + ALLOCATION_PATH;
-
- int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
- List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
- checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
- checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
- checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
- checkFutures.add(Utils.zkGetData(zk, versionPath, false));
- checkFutures.add(Utils.zkGetData(zk, lockPath, false));
- checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
- checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
- if (ownAllocator) {
- checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
- }
-
- return Future.collect(checkFutures);
- }
-
- static void createMissingMetadata(final ZooKeeper zk,
- final String logRootPath,
- final List<Versioned<byte[]>> metadatas,
- final List<ACL> acl,
- final boolean ownAllocator,
- final boolean createIfNotExists,
- final Promise<List<Versioned<byte[]>>> promise) {
- final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
- final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
- CreateMode createMode = CreateMode.PERSISTENT;
-
- // log root parent path
- if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
- pathsToCreate.add(null);
- } else {
- String logRootParentPath = new File(logRootPath).getParent();
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
-
- // log root path
- if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
-
- // max id
- if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
- pathsToCreate.add(null);
- } else {
- byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
- pathsToCreate.add(zeroTxnIdData);
- zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
- }
- // version
- if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
- pathsToCreate.add(null);
- } else {
- byte[] versionData = intToBytes(LAYOUT_VERSION);
- pathsToCreate.add(versionData);
- zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
- }
- // lock path
- if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
- // read lock path
- if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
- // log segments path
- if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
- pathsToCreate.add(null);
- } else {
- byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
- DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
- pathsToCreate.add(logSegmentsData);
- zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
- }
- // allocation path
- if (ownAllocator) {
- if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
- pathsToCreate.add(null);
- } else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
- DistributedLogConstants.EMPTY_BYTES, acl, createMode));
- }
- }
- if (zkOps.isEmpty()) {
- // nothing missed
- promise.setValue(metadatas);
- return;
- }
- if (!createIfNotExists) {
- promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
- return;
- }
-
- zk.multi(zkOps, new AsyncCallback.MultiCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
- if (KeeperException.Code.OK.intValue() == rc) {
- List<Versioned<byte[]>> finalMetadatas =
- Lists.newArrayListWithExpectedSize(metadatas.size());
- for (int i = 0; i < pathsToCreate.size(); i++) {
- byte[] dataCreated = pathsToCreate.get(i);
- if (null == dataCreated) {
- finalMetadatas.add(metadatas.get(i));
- } else {
- finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
- }
- }
- promise.setValue(finalMetadatas);
- } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- promise.setException(new LogExistsException("Someone just created log "
- + logRootPath));
- } else {
- if (LOG.isDebugEnabled()) {
- StringBuilder builder = new StringBuilder();
- for (OpResult result : resultList) {
- if (result instanceof OpResult.ErrorResult) {
- OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
- builder.append(errorResult.getErr()).append(",");
- } else {
- builder.append(0).append(",");
- }
- }
- String resultCodeList = builder.substring(0, builder.length() - 1);
- LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
- }
-
- promise.setException(new ZKException("Failed to create log " + logRootPath,
- KeeperException.Code.get(rc)));
- }
- }
- }, null);
- }
-
- static ZKLogMetadataForWriter processLogMetadatas(URI uri,
- String logName,
- String logIdentifier,
- List<Versioned<byte[]>> metadatas,
- boolean ownAllocator)
- throws UnexpectedException {
- try {
- // max id
- Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
- ensureMetadataExist(maxTxnIdData);
- // version
- Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
- ensureMetadataExist(maxTxnIdData);
- Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
- // lock path
- ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
- // read lock path
- ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
- // max lssn
- Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
- ensureMetadataExist(maxLSSNData);
- try {
- DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
- } catch (NumberFormatException nfe) {
- throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
- }
- // allocation path
- Versioned<byte[]> allocationData;
- if (ownAllocator) {
- allocationData = metadatas.get(MetadataIndex.ALLOCATION);
- ensureMetadataExist(allocationData);
- } else {
- allocationData = new Versioned<byte[]>(null, null);
- }
- return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
- maxLSSNData, maxTxnIdData, allocationData);
- } catch (IllegalArgumentException iae) {
- throw new UnexpectedException("Invalid log " + logName, iae);
- } catch (NullPointerException npe) {
- throw new UnexpectedException("Invalid log " + logName, npe);
- }
- }
-
private final Versioned<byte[]> maxLSSNData;
private final Versioned<byte[]> maxTxIdData;
private final Versioned<byte[]> allocationData;
@@ -334,12 +37,12 @@
* @param logName name of the log
* @param logIdentifier identifier of the log
*/
- private ZKLogMetadataForWriter(URI uri,
- String logName,
- String logIdentifier,
- Versioned<byte[]> maxLSSNData,
- Versioned<byte[]> maxTxIdData,
- Versioned<byte[]> allocationData) {
+ public ZKLogMetadataForWriter(URI uri,
+ String logName,
+ String logIdentifier,
+ Versioned<byte[]> maxLSSNData,
+ Versioned<byte[]> maxTxIdData,
+ Versioned<byte[]> allocationData) {
super(uri, logName, logIdentifier);
this.maxLSSNData = maxLSSNData;
this.maxTxIdData = maxTxIdData;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
new file mode 100644
index 0000000..d89dddb
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
+import com.twitter.distributedlog.exceptions.LockCancelledException;
+import com.twitter.distributedlog.exceptions.LogExistsException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
+import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.lock.SessionLockFactory;
+import com.twitter.distributedlog.lock.ZKDistributedLock;
+import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.SchedulerUtils;
+import com.twitter.distributedlog.zk.LimitedPermitManager;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.PermitManager;
+import com.twitter.distributedlog.util.Transaction;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.distributedlog.zk.ZKTransaction;
+import com.twitter.util.ExceptionalFunction;
+import com.twitter.util.ExceptionalFunction0;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+
+/**
+ * zookeeper based {@link LogStreamMetadataStore}
+ */
+public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
+
+ private final static Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
+
+ private final String clientId;
+ private final DistributedLogConfiguration conf;
+ private final ZooKeeperClient zooKeeperClient;
+ private final OrderedScheduler scheduler;
+ private final StatsLogger statsLogger;
+ private final LogSegmentMetadataStore logSegmentStore;
+ private final LimitedPermitManager permitManager;
+ // lock
+ private SessionLockFactory lockFactory;
+ private OrderedScheduler lockStateExecutor;
+
+ public ZKLogStreamMetadataStore(String clientId,
+ DistributedLogConfiguration conf,
+ ZooKeeperClient zkc,
+ OrderedScheduler scheduler,
+ StatsLogger statsLogger) {
+ this.clientId = clientId;
+ this.conf = conf;
+ this.zooKeeperClient = zkc;
+ this.scheduler = scheduler;
+ this.statsLogger = statsLogger;
+ // create the log segment metadata store and the permit manager (used for log segment rolling)
+ this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler);
+ this.permitManager = new LimitedPermitManager(
+ conf.getLogSegmentRollingConcurrency(),
+ 1,
+ TimeUnit.MINUTES,
+ scheduler);
+ this.zooKeeperClient.register(permitManager);
+ }
+
+ private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
+ if (createIfNull && null == lockStateExecutor) {
+ StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler");
+ lockStateExecutor = OrderedScheduler.newBuilder()
+ .name("DLM-LockState")
+ .corePoolSize(conf.getNumLockStateThreads())
+ .statsLogger(lockStateStatsLogger)
+ .perExecutorStatsLogger(lockStateStatsLogger)
+ .traceTaskExecution(conf.getEnableTaskExecutionStats())
+ .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
+ .build();
+ }
+ return lockStateExecutor;
+ }
+
+ private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
+ if (createIfNull && null == lockFactory) {
+ lockFactory = new ZKSessionLockFactory(
+ zooKeeperClient,
+ clientId,
+ getLockStateExecutor(createIfNull),
+ conf.getZKNumRetries(),
+ conf.getLockTimeoutMilliSeconds(),
+ conf.getZKRetryBackoffStartMillis(),
+ statsLogger);
+ }
+ return lockFactory;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.zooKeeperClient.unregister(permitManager);
+ this.permitManager.close();
+ this.logSegmentStore.close();
+ SchedulerUtils.shutdownScheduler(
+ getLockStateExecutor(false),
+ conf.getSchedulerShutdownTimeoutMs(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public LogSegmentMetadataStore getLogSegmentMetadataStore() {
+ return logSegmentStore;
+ }
+
+ @Override
+ public PermitManager getPermitManager() {
+ return this.permitManager;
+ }
+
+ @Override
+ public Transaction<Object> newTransaction() {
+ return new ZKTransaction(zooKeeperClient);
+ }
+
+ @Override
+ public Future<Void> logExists(URI uri, final String logName) {
+ final String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath(
+ uri, logName, conf.getUnpartitionedStreamName());
+ final Promise<Void> promise = new Promise<Void>();
+ try {
+ final ZooKeeper zk = zooKeeperClient.get();
+ zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int syncRc, String path, Object syncCtx) {
+ if (KeeperException.Code.NONODE.intValue() == syncRc) {
+ promise.setException(new LogNotFoundException(
+ String.format("Log %s does not exist or has been deleted", logName)));
+ return;
+ } else if (KeeperException.Code.OK.intValue() != syncRc){
+ promise.setException(new ZKException("Error on checking log existence for " + logName,
+ KeeperException.create(KeeperException.Code.get(syncRc))));
+ return;
+ }
+ zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ promise.setValue(null);
+ } else if (KeeperException.Code.NONODE.intValue() == rc) {
+ promise.setException(new LogNotFoundException(
+ String.format("Log %s does not exist or has been deleted", logName)));
+ } else {
+ promise.setException(new ZKException("Error on checking log existence for " + logName,
+ KeeperException.create(KeeperException.Code.get(rc))));
+ }
+ }
+ }, null);
+ }
+ }, null);
+
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
+ promise.setException(new DLInterruptedException("Interrupted while checking "
+ + logSegmentsPath, ie));
+ } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+ promise.setException(e);
+ }
+ return promise;
+ }
+
+ //
+ // Create Write Lock
+ //
+
+ @Override
+ public DistributedLock createWriteLock(ZKLogMetadataForWriter metadata) {
+ return new ZKDistributedLock(
+ getLockStateExecutor(true),
+ getLockFactory(true),
+ metadata.getLockPath(),
+ conf.getLockTimeoutMilliSeconds(),
+ statsLogger);
+ }
+
+ //
+ // Create Read Lock
+ //
+
+ private Future<Void> ensureReadLockPathExist(final ZKLogMetadata logMetadata,
+ final String readLockPath) {
+ final Promise<Void> promise = new Promise<Void>();
+ promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Throwable t) {
+ FutureUtils.setException(promise, new LockCancelledException(readLockPath,
+ "Could not ensure read lock path", t));
+ return null;
+ }
+ });
+ Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
+ Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
+ new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
+ new org.apache.zookeeper.AsyncCallback.StringCallback() {
+ @Override
+ public void processResult(final int rc, final String path, Object ctx, String name) {
+ if (KeeperException.Code.NONODE.intValue() == rc) {
+ FutureUtils.setException(promise, new LogNotFoundException(
+ String.format("Log %s does not exist or has been deleted",
+ logMetadata.getFullyQualifiedName())));
+ } else if (KeeperException.Code.OK.intValue() == rc) {
+ FutureUtils.setValue(promise, null);
+ LOG.trace("Created path {}.", path);
+ } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+ FutureUtils.setValue(promise, null);
+ LOG.trace("Path {} is already existed.", path);
+ } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
+ FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
+ } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
+ FutureUtils.setException(promise, new DLInterruptedException(path));
+ } else {
+ FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
+ }
+ }
+ }, null);
+ return promise;
+ }
+
+ @Override
+ public Future<DistributedLock> createReadLock(final ZKLogMetadataForReader metadata,
+ Optional<String> readerId) {
+ final String readLockPath = metadata.getReadLockPath(readerId);
+ return ensureReadLockPathExist(metadata, readLockPath).flatMap(
+ new ExceptionalFunction<Void, Future<DistributedLock>>() {
+ @Override
+ public Future<DistributedLock> applyE(Void value) throws Throwable {
+ // Unfortunately this has a blocking call which we should not execute on the
+ // ZK completion thread
+ return scheduler.apply(new ExceptionalFunction0<DistributedLock>() {
+ @Override
+ public DistributedLock applyE() throws Throwable {
+ return new ZKDistributedLock(
+ getLockStateExecutor(true),
+ getLockFactory(true),
+ readLockPath,
+ conf.getLockTimeoutMilliSeconds(),
+ statsLogger.scope("read_lock"));
+ }
+ });
+ }
+ });
+ }
+
+ //
+ // Create Log
+ //
+
+ static class MetadataIndex {
+ static final int LOG_ROOT_PARENT = 0;
+ static final int LOG_ROOT = 1;
+ static final int MAX_TXID = 2;
+ static final int VERSION = 3;
+ static final int LOCK = 4;
+ static final int READ_LOCK = 5;
+ static final int LOGSEGMENTS = 6;
+ static final int ALLOCATION = 7;
+ }
+
+ static int bytesToInt(byte[] b) {
+ assert b.length >= 4;
+ return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
+ }
+
+ static byte[] intToBytes(int i) {
+ return new byte[]{
+ (byte) (i >> 24),
+ (byte) (i >> 16),
+ (byte) (i >> 8),
+ (byte) (i)};
+ }
+
+ static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
+ String logRootPath,
+ boolean ownAllocator) {
+ // Note re. persistent lock state initialization: the read lock persistent state (path) is
+ // initialized here but only used in the read handler. The reason is its more convenient and
+ // less error prone to manage all stream structure in one place.
+ final String logRootParentPath = new File(logRootPath).getParent();
+ final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
+ final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
+ final String lockPath = logRootPath + LOCK_PATH;
+ final String readLockPath = logRootPath + READ_LOCK_PATH;
+ final String versionPath = logRootPath + VERSION_PATH;
+ final String allocationPath = logRootPath + ALLOCATION_PATH;
+
+ int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
+ List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
+ checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
+ checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
+ checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
+ checkFutures.add(Utils.zkGetData(zk, versionPath, false));
+ checkFutures.add(Utils.zkGetData(zk, lockPath, false));
+ checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
+ checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
+ if (ownAllocator) {
+ checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
+ }
+
+ return Future.collect(checkFutures);
+ }
+
+ static boolean pathExists(Versioned<byte[]> metadata) {
+ return null != metadata.getValue() && null != metadata.getVersion();
+ }
+
+ static void ensureMetadataExist(Versioned<byte[]> metadata) {
+ Preconditions.checkNotNull(metadata.getValue());
+ Preconditions.checkNotNull(metadata.getVersion());
+ }
+
+ static void createMissingMetadata(final ZooKeeper zk,
+ final String logRootPath,
+ final List<Versioned<byte[]>> metadatas,
+ final List<ACL> acl,
+ final boolean ownAllocator,
+ final boolean createIfNotExists,
+ final Promise<List<Versioned<byte[]>>> promise) {
+ final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
+ final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
+ CreateMode createMode = CreateMode.PERSISTENT;
+
+ // log root parent path
+ if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
+ pathsToCreate.add(null);
+ } else {
+ String logRootParentPath = new File(logRootPath).getParent();
+ pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+ zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ }
+
+ // log root path
+ if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
+ pathsToCreate.add(null);
+ } else {
+ pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+ zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ }
+
+ // max id
+ if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
+ pathsToCreate.add(null);
+ } else {
+ byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
+ pathsToCreate.add(zeroTxnIdData);
+ zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
+ }
+ // version
+ if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
+ pathsToCreate.add(null);
+ } else {
+ byte[] versionData = intToBytes(LAYOUT_VERSION);
+ pathsToCreate.add(versionData);
+ zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
+ }
+ // lock path
+ if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
+ pathsToCreate.add(null);
+ } else {
+ pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+ zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ }
+ // read lock path
+ if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
+ pathsToCreate.add(null);
+ } else {
+ pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+ zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ }
+ // log segments path
+ if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
+ pathsToCreate.add(null);
+ } else {
+ byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
+ DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
+ pathsToCreate.add(logSegmentsData);
+ zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
+ }
+ // allocation path
+ if (ownAllocator) {
+ if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
+ pathsToCreate.add(null);
+ } else {
+ pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+ zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
+ DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ }
+ }
+ if (zkOps.isEmpty()) {
+ // nothing missed
+ promise.setValue(metadatas);
+ return;
+ }
+ if (!createIfNotExists) {
+ promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
+ return;
+ }
+
+ zk.multi(zkOps, new AsyncCallback.MultiCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ List<Versioned<byte[]>> finalMetadatas =
+ Lists.newArrayListWithExpectedSize(metadatas.size());
+ for (int i = 0; i < pathsToCreate.size(); i++) {
+ byte[] dataCreated = pathsToCreate.get(i);
+ if (null == dataCreated) {
+ finalMetadatas.add(metadatas.get(i));
+ } else {
+ finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
+ }
+ }
+ promise.setValue(finalMetadatas);
+ } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+ promise.setException(new LogExistsException("Someone just created log "
+ + logRootPath));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ StringBuilder builder = new StringBuilder();
+ for (OpResult result : resultList) {
+ if (result instanceof OpResult.ErrorResult) {
+ OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
+ builder.append(errorResult.getErr()).append(",");
+ } else {
+ builder.append(0).append(",");
+ }
+ }
+ String resultCodeList = builder.substring(0, builder.length() - 1);
+ LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
+ }
+
+ promise.setException(new ZKException("Failed to create log " + logRootPath,
+ KeeperException.Code.get(rc)));
+ }
+ }
+ }, null);
+ }
+
+ static ZKLogMetadataForWriter processLogMetadatas(URI uri,
+ String logName,
+ String logIdentifier,
+ List<Versioned<byte[]>> metadatas,
+ boolean ownAllocator)
+ throws UnexpectedException {
+ try {
+ // max id
+ Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
+ ensureMetadataExist(maxTxnIdData);
+ // version
+ Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
+ ensureMetadataExist(maxTxnIdData);
+ Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
+ // lock path
+ ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
+ // read lock path
+ ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
+ // max lssn
+ Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
+ ensureMetadataExist(maxLSSNData);
+ try {
+ DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
+ } catch (NumberFormatException nfe) {
+ throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
+ }
+ // allocation path
+ Versioned<byte[]> allocationData;
+ if (ownAllocator) {
+ allocationData = metadatas.get(MetadataIndex.ALLOCATION);
+ ensureMetadataExist(allocationData);
+ } else {
+ allocationData = new Versioned<byte[]>(null, null);
+ }
+ return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
+ maxLSSNData, maxTxnIdData, allocationData);
+ } catch (IllegalArgumentException iae) {
+ throw new UnexpectedException("Invalid log " + logName, iae);
+ } catch (NullPointerException npe) {
+ throw new UnexpectedException("Invalid log " + logName, npe);
+ }
+ }
+
+ static Future<ZKLogMetadataForWriter> getLog(final URI uri,
+ final String logName,
+ final String logIdentifier,
+ final ZooKeeperClient zooKeeperClient,
+ final boolean ownAllocator,
+ final boolean createIfNotExists) {
+ final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier);
+ try {
+ PathUtils.validatePath(logRootPath);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
+ return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
+ }
+
+ try {
+ final ZooKeeper zk = zooKeeperClient.get();
+ return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
+ .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
+ @Override
+ public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
+ Promise<List<Versioned<byte[]>>> promise =
+ new Promise<List<Versioned<byte[]>>>();
+ createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
+ ownAllocator, createIfNotExists, promise);
+ return promise;
+ }
+ }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() {
+ @Override
+ public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
+ return processLogMetadatas(
+ uri,
+ logName,
+ logIdentifier,
+ metadatas,
+ ownAllocator);
+ }
+ });
+ } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+ return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
+ KeeperException.Code.CONNECTIONLOSS));
+ } catch (InterruptedException e) {
+ return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
+ }
+ }
+
+ @Override
+ public Future<ZKLogMetadataForWriter> getLog(final URI uri,
+ final String logName,
+ final boolean ownAllocator,
+ final boolean createIfNotExists) {
+ return getLog(
+ uri,
+ logName,
+ conf.getUnpartitionedStreamName(),
+ zooKeeperClient,
+ ownAllocator,
+ createIfNotExists);
+ }
+
+ //
+ // Delete Log
+ //
+
+ @Override
+ public Future<Void> deleteLog(URI uri, final String logName) {
+ final Promise<Void> promise = new Promise<Void>();
+ try {
+ String streamPath = ZKLogMetadata.getLogStreamPath(uri, logName);
+ ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (KeeperException.Code.OK.intValue() != rc) {
+ FutureUtils.setException(promise,
+ new ZKException("Encountered zookeeper issue on deleting log stream "
+ + logName, KeeperException.Code.get(rc)));
+ return;
+ }
+ FutureUtils.setValue(promise, null);
+ }
+ }, null);
+ } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+ FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ + logName, KeeperException.Code.CONNECTIONLOSS));
+ } catch (InterruptedException e) {
+ FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream "
+ + logName));
+ } catch (KeeperException e) {
+ FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ + logName, e));
+ }
+ return promise;
+ }
+}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
index 2ea1671..5144634 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
@@ -20,6 +20,8 @@
import com.google.common.annotations.Beta;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.util.Transaction;
import com.twitter.distributedlog.util.Transaction.OpListener;
import com.twitter.util.Future;
@@ -52,15 +54,15 @@
*
* @param txn
* transaction to execute for storing log segment sequence number.
- * @param path
- * path to store sequence number
+ * @param logMetadata
+ * metadata of the log stream
* @param sequenceNumber
* log segment sequence number to store
* @param listener
* listener on the result to this operation
*/
void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
- String path,
+ ZKLogMetadata logMetadata,
Versioned<Long> sequenceNumber,
OpListener<Version> listener);
@@ -69,15 +71,15 @@
*
* @param txn
* transaction to execute for storing transaction id
- * @param path
- * path to store sequence number
+ * @param logMetadata
+ * metadata of the log stream
* @param transactionId
* transaction id to store
* @param listener
* listener on the result to this operation
*/
void storeMaxTxnId(Transaction<Object> txn,
- String path,
+ ZKLogMetadataForWriter logMetadata,
Versioned<Long> transactionId,
OpListener<Version> listener);
@@ -91,8 +93,12 @@
* transaction to execute for this operation
* @param segment
* segment to create
+ * @param opListener
+ * the listener on the operation result
*/
- void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
+ void createLogSegment(Transaction<Object> txn,
+ LogSegmentMetadata segment,
+ OpListener<Void> opListener);
/**
* Delete a log segment <code>segment</code> under transaction <code>txn</code>.
@@ -105,7 +111,9 @@
* @param segment
* segment to delete
*/
- void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
+ void deleteLogSegment(Transaction<Object> txn,
+ LogSegmentMetadata segment,
+ OpListener<Void> opListener);
/**
* Update a log segment <code>segment</code> under transaction <code>txn</code>.
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
index baa3240..ac36ef2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
@@ -51,16 +51,15 @@
new ConcurrentHashMap<URI, DLConfig>();
public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
- dlConf.setSanityCheckTxnID(bkdlConfig.getSanityCheckTxnID());
dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
if (bkdlConfig.isFederatedNamespace()) {
dlConf.setCreateStreamIfNotExists(false);
LOG.info("Disabled createIfNotExists for federated namespace.");
}
- LOG.info("Propagate BKDLConfig to DLConfig : sanityCheckTxnID = {}, encodeRegionID = {}," +
+ LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
" firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
- new Object[] { dlConf.getSanityCheckTxnID(), dlConf.getEncodeRegionIDInLogSegmentMetadata(),
+ new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
bkdlConfig.isFederatedNamespace() });
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
index d205b3a..0e5e6d4 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
@@ -177,8 +177,8 @@
protected void addNewSegmentAndDeleteOldSegment(Transaction<Object> txn,
LogSegmentMetadata newSegment,
LogSegmentMetadata oldSegment) {
- metadataStore.deleteLogSegment(txn, oldSegment);
- metadataStore.createLogSegment(txn, newSegment);
+ metadataStore.deleteLogSegment(txn, oldSegment, null);
+ metadataStore.createLogSegment(txn, newSegment, null);
}
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
new file mode 100644
index 0000000..db7812e
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.metadata;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.util.PermitManager;
+import com.twitter.distributedlog.util.Transaction;
+import com.twitter.util.Future;
+
+import java.io.Closeable;
+import java.net.URI;
+
+/**
+ * The interface to manage the log stream metadata. The implementation is responsible
+ * for creating the metadata layout.
+ */
+@Beta
+public interface LogStreamMetadataStore extends Closeable {
+
+ /**
+ * Create a transaction for the metadata operations happening in the metadata store.
+ *
+ * @return transaction for the metadata operations
+ */
+ Transaction<Object> newTransaction();
+
+ /**
+ * Ensure the existence of a log stream
+ *
+ * @param uri the location of the log stream
+ * @param logName the name of the log stream
+ * @return future represents the existence of a log stream. {@link com.twitter.distributedlog.LogNotFoundException}
+ * is thrown if the log doesn't exist
+ */
+ Future<Void> logExists(URI uri, String logName);
+
+ /**
+ * Create the read lock for the log stream.
+ *
+ * @param metadata the metadata for a log stream
+ * @param readerId the reader id used for lock
+ * @return the read lock
+ */
+ Future<DistributedLock> createReadLock(ZKLogMetadataForReader metadata,
+ Optional<String> readerId);
+
+ /**
+ * Create the write lock for the log stream.
+ *
+ * @param metadata the metadata for a log stream
+ * @return the write lock
+ */
+ DistributedLock createWriteLock(ZKLogMetadataForWriter metadata);
+
+ /**
+ * Create the metadata of a log.
+ *
+ * @param uri the location to store the metadata of the log
+ * @param streamName the name of the log stream
+ * @param ownAllocator whether to use its own allocator or external allocator
+ * @param createIfNotExists flag to create the stream if it doesn't exist
+ * @return the metadata of the log
+ */
+ Future<ZKLogMetadataForWriter> getLog(URI uri,
+ String streamName,
+ boolean ownAllocator,
+ boolean createIfNotExists);
+
+ /**
+ * Delete the metadata of a log.
+ *
+ * @param uri the location to store the metadata of the log
+ * @param streamName the name of the log stream
+ * @return future represents the result of the deletion.
+ */
+ Future<Void> deleteLog(URI uri, String streamName);
+
+ /**
+ * Get the log segment metadata store.
+ *
+ * @return the log segment metadata store.
+ */
+ LogSegmentMetadataStore getLogSegmentMetadataStore();
+
+ /**
+ * Get the permit manager for this metadata store. It can be used for limiting the concurrent
+ * metadata operations. The implementation can disable handing over the permits when the metadata
+ * store is unavailable (for example zookeeper session expired).
+ *
+ * @return the permit manager
+ */
+ PermitManager getPermitManager();
+
+
+
+}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index bed2fcd..23d8e40 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -267,7 +267,8 @@
protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException {
DistributedLogNamespace namespace = getFactory().getNamespace();
assert(namespace instanceof BKDistributedLogNamespace);
- return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore();
+ return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+ .getLogSegmentMetadataStore();
}
protected ZooKeeperClient getZooKeeperClient() throws IOException {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
index 7d76f29..78292e9 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
@@ -17,29 +17,39 @@
*/
package com.twitter.distributedlog.zk;
+import com.twitter.distributedlog.util.Transaction.OpListener;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
+import javax.annotation.Nullable;
+
/**
* Default zookeeper operation. No action on commiting or aborting.
*/
public class DefaultZKOp extends ZKOp {
- public static DefaultZKOp of(Op op) {
- return new DefaultZKOp(op);
+ public static DefaultZKOp of(Op op, OpListener<Void> listener) {
+ return new DefaultZKOp(op, listener);
}
- private DefaultZKOp(Op op) {
+ private final OpListener<Void> listener;
+
+ private DefaultZKOp(Op op, @Nullable OpListener<Void> opListener) {
super(op);
+ this.listener = opListener;
}
@Override
protected void commitOpResult(OpResult opResult) {
- // no-op
+ if (null != listener) {
+ listener.onCommit(null);
+ }
}
@Override
protected void abortOpResult(Throwable t, OpResult opResult) {
- // no-op
+ if (null != listener) {
+ listener.onAbort(t);
+ }
}
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
similarity index 98%
rename from distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
rename to distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
index dc25023..78ff0a2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
@@ -15,8 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.twitter.distributedlog.util;
+package com.twitter.distributedlog.zk;
+import com.twitter.distributedlog.util.PermitManager;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
index d885593..5b788e2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
@@ -33,7 +33,8 @@
private final OpListener<Version> listener;
- public ZKVersionedSetOp(Op op, OpListener<Version> opListener) {
+ public ZKVersionedSetOp(Op op,
+ @Nullable OpListener<Version> opListener) {
super(op);
this.listener = opListener;
}
@@ -42,7 +43,9 @@
protected void commitOpResult(OpResult opResult) {
assert(opResult instanceof OpResult.SetDataResult);
OpResult.SetDataResult setDataResult = (OpResult.SetDataResult) opResult;
- listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
+ if (null != listener) {
+ listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
+ }
}
@Override
@@ -60,7 +63,9 @@
cause = KeeperException.create(KeeperException.Code.get(errorResult.getErr()));
}
}
- listener.onAbort(cause);
+ if (null != listener) {
+ listener.onAbort(cause);
+ }
}
}
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
index c588cd7..1485ae6 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
@@ -36,6 +36,7 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
@@ -429,7 +430,7 @@
.setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion))
.build();
l.write(dlm.writerZKC);
- writeHandler.maxTxId.store(startTxID);
+ writeHandler.maxTxId.update(Version.ANY, startTxID);
writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
BKLogSegmentWriter writer = new BKLogSegmentWriter(
writeHandler.getFullyQualifiedName(),
@@ -479,7 +480,7 @@
.setInprogress(false)
.build();
l.write(dlm.writerZKC);
- writeHandler.maxTxId.store(startTxID);
+ writeHandler.maxTxId.update(Version.ANY, startTxID);
writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
BKLogSegmentWriter writer = new BKLogSegmentWriter(
writeHandler.getFullyQualifiedName(),
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
index d45a727..6aa38c3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
@@ -21,6 +21,7 @@
import java.net.URI;
import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.twitter.distributedlog.util.FutureUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -203,7 +204,7 @@
BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
URI uri = createDLMURI("/" + name);
- BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name);
+ FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
// Log exists but is empty, better not throw.
AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
@@ -264,7 +265,7 @@
BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
URI uri = createDLMURI("/" + name);
- BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name);
+ FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
// Log exists but is empty, better not throw.
AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
index 6ad9950..c8a1c74 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
@@ -148,61 +148,6 @@
}
@Test(timeout = 60000)
- public void testSanityCheckTxnID() throws Exception {
- String name = "distrlog-sanity-check-txnid";
- BKDistributedLogManager dlm = createNewDLM(conf, name);
- BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
- long txid = 1;
- for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
- LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
- out.write(op);
- }
- out.closeAndComplete();
-
- BKSyncLogWriter out1 = dlm.startLogSegmentNonPartitioned();
- LogRecord op1 = DLMTestUtil.getLogRecordInstance(1);
- try {
- out1.write(op1);
- fail("Should fail writing lower txn id if sanityCheckTxnID is enabled.");
- } catch (TransactionIdOutOfOrderException tioooe) {
- // expected
- }
- out1.closeAndComplete();
- dlm.close();
-
- DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), false);
- LOG.info("Disable sanity check txn id.");
- BKDLConfig.clearCachedDLConfigs();
-
- DistributedLogConfiguration newConf = new DistributedLogConfiguration();
- newConf.addConfiguration(conf);
- BKDistributedLogManager newDLM = createNewDLM(newConf, name);
- BKSyncLogWriter out2 = newDLM.startLogSegmentNonPartitioned();
- LogRecord op2 = DLMTestUtil.getLogRecordInstance(1);
- out2.write(op2);
- out2.closeAndComplete();
- newDLM.close();
-
- DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), true);
- LOG.info("Enable sanity check txn id.");
- BKDLConfig.clearCachedDLConfigs();
-
- DistributedLogConfiguration conf3 = new DistributedLogConfiguration();
- conf3.addConfiguration(conf);
- BKDistributedLogManager dlm3 = createNewDLM(newConf, name);
- BKSyncLogWriter out3 = dlm3.startLogSegmentNonPartitioned();
- LogRecord op3 = DLMTestUtil.getLogRecordInstance(1);
- try {
- out3.write(op3);
- fail("Should fail writing lower txn id if sanityCheckTxnID is enabled.");
- } catch (TransactionIdOutOfOrderException tioooe) {
- // expected
- }
- out3.closeAndComplete();
- dlm3.close();
- }
-
- @Test(timeout = 60000)
public void testContinuousReaders() throws Exception {
String name = "distrlog-continuous";
BKDistributedLogManager dlm = createNewDLM(conf, name);
@@ -958,12 +903,9 @@
final AtomicReference<Collection<LogSegmentMetadata>> receivedStreams =
new AtomicReference<Collection<LogSegmentMetadata>>();
- DistributedLogManager dlm = createNewDLM(conf, name);
- ZooKeeperClient zkClient = TestZooKeeperClientBuilder.newBuilder()
- .uri(createDLMURI("/"))
- .build();
+ BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
- BKDistributedLogManager.createLog(conf, zkClient, ((BKDistributedLogManager) dlm).uri, name);
+ FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
dlm.registerListener(new LogSegmentListener() {
@Override
public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
@@ -992,7 +934,6 @@
// no-op
}
});
- LOG.info("Registered listener for stream {}.", name);
long txid = 1;
for (int i = 0; i < numSegments; i++) {
LOG.info("Waiting for creating log segment {}.", i);
@@ -1018,6 +959,8 @@
assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId());
++seqno;
}
+
+ dlm.close();
}
@Test(timeout = 60000)
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
index e68b916..ecc20e0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
@@ -102,7 +102,7 @@
dlm.close();
// create the stream
- BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, streamName);
+ namespace.createLog(streamName);
DistributedLogManager newDLM = namespace.openLog(streamName);
LogWriter newWriter = newDLM.startLogSegmentNonPartitioned();
@@ -273,9 +273,9 @@
}
});
latches[0].await();
- BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test1");
+ namespace.createLog("test1");
latches[1].await();
- BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test2");
+ namespace.createLog("test2");
latches[2].await();
assertEquals(0, numFailures.get());
assertNotNull(receivedStreams.get());
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
index 604be0e..8a734b5 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
@@ -191,7 +191,8 @@
protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogManagerFactory factory) {
DistributedLogNamespace namespace = factory.getNamespace();
assertTrue(namespace instanceof BKDistributedLogNamespace);
- return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore();
+ return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+ .getLogSegmentMetadataStore();
}
@SuppressWarnings("deprecation")
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
index 54b1ab8..027b012 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
@@ -253,6 +253,7 @@
confLocal.setOutputBufferSize(0);
confLocal.setLogSegmentRollingIntervalMinutes(0);
confLocal.setMaxLogSegmentBytes(1);
+ confLocal.setLogSegmentRollingConcurrency(Integer.MAX_VALUE);
int numLogSegments = 10;
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
index 625742e..66b97be 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
@@ -140,7 +140,7 @@
logger.info("Try obtaining ledger handle {}", lh.getId());
byte[] data = zkc.get().getData(allocationPath, false, null);
assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
- txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1)));
+ txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
try {
FutureUtils.result(txn.execute());
fail("Should fail the transaction when setting unexisted path");
@@ -337,7 +337,7 @@
ZKTransaction txn = newTxn();
// close during obtaining ledger.
LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
- txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1)));
+ txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
try {
FutureUtils.result(txn.execute());
fail("Should fail the transaction when setting unexisted path");
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index d874274..2a8c83b 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -27,6 +27,8 @@
import com.twitter.distributedlog.ZooKeeperClientUtils;
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
@@ -57,6 +59,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
/**
* Test ZK based log segment metadata store.
@@ -133,14 +136,14 @@
public void testCreateLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
LogSegmentMetadata segment2 = createLogSegment(1L);
Transaction<Object> createTxn2 = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn2, segment2);
+ lsmStore.createLogSegment(createTxn2, segment2, null);
try {
FutureUtils.result(createTxn2.execute());
fail("Should fail if log segment exists");
@@ -158,13 +161,13 @@
public void testDeleteLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
Transaction<Object> deleteTxn = lsmStore.transaction();
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
FutureUtils.result(deleteTxn.execute());
assertNull("LogSegment " + segment + " should be deleted",
zkc.get().exists(segment.getZkPath(), false));
@@ -174,7 +177,7 @@
public void testDeleteNonExistentLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> deleteTxn = lsmStore.transaction();
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
try {
FutureUtils.result(deleteTxn.execute());
fail("Should fail deletion if log segment doesn't exist");
@@ -208,7 +211,7 @@
public void testUpdateLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L, 99L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
@@ -230,15 +233,15 @@
LogSegmentMetadata segment2 = createLogSegment(2L);
// create log segment 1
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment1);
+ lsmStore.createLogSegment(createTxn, segment1, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment1 + " should be created",
zkc.get().exists(segment1.getZkPath(), false));
// delete log segment 1 and create log segment 2
Transaction<Object> createDeleteTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createDeleteTxn, segment2);
- lsmStore.deleteLogSegment(createDeleteTxn, segment1);
+ lsmStore.createLogSegment(createDeleteTxn, segment2, null);
+ lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
FutureUtils.result(createDeleteTxn.execute());
// segment 1 should be deleted, segment 2 should be created
assertNull("LogSegment " + segment1 + " should be deleted",
@@ -254,16 +257,16 @@
LogSegmentMetadata segment3 = createLogSegment(3L);
// create log segment 1
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment1);
+ lsmStore.createLogSegment(createTxn, segment1, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment1 + " should be created",
zkc.get().exists(segment1.getZkPath(), false));
// delete log segment 1 and delete log segment 2
Transaction<Object> createDeleteTxn = lsmStore.transaction();
- lsmStore.deleteLogSegment(createDeleteTxn, segment1);
- lsmStore.deleteLogSegment(createDeleteTxn, segment2);
- lsmStore.createLogSegment(createDeleteTxn, segment3);
+ lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
+ lsmStore.deleteLogSegment(createDeleteTxn, segment2, null);
+ lsmStore.createLogSegment(createDeleteTxn, segment3, null);
try {
FutureUtils.result(createDeleteTxn.execute());
fail("Should fail transaction if one operation failed");
@@ -286,7 +289,7 @@
public void testGetLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L, 99L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
@@ -304,7 +307,7 @@
for (int i = 0; i < 10; i++) {
LogSegmentMetadata segment = createLogSegment(i);
createdSegments.add(segment);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -353,7 +356,7 @@
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -394,7 +397,7 @@
Transaction<Object> anotherCreateTxn = lsmStore.transaction();
for (int i = numSegments; i < 2 * numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(anotherCreateTxn, segment);
+ lsmStore.createLogSegment(anotherCreateTxn, segment, null);
}
FutureUtils.result(anotherCreateTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -419,7 +422,7 @@
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -459,7 +462,7 @@
Transaction<Object> deleteTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
}
FutureUtils.result(deleteTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -491,7 +494,7 @@
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -536,7 +539,7 @@
Transaction<Object> anotherCreateTxn = lsmStore.transaction();
for (int i = numSegments; i < 2 * numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(anotherCreateTxn, segment);
+ lsmStore.createLogSegment(anotherCreateTxn, segment, null);
}
FutureUtils.result(anotherCreateTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -561,7 +564,7 @@
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -602,7 +605,7 @@
Transaction<Object> deleteTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
}
FutureUtils.result(deleteTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -634,7 +637,9 @@
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+ ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+ when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -659,7 +664,9 @@
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+ ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+ when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -695,7 +702,9 @@
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
String nonExistentPath = rootZkPath + "/non-existent";
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+ ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+ when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath);
+ lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -726,7 +735,9 @@
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+ ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+ when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -751,7 +762,9 @@
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+ ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+ when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -787,7 +800,9 @@
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
String nonExistentPath = rootZkPath + "/non-existent";
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+ ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+ when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
+ lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
similarity index 93%
rename from distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
rename to distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index 648b828..9a08aa0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -52,14 +52,15 @@
import java.util.List;
import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
import static org.junit.Assert.*;
/**
- * Test {@link ZKLogMetadataForWriter}
+ * Test {@link ZKLogStreamMetadataStore}
*/
-public class TestZKLogMetadataForWriter extends ZooKeeperClusterTestCase {
+public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
- private static final Logger logger = LoggerFactory.getLogger(TestZKLogMetadataForWriter.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestZKLogStreamMetadataStore.class);
private final static int sessionTimeoutMs = 30000;
@@ -91,7 +92,7 @@
zk.getDefaultACL(), CreateMode.PERSISTENT);
txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(versionPath, ZKLogMetadataForWriter.intToBytes(LAYOUT_VERSION),
+ txn.create(versionPath, intToBytes(LAYOUT_VERSION),
zk.getDefaultACL(), CreateMode.PERSISTENT);
txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
zk.getDefaultACL(), CreateMode.PERSISTENT);
@@ -127,7 +128,7 @@
public void testCheckLogMetadataPathsWithAllocator() throws Exception {
String logRootPath = "/" + testName.getMethodName();
List<Versioned<byte[]>> metadatas =
- FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
+ FutureUtils.result(checkLogMetadataPaths(
zkc.get(), logRootPath, true));
assertEquals("Should have 8 paths",
8, metadatas.size());
@@ -141,7 +142,7 @@
public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
String logRootPath = "/" + testName.getMethodName();
List<Versioned<byte[]>> metadatas =
- FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
+ FutureUtils.result(checkLogMetadataPaths(
zkc.get(), logRootPath, false));
assertEquals("Should have 7 paths",
7, metadatas.size());
@@ -167,13 +168,12 @@
}
ZKLogMetadataForWriter logMetadata =
- FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
- zkc.get(), zkc.getDefaultACL(), ownAllocator, true));
+ FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true));
final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
List<Versioned<byte[]>> metadatas =
- FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
+ FutureUtils.result(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
if (ownAllocator) {
assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator,
@@ -184,7 +184,7 @@
}
for (Versioned<byte[]> metadata : metadatas) {
- assertTrue(ZKLogMetadataForWriter.pathExists(metadata));
+ assertTrue(pathExists(metadata));
assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0);
}
@@ -300,8 +300,7 @@
public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception {
String logName = testName.getMethodName();
String logIdentifier = "<default>";
- FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
- zkc.get(), zkc.getDefaultACL(), true, false));
+ FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, true, false));
}
@Test(timeout = 60000)
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
similarity index 83%
rename from distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
rename to distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
index ce67c30..f14a217 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
@@ -28,9 +28,10 @@
import java.net.URI;
import java.util.List;
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
import static org.junit.Assert.*;
-public class TestZKLogMetadataForWriterUtilFunctions {
+public class TestZKLogStreamMetadataStoreUtils {
@SuppressWarnings("unchecked")
@Test(timeout = 60000, expected = UnexpectedException.class)
@@ -43,7 +44,7 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
}
@SuppressWarnings("unchecked")
@@ -58,7 +59,7 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
}
@SuppressWarnings("unchecked")
@@ -72,8 +73,8 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(9999), null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+ new Versioned<byte[]>(intToBytes(9999), null));
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
}
@SuppressWarnings("unchecked")
@@ -87,9 +88,9 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+ new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
}
@SuppressWarnings("unchecked")
@@ -103,10 +104,10 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+ new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
}
@SuppressWarnings("unchecked")
@@ -120,11 +121,11 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+ new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
}
@SuppressWarnings("unchecked")
@@ -138,12 +139,12 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+ new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)),
new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
}
@SuppressWarnings("unchecked")
@@ -161,12 +162,12 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
maxTxnIdData,
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+ new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
logSegmentsData);
ZKLogMetadataForWriter metadata =
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
assertTrue(logSegmentsData == metadata.getMaxLSSNData());
assertNull(metadata.getAllocationData().getValue());
@@ -190,15 +191,16 @@
new Versioned<byte[]>(null, null),
new Versioned<byte[]>(null, null),
maxTxnIdData,
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+ new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
logSegmentsData,
allocationData);
ZKLogMetadataForWriter metadata =
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
+ processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
assertTrue(logSegmentsData == metadata.getMaxLSSNData());
assertTrue(allocationData == metadata.getAllocationData());
}
+
}
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
index 8899c0e..db87a65 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
@@ -17,6 +17,7 @@
*/
package com.twitter.distributedlog.util;
+import com.twitter.distributedlog.zk.LimitedPermitManager;
import org.junit.Test;
import java.util.ArrayList;