DL-117: Stream metadata store

This change abstracts the ZooKeeper operations into a stream metadata store, so that ZooKeeper can easily be replaced with another metadata store.

As a result, the metadata operations in distributedlog are now managed by 3 classes:

- LogMetadataStore: the namespace metadata store. It manages the location (uri) mapping for streams and handles namespace operations.
- LogStreamMetadataStore: the stream metadata store. It manages the metadata for a single stream, such as managing the read/write locks, retrieving/creating stream metadata, and deleting metadata.
- LogSegmentMetadataStore: the segment metadata store. It manages the log segment metadata for individual log segments.

LogMetadataStore and LogSegmentMetadataStore already exist. This change focuses on LogStreamMetadataStore.

Changed:

* abstract all the zookeeper metadata operations in log handlers into LogStreamMetadataStore
* remove disabling max tx id sanity check, as the maxTxId update is part of the metadata update transaction

Not changed:

The names of ZKLogMetadataForReader and ZKLogMetadataForWriter are not changed. I will send out a follow-up change to rename these two classes, as they are no longer related to zookeeper.
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index cf792e3..2ca064c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -30,7 +30,6 @@
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
 import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
@@ -210,7 +209,6 @@
 
     BKAsyncLogReaderDLSN(BKDistributedLogManager bkdlm,
                          ScheduledExecutorService executorService,
-                         OrderedScheduler lockStateExecutor,
                          DLSN startDLSN,
                          Optional<String> subscriberId,
                          boolean returnEndOfStreamRecord,
@@ -219,7 +217,7 @@
         this.bkDistributedLogManager = bkdlm;
         this.executorService = executorService;
         this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
-                lockStateExecutor, this, deserializeRecordSet, true);
+                this, deserializeRecordSet, true);
         LOG.debug("Starting async reader at {}", startDLSN);
         this.startDLSN = startDLSN;
         this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -414,7 +412,7 @@
         final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
 
         if (!readAheadStarted) {
-            bkLedgerManager.checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+            bkLedgerManager.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
                 @Override
                 public void onSuccess(Void value) {
                     try {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index ac37f3a..0a34caa 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -29,25 +29,22 @@
 import com.twitter.distributedlog.callback.LogSegmentListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
 import com.twitter.distributedlog.function.GetVersionedValueFunction;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
 import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.lock.NopDistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.subscription.SubscriptionStateStore;
@@ -75,10 +72,6 @@
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
@@ -93,7 +86,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -120,13 +112,6 @@
 class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedLogManager {
     static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
 
-    static void createLog(DistributedLogConfiguration conf, ZooKeeperClient zkc, URI uri, String streamName)
-            throws IOException, InterruptedException {
-        Future<ZKLogMetadataForWriter> createFuture = ZKLogMetadataForWriter.of(
-                        uri, streamName, conf.getUnpartitionedStreamName(), zkc.get(), zkc.getDefaultACL(), true, true);
-        FutureUtils.result(createFuture);
-    }
-
     static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
             new Function<LogRecordWithDLSN, Long>() {
                 @Override
@@ -158,12 +143,10 @@
     private final StatsLogger perLogStatsLogger;
     private final AlertStatsLogger alertStatsLogger;
 
-    // lock factory
-    private SessionLockFactory lockFactory = null;
-
-    // log segment metadata stores
-    private final LogSegmentMetadataStore writerMetadataStore;
-    private final LogSegmentMetadataStore readerMetadataStore;
+    // log stream metadata stores
+    private final LogStreamMetadataStore writerMetadataStore;
+    private final LogStreamMetadataStore readerMetadataStore;
+    // log segment metadata cache
     private final LogSegmentMetadataCache logSegmentMetadataCache;
 
     // bookkeeper clients
@@ -183,9 +166,6 @@
     //
     private final LedgerAllocator ledgerAllocator;
     private final PermitLimiter writeLimiter;
-    // Log Segment Rolling Manager to control rolling speed
-    private final PermitManager logSegmentRollingPermitManager;
-    private OrderedScheduler lockStateExecutor = null;
 
     //
     // Reader Related Variables
@@ -237,19 +217,16 @@
              readerBKCBuilder,
              null,
              null,
-             null,
              new LogSegmentMetadataCache(conf, Ticker.systemTicker()),
              OrderedScheduler.newBuilder().name("BKDL-" + name).corePoolSize(1).build(),
              null,
              null,
              null,
-             null,
              new ReadAheadExceptionsLogger(statsLogger),
              DistributedLogConstants.UNKNOWN_CLIENT_ID,
              DistributedLogConstants.LOCAL_REGION_ID,
              null,
              writeLimiter,
-             PermitManager.UNLIMITED_PERMIT_MANAGER,
              featureProvider,
              statsLogger,
              NullStatsLogger.INSTANCE);
@@ -268,12 +245,10 @@
      * @param zkcForReaderBKC zookeeper builder for bookkeeper shared by readers
      * @param writerBKCBuilder bookkeeper builder for writers
      * @param readerBKCBuilder bookkeeper builder for readers
-     * @param lockFactory distributed lock factory
      * @param writerMetadataStore writer metadata store
      * @param readerMetadataStore reader metadata store
      * @param scheduler ordered scheduled used by readers and writers
      * @param readAheadScheduler readAhead scheduler used by readers
-     * @param lockStateExecutor ordered scheduled used by locks to execute lock actions
      * @param channelFactory client socket channel factory to build bookkeeper clients
      * @param requestTimer request timer to build bookkeeper clients
      * @param readAheadExceptionsLogger stats logger to record readahead exceptions
@@ -297,13 +272,11 @@
                             ZooKeeperClient zkcForReaderBKC,
                             BookKeeperClientBuilder writerBKCBuilder,
                             BookKeeperClientBuilder readerBKCBuilder,
-                            SessionLockFactory lockFactory,
-                            LogSegmentMetadataStore writerMetadataStore,
-                            LogSegmentMetadataStore readerMetadataStore,
+                            LogStreamMetadataStore writerMetadataStore,
+                            LogStreamMetadataStore readerMetadataStore,
                             LogSegmentMetadataCache logSegmentMetadataCache,
                             OrderedScheduler scheduler,
                             OrderedScheduler readAheadScheduler,
-                            OrderedScheduler lockStateExecutor,
                             ClientSocketChannelFactory channelFactory,
                             HashedWheelTimer requestTimer,
                             ReadAheadExceptionsLogger readAheadExceptionsLogger,
@@ -311,7 +284,6 @@
                             Integer regionId,
                             LedgerAllocator ledgerAllocator,
                             PermitLimiter writeLimiter,
-                            PermitManager logSegmentRollingPermitManager,
                             FeatureProvider featureProvider,
                             StatsLogger statsLogger,
                             StatsLogger perLogStatsLogger) throws IOException {
@@ -320,8 +292,6 @@
         this.conf = conf;
         this.dynConf = dynConf;
         this.scheduler = scheduler;
-        this.lockFactory = lockFactory;
-        this.lockStateExecutor = lockStateExecutor;
         this.readAheadScheduler = null == readAheadScheduler ? scheduler : readAheadScheduler;
         this.statsLogger = statsLogger;
         this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
@@ -332,15 +302,24 @@
         this.streamIdentifier = conf.getUnpartitionedStreamName();
         this.ledgerAllocator = ledgerAllocator;
         this.writeLimiter = writeLimiter;
-        this.logSegmentRollingPermitManager = logSegmentRollingPermitManager;
 
         if (null == writerMetadataStore) {
-            this.writerMetadataStore = new ZKLogSegmentMetadataStore(conf, writerZKC, scheduler);
+            this.writerMetadataStore = new ZKLogStreamMetadataStore(
+                    clientId,
+                    conf,
+                    writerZKC,
+                    scheduler,
+                    statsLogger);
         } else {
             this.writerMetadataStore = writerMetadataStore;
         }
         if (null == readerMetadataStore) {
-            this.readerMetadataStore = new ZKLogSegmentMetadataStore(conf, readerZKC, scheduler);
+            this.readerMetadataStore = new ZKLogStreamMetadataStore(
+                    clientId,
+                    conf,
+                    readerZKC,
+                    scheduler,
+                    statsLogger);
         } else {
             this.readerMetadataStore = readerMetadataStore;
         }
@@ -407,26 +386,13 @@
         this.readAheadExceptionsLogger = readAheadExceptionsLogger;
     }
 
-    synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
-        if (createIfNull && null == lockStateExecutor && ownExecutor) {
-            lockStateExecutor = OrderedScheduler.newBuilder()
-                    .corePoolSize(1).name("BKDL-LockState").build();
-        }
-        return lockStateExecutor;
+    @VisibleForTesting
+    LogStreamMetadataStore getWriterMetadataStore() {
+        return writerMetadataStore;
     }
 
-    private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
-        if (createIfNull && null == lockFactory) {
-            lockFactory = new ZKSessionLockFactory(
-                    writerZKC,
-                    clientId,
-                    getLockStateExecutor(createIfNull),
-                    conf.getZKNumRetries(),
-                    conf.getLockTimeoutMilliSeconds(),
-                    conf.getZKRetryBackoffStartMillis(),
-                    statsLogger);
-        }
-        return lockFactory;
+    URI getUri() {
+        return uri;
     }
 
     DistributedLogConfiguration getConf() {
@@ -457,12 +423,16 @@
         return this.featureProvider;
     }
 
-    private synchronized BKLogReadHandler getReadHandlerForListener(boolean create) {
+    private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
+            boolean create, LogSegmentListener listener) {
         if (null == readHandlerForListener && create) {
             readHandlerForListener = createReadHandler();
-            // start fetch the log segments
+            readHandlerForListener.registerListener(listener);
+            // start fetch the log segments after created the listener
             readHandlerForListener.asyncStartFetchLogSegments();
+            return readHandlerForListener;
         }
+        readHandlerForListener.registerListener(listener);
         return readHandlerForListener;
     }
 
@@ -483,8 +453,7 @@
 
     @Override
     public void registerListener(LogSegmentListener listener) throws IOException {
-        BKLogReadHandler readHandler = getReadHandlerForListener(true);
-        readHandler.registerListener(listener);
+        getReadHandlerAndRegisterListener(true, listener);
     }
 
     @Override
@@ -523,14 +492,12 @@
                                                     boolean isHandleForReading) {
         return createReadHandler(
                 subscriberId,
-                getLockStateExecutor(true),
                 null,
                 true, /* deserialize record set */
                 isHandleForReading);
     }
 
     synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
-                                                    OrderedScheduler lockExecutor,
                                                     AsyncNotification notification,
                                                     boolean deserializeRecordSet,
                                                     boolean isHandleForReading) {
@@ -540,12 +507,10 @@
                 subscriberId,
                 conf,
                 dynConf,
-                readerZKCBuilder,
                 readerBKCBuilder,
                 readerMetadataStore,
                 logSegmentMetadataCache,
                 scheduler,
-                lockExecutor,
                 readAheadScheduler,
                 alertStatsLogger,
                 readAheadExceptionsLogger,
@@ -585,24 +550,15 @@
     }
 
     Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) {
-        final ZooKeeper zk;
-        try {
-            zk = writerZKC.get();
-        } catch (InterruptedException e) {
-            LOG.error("Failed to initialize zookeeper client : ", e);
-            return Future.exception(new DLInterruptedException("Failed to initialize zookeeper client", e));
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, uri.getPath()));
-        }
-
         boolean ownAllocator = null == ledgerAllocator;
 
-        // Fetching Log Metadata
-        Future<ZKLogMetadataForWriter> metadataFuture =
-                ZKLogMetadataForWriter.of(uri, name, streamIdentifier,
-                        zk, writerZKC.getDefaultACL(),
-                        ownAllocator, conf.getCreateStreamIfNotExists() || ownAllocator);
-        return metadataFuture.flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() {
+        // Fetching Log Metadata (create if not exists)
+        return writerMetadataStore.getLog(
+                uri,
+                name,
+                ownAllocator,
+                conf.getCreateStreamIfNotExists() || ownAllocator
+        ).flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() {
             @Override
             public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter logMetadata) {
                 Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>();
@@ -615,16 +571,10 @@
     private void createWriteHandler(ZKLogMetadataForWriter logMetadata,
                                     boolean lockHandler,
                                     final Promise<BKLogWriteHandler> createPromise) {
-        OrderedScheduler lockStateExecutor = getLockStateExecutor(true);
         // Build the locks
         DistributedLock lock;
         if (conf.isWriteLockEnabled()) {
-            lock = new ZKDistributedLock(
-                    lockStateExecutor,
-                    getLockFactory(true),
-                    logMetadata.getLockPath(),
-                    conf.getLockTimeoutMilliSeconds(),
-                    statsLogger);
+            lock = writerMetadataStore.createWriteLock(logMetadata);
         } else {
             lock = NopDistributedLock.INSTANCE;
         }
@@ -641,7 +591,6 @@
         final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
                 logMetadata,
                 conf,
-                writerZKCBuilder,
                 writerBKCBuilder,
                 writerMetadataStore,
                 logSegmentMetadataCache,
@@ -656,10 +605,6 @@
                 featureProvider,
                 dynConf,
                 lock);
-        PermitManager manager = getLogSegmentRollingPermitManager();
-        if (manager instanceof Watcher) {
-            writeHandler.register((Watcher) manager);
-        }
         if (lockHandler) {
             writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() {
                 @Override
@@ -684,7 +629,7 @@
     }
 
     PermitManager getLogSegmentRollingPermitManager() {
-        return logSegmentRollingPermitManager;
+        return writerMetadataStore.getPermitManager();
     }
 
     <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
@@ -692,7 +637,7 @@
         return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() {
             @Override
             public BKLogReadHandler applyE() throws Throwable {
-                return getReadHandlerForListener(true);
+                return getReadHandlerAndRegisterListener(true, null);
             }
         }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
             @Override
@@ -982,7 +927,6 @@
         AsyncLogReader reader = new BKAsyncLogReaderDLSN(
                 this,
                 scheduler,
-                getLockStateExecutor(true),
                 fromDLSN,
                 subscriberId,
                 false,
@@ -1022,7 +966,6 @@
         final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN(
                 BKDistributedLogManager.this,
                 scheduler,
-                getLockStateExecutor(true),
                 fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
                 subscriberId,
                 false,
@@ -1266,33 +1209,9 @@
      */
     @Override
     public void delete() throws IOException {
-        BKLogWriteHandler ledgerHandler = createWriteHandler(true);
-        try {
-            ledgerHandler.deleteLog();
-        } finally {
-            Utils.closeQuietly(ledgerHandler);
-        }
-
-        // Delete the ZK path associated with the log stream
-        String zkPath = getZKPath();
-        // Safety check when we are using the shared zookeeper
-        if (zkPath.toLowerCase().contains("distributedlog")) {
-            try {
-                LOG.info("Delete the path associated with the log {}, ZK Path {}", name, zkPath);
-                ZKUtil.deleteRecursive(writerZKC.get(), zkPath);
-            } catch (InterruptedException ie) {
-                LOG.error("Interrupted while accessing ZK", ie);
-                throw new DLInterruptedException("Error initializing zk", ie);
-            } catch (KeeperException ke) {
-                LOG.error("Error accessing entry in zookeeper", ke);
-                throw new IOException("Error initializing zk", ke);
-            }
-        } else {
-            LOG.warn("Skip deletion of unrecognized ZK Path {}", zkPath);
-        }
+        FutureUtils.result(writerMetadataStore.deleteLog(uri, getStreamName()));
     }
 
-
     /**
      * The DistributedLogManager may archive/purge any logs for transactionId
      * less than or equal to minImageTxId.
@@ -1377,9 +1296,6 @@
                         SchedulerUtils.shutdownScheduler(readAheadScheduler, schedTimeout, TimeUnit.MILLISECONDS);
                         LOG.info("Stopped BKDL ReadAhead Executor Service for {}.", name);
                     }
-
-                    SchedulerUtils.shutdownScheduler(getLockStateExecutor(false), schedTimeout, TimeUnit.MILLISECONDS);
-                    LOG.info("Stopped BKDL Lock State Executor for {}.", name);
                 }
                 if (ownWriterBKC) {
                     writerBKC.close();
@@ -1410,16 +1326,6 @@
         FutureUtils.result(asyncClose());
     }
 
-    public boolean scheduleTask(Runnable task) {
-        try {
-            scheduler.submit(task);
-            return true;
-        } catch (RejectedExecutionException ree) {
-            LOG.error("Task {} is rejected : ", task, ree);
-            return false;
-        }
-    }
-
     private FuturePool buildFuturePool(ExecutorService executorService,
                                        StatsLogger statsLogger) {
         FuturePool futurePool = new ExecutorServiceFuturePool(executorService);
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index f8d347a..2c9fe44 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -21,7 +21,6 @@
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption;
@@ -33,30 +32,24 @@
 import com.twitter.distributedlog.callback.NamespaceListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.feature.CoreFeatureKeys;
 import com.twitter.distributedlog.impl.ZKLogMetadataStore;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
 import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.LogMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.LimitedPermitManager;
 import com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.PermitLimiter;
-import com.twitter.distributedlog.util.PermitManager;
 import com.twitter.distributedlog.util.SchedulerUtils;
 import com.twitter.distributedlog.util.SimplePermitLimiter;
 import com.twitter.distributedlog.util.Utils;
@@ -272,7 +265,6 @@
     private final BKDLConfig bkdlConfig;
     private final OrderedScheduler scheduler;
     private final OrderedScheduler readAheadExecutor;
-    private final OrderedScheduler lockStateExecutor;
     private final ClientSocketChannelFactory channelFactory;
     private final HashedWheelTimer requestTimer;
     // zookeeper clients
@@ -300,16 +292,12 @@
     private final LedgerAllocator allocator;
     // access control manager
     private AccessControlManager accessControlManager;
-    // log segment rolling permit manager
-    private final PermitManager logSegmentRollingPermitManager;
     // log metadata store
     private final LogMetadataStore metadataStore;
     // log segment metadata store
     private final LogSegmentMetadataCache logSegmentMetadataCache;
-    private final LogSegmentMetadataStore writerSegmentMetadataStore;
-    private final LogSegmentMetadataStore readerSegmentMetadataStore;
-    // lock factory
-    private final SessionLockFactory lockFactory;
+    private final LogStreamMetadataStore writerStreamMetadataStore;
+    private final LogStreamMetadataStore readerStreamMetadataStore;
 
     // feature provider
     private final FeatureProvider featureProvider;
@@ -371,15 +359,7 @@
             this.readAheadExecutor = this.scheduler;
             LOG.info("Used shared executor for readahead.");
         }
-        StatsLogger lockStateStatsLogger = statsLogger.scope("factory").scope("lock_scheduler");
-        this.lockStateExecutor = OrderedScheduler.newBuilder()
-                .name("DLM-LockState")
-                .corePoolSize(conf.getNumLockStateThreads())
-                .statsLogger(lockStateStatsLogger)
-                .perExecutorStatsLogger(lockStateStatsLogger)
-                .traceTaskExecution(conf.getEnableTaskExecutionStats())
-                .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
-                .build();
+
         this.channelFactory = new NioClientSocketChannelFactory(
             Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
             Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
@@ -427,9 +407,6 @@
         }
         this.readerBKC = this.sharedReaderBKCBuilder.build();
 
-        this.logSegmentRollingPermitManager = new LimitedPermitManager(
-                conf.getLogSegmentRollingConcurrency(), 1, TimeUnit.MINUTES, scheduler);
-
         if (conf.getGlobalOutstandingWriteLimit() < 0) {
             this.writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
         } else {
@@ -458,15 +435,6 @@
         } else {
             allocator = null;
         }
-        // Build the lock factory
-        this.lockFactory = new ZKSessionLockFactory(
-                sharedWriterZKCForDL,
-                clientId,
-                lockStateExecutor,
-                conf.getZKNumRetries(),
-                conf.getLockTimeoutMilliSeconds(),
-                conf.getZKRetryBackoffStartMillis(),
-                statsLogger);
 
         // Stats Loggers
         this.readAheadExceptionsLogger = new ReadAheadExceptionsLogger(statsLogger);
@@ -478,11 +446,22 @@
             this.metadataStore = new ZKLogMetadataStore(conf, namespace, sharedReaderZKCForDL, scheduler);
         }
 
-        // create log segment metadata store
-        this.writerSegmentMetadataStore =
-                new ZKLogSegmentMetadataStore(conf, sharedWriterZKCForDL, scheduler);
-        this.readerSegmentMetadataStore =
-                new ZKLogSegmentMetadataStore(conf, sharedReaderZKCForDL, scheduler);
+        // create log stream metadata store
+        this.writerStreamMetadataStore =
+                new ZKLogStreamMetadataStore(
+                        clientId,
+                        conf,
+                        sharedWriterZKCForDL,
+                        scheduler,
+                        statsLogger);
+        this.readerStreamMetadataStore =
+                new ZKLogStreamMetadataStore(
+                        clientId,
+                        conf,
+                        sharedReaderZKCForDL,
+                        scheduler,
+                        statsLogger);
+        // create a log segment metadata cache
         this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker());
 
         LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, regionId = {}, federated = {}.",
@@ -499,7 +478,7 @@
         checkState();
         validateName(logName);
         URI uri = FutureUtils.result(metadataStore.createLog(logName));
-        createUnpartitionedStreams(conf, uri, Lists.newArrayList(logName));
+        FutureUtils.result(writerStreamMetadataStore.getLog(uri, logName, true, true));
     }
 
     @Override
@@ -556,7 +535,16 @@
         throws IOException, IllegalArgumentException {
         checkState();
         Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
-        return uri.isPresent() && checkIfLogExists(conf, uri.get(), logName);
+        if (uri.isPresent()) {
+            try {
+                FutureUtils.result(writerStreamMetadataStore.logExists(uri.get(), logName));
+                return true;
+            } catch (LogNotFoundException lnfe) {
+                return false;
+            }
+        } else {
+            return false;
+        }
     }
 
     @Override
@@ -701,8 +689,8 @@
     }
 
     @VisibleForTesting
-    public LogSegmentMetadataStore getWriterSegmentMetadataStore() {
-        return writerSegmentMetadataStore;
+    public LogStreamMetadataStore getWriterStreamMetadataStore() {
+        return writerStreamMetadataStore;
     }
 
     @VisibleForTesting
@@ -883,10 +871,8 @@
         }
 
         LedgerAllocator dlmLedgerAlloctor = null;
-        PermitManager dlmLogSegmentRollingPermitManager = PermitManager.UNLIMITED_PERMIT_MANAGER;
         if (ClientSharingOption.SharedClients == clientSharingOption) {
             dlmLedgerAlloctor = this.allocator;
-            dlmLogSegmentRollingPermitManager = this.logSegmentRollingPermitManager;
         }
         // if there's a specified perStreamStatsLogger, user it, otherwise use the default one.
         StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger);
@@ -902,13 +888,11 @@
                 readerZKCForBK,                     /* ZKC for BookKeeper for DL Readers */
                 writerBKCBuilder,                   /* BookKeeper Builder for DL Writers */
                 readerBKCBuilder,                   /* BookKeeper Builder for DL Readers */
-                lockFactory,                        /* Lock Factory */
-                writerSegmentMetadataStore,         /* Log Segment Metadata Store for DL Writers */
-                readerSegmentMetadataStore,         /* Log Segment Metadata Store for DL Readers */
+                writerStreamMetadataStore,         /* Log Stream Metadata Store for DL Writers */
+                readerStreamMetadataStore,         /* Log Stream Metadata Store for DL Readers */
                 logSegmentMetadataCache,            /* Log Segment Metadata Cache */
                 scheduler,                          /* DL scheduler */
                 readAheadExecutor,                  /* Read Aheader Executor */
-                lockStateExecutor,                  /* Lock State Executor */
                 channelFactory,                     /* Netty Channel Factory */
                 requestTimer,                       /* Request Timer */
                 readAheadExceptionsLogger,          /* ReadAhead Exceptions Logger */
@@ -916,7 +900,6 @@
                 regionId,                           /* Region Id */
                 dlmLedgerAlloctor,                  /* Ledger Allocator */
                 writeLimiter,                       /* Write Limiter */
-                dlmLogSegmentRollingPermitManager,  /* Log segment rolling limiter */
                 featureProvider.scope("dl"),        /* Feature Provider */
                 statsLogger,                        /* Stats Logger */
                 perLogStatsLogger                   /* Per Log Stats Logger */
@@ -961,25 +944,6 @@
         validateName(nameOfStream);
     }
 
-    private static boolean checkIfLogExists(DistributedLogConfiguration conf, URI uri, String name)
-        throws IOException, IllegalArgumentException {
-        validateInput(conf, uri, name);
-        final String logRootPath = uri.getPath() + String.format("/%s", name);
-        return withZooKeeperClient(new ZooKeeperClientHandler<Boolean>() {
-            @Override
-            public Boolean handle(ZooKeeperClient zkc) throws IOException {
-                // check existence after syncing
-                try {
-                    return null != Utils.sync(zkc, logRootPath).exists(logRootPath, false);
-                } catch (KeeperException e) {
-                    throw new ZKException("Error on checking if log " + logRootPath + " exists", e.code());
-                } catch (InterruptedException e) {
-                    throw new DLInterruptedException("Interrupted on checking if log " + logRootPath + " exists", e);
-                }
-            }
-        }, conf, uri);
-    }
-
     public static Map<String, byte[]> enumerateLogsWithMetadataInNamespace(final DistributedLogConfiguration conf, final URI uri)
         throws IOException, IllegalArgumentException {
         return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() {
@@ -1025,27 +989,6 @@
         return result;
     }
 
-    private static void createUnpartitionedStreams(
-            final DistributedLogConfiguration conf,
-            final URI uri,
-            final List<String> streamNames)
-        throws IOException, IllegalArgumentException {
-        withZooKeeperClient(new ZooKeeperClientHandler<Void>() {
-            @Override
-            public Void handle(ZooKeeperClient zkc) throws IOException {
-                for (String s : streamNames) {
-                    try {
-                        BKDistributedLogManager.createLog(conf, zkc, uri, s);
-                    } catch (InterruptedException e) {
-                        LOG.error("Interrupted on creating unpartitioned stream {} : ", s, e);
-                        return null;
-                    }
-                }
-                return null;
-            }
-        }, conf, uri);
-    }
-
     private void checkState() throws IOException {
         if (closed.get()) {
             LOG.error("BKDistributedLogNamespace {} is already closed", namespace);
@@ -1079,13 +1022,11 @@
             LOG.info("Ledger Allocator stopped.");
         }
 
-        // Unregister gauge to avoid GC spiral
-        this.logSegmentRollingPermitManager.close();
         this.writeLimiter.close();
 
-        // Shutdown log segment metadata stores
+        // Shutdown log stream metadata stores
-        Utils.close(writerSegmentMetadataStore);
-        Utils.close(readerSegmentMetadataStore);
+        Utils.close(writerStreamMetadataStore);
+        Utils.close(readerStreamMetadataStore);
 
         // Shutdown the schedulers
         SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(),
@@ -1113,7 +1054,5 @@
         LOG.info("Release external resources used by channel factory.");
         requestTimer.stop();
         LOG.info("Stopped request timer");
-        SchedulerUtils.shutdownScheduler(lockStateExecutor, 5000, TimeUnit.MILLISECONDS);
-        LOG.info("Stopped lock state executor");
     }
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 2a6e85b..4f138f2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -20,12 +20,9 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
 import com.twitter.distributedlog.io.AsyncAbortable;
 import com.twitter.distributedlog.io.AsyncCloseable;
@@ -33,6 +30,7 @@
 import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.util.Function;
@@ -45,10 +43,6 @@
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction0;
@@ -95,8 +89,8 @@
 
     protected final ZKLogMetadata logMetadata;
     protected final DistributedLogConfiguration conf;
-    protected final ZooKeeperClient zooKeeperClient;
     protected final BookKeeperClient bookKeeperClient;
+    protected final LogStreamMetadataStore streamMetadataStore;
     protected final LogSegmentMetadataStore metadataStore;
     protected final LogSegmentMetadataCache metadataCache;
     protected final int firstNumEntriesPerReadLastRecordScan;
@@ -112,8 +106,6 @@
     // Maintain the list of log segments per stream
     protected final PerStreamLogSegmentCache logSegmentCache;
 
-
-
     // trace
     protected final long metadataLatencyWarnThresholdMillis;
 
@@ -130,15 +122,13 @@
      */
     BKLogHandler(ZKLogMetadata metadata,
                  DistributedLogConfiguration conf,
-                 ZooKeeperClientBuilder zkcBuilder,
                  BookKeeperClientBuilder bkcBuilder,
-                 LogSegmentMetadataStore metadataStore,
+                 LogStreamMetadataStore streamMetadataStore,
                  LogSegmentMetadataCache metadataCache,
                  OrderedScheduler scheduler,
                  StatsLogger statsLogger,
                  AlertStatsLogger alertStatsLogger,
                  String lockClientId) {
-        Preconditions.checkNotNull(zkcBuilder);
         Preconditions.checkNotNull(bkcBuilder);
         this.logMetadata = metadata;
         this.conf = conf;
@@ -148,13 +138,11 @@
         this.logSegmentCache = new PerStreamLogSegmentCache(
                 metadata.getLogName(),
                 conf.isLogSegmentSequenceNumberValidationEnabled());
-
         firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan();
         maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan();
-        this.zooKeeperClient = zkcBuilder.build();
-        LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
         this.bookKeeperClient = bkcBuilder.build();
-        this.metadataStore = metadataStore;
+        this.streamMetadataStore = streamMetadataStore;
+        this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore();
         this.metadataCache = metadataCache;
         this.lockClientId = lockClientId;
 
@@ -188,7 +176,8 @@
 
     public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
         final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
-        checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+                .addEventListener(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 readLogSegmentsFromStore(
@@ -234,7 +223,8 @@
 
     public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) {
         final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
-        checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+                .addEventListener(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 readLogSegmentsFromStore(
@@ -381,8 +371,8 @@
      * @return the count of records present in the range
      */
     public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
-
-        return checkLogStreamExistsAsync().flatMap(new Function<Void, Future<Long>>() {
+        return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+                .flatMap(new Function<Void, Future<Long>>() {
             public Future<Long> apply(Void done) {
 
                 return readLogSegmentsFromStore(
@@ -417,48 +407,6 @@
         return sum;
     }
 
-    Future<Void> checkLogStreamExistsAsync() {
-        final Promise<Void> promise = new Promise<Void>();
-        try {
-            final ZooKeeper zk = zooKeeperClient.get();
-            zk.sync(logMetadata.getLogSegmentsPath(), new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int syncRc, String path, Object syncCtx) {
-                    if (KeeperException.Code.NONODE.intValue() == syncRc) {
-                        promise.setException(new LogNotFoundException(
-                                String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
-                        return;
-                    } else if (KeeperException.Code.OK.intValue() != syncRc){
-                        promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
-                                KeeperException.create(KeeperException.Code.get(syncRc))));
-                        return;
-                    }
-                    zk.exists(logMetadata.getLogSegmentsPath(), false, new AsyncCallback.StatCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, Stat stat) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.setValue(null);
-                            } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                                promise.setException(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
-                            } else {
-                                promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
-                                        KeeperException.create(KeeperException.Code.get(rc))));
-                            }
-                        }
-                    }, null);
-                }
-            }, null);
-
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
-            promise.setException(new DLInterruptedException("Interrupted while checking "
-                    + logMetadata.getLogSegmentsPath(), ie));
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
     @Override
     public Future<Void> asyncAbort() {
         return asyncClose();
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 30a96ff..1963172 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -31,8 +31,6 @@
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.LockCancelledException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
@@ -40,12 +38,9 @@
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.readahead.ReadAheadWorker;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
@@ -53,7 +48,6 @@
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
@@ -67,14 +61,13 @@
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function0;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.BoxedUnit;
 
+import javax.annotation.Nullable;
+
 /**
  * Log Handler for Readers.
  * <h3>Metrics</h3>
@@ -111,7 +104,7 @@
  * becoming idle.
  * </ul>
  * <h4>Read Lock</h4>
- * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock}
+ * All read lock related stats are exposed under scope `read_lock`. See {@link DistributedLock} implementations
  * for detail stats.
  */
 class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
@@ -126,10 +119,7 @@
     protected ReadAheadWorker readAheadWorker = null;
     private final boolean isHandleForReading;
 
-    private final SessionLockFactory lockFactory;
-    private final OrderedScheduler lockStateExecutor;
     private final Optional<String> subscriberId;
-    private final String readLockPath;
     private DistributedLock readLock;
     private Future<Void> lockAcquireFuture;
 
@@ -156,12 +146,10 @@
                      Optional<String> subscriberId,
                      DistributedLogConfiguration conf,
                      DynamicDistributedLogConfiguration dynConf,
-                     ZooKeeperClientBuilder zkcBuilder,
                      BookKeeperClientBuilder bkcBuilder,
-                     LogSegmentMetadataStore metadataStore,
+                     LogStreamMetadataStore streamMetadataStore,
                      LogSegmentMetadataCache metadataCache,
                      OrderedScheduler scheduler,
-                     OrderedScheduler lockStateExecutor,
                      OrderedScheduler readAheadExecutor,
                      AlertStatsLogger alertStatsLogger,
                      ReadAheadExceptionsLogger readAheadExceptionsLogger,
@@ -173,9 +161,8 @@
                      boolean deserializeRecordSet) {
         super(logMetadata,
                 conf,
-                zkcBuilder,
                 bkcBuilder,
-                metadataStore,
+                streamMetadataStore,
                 metadataCache,
                 scheduler,
                 statsLogger,
@@ -206,23 +193,12 @@
                 Ticker.systemTicker());
 
         this.subscriberId = subscriberId;
-        this.readLockPath = logMetadata.getReadLockPath(subscriberId);
-        this.lockStateExecutor = lockStateExecutor;
-        this.lockFactory = new ZKSessionLockFactory(
-                zooKeeperClient,
-                getLockClientId(),
-                lockStateExecutor,
-                conf.getZKNumRetries(),
-                conf.getLockTimeoutMilliSeconds(),
-                conf.getZKRetryBackoffStartMillis(),
-                statsLogger.scope("read_lock"));
-
         this.isHandleForReading = isHandleForReading;
     }
 
     @VisibleForTesting
     String getReadLockPath() {
-        return readLockPath;
+        return logMetadataForReader.getReadLockPath(subscriberId);
     }
 
     <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) {
@@ -234,38 +210,24 @@
         });
     }
 
+    Future<Void> checkLogStreamExists() {
+        return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName());
+    }
+
     /**
      * Elective stream lock--readers are not required to acquire the lock before using the stream.
      */
     synchronized Future<Void> lockStream() {
         if (null == lockAcquireFuture) {
-            final Function0<DistributedLock> lockFunction =  new ExceptionalFunction0<DistributedLock>() {
-                @Override
-                public DistributedLock applyE() throws IOException {
-                    // Unfortunately this has a blocking call which we should not execute on the
-                    // ZK completion thread
-                    BKLogReadHandler.this.readLock = new ZKDistributedLock(
-                            lockStateExecutor,
-                            lockFactory,
-                            readLockPath,
-                            conf.getLockTimeoutMilliSeconds(),
-                            statsLogger.scope("read_lock"));
-
-                    LOG.info("acquiring readlock {} at {}", getLockClientId(), readLockPath);
-                    return BKLogReadHandler.this.readLock;
-                }
-            };
-            lockAcquireFuture = ensureReadLockPathExist().flatMap(new ExceptionalFunction<Void, Future<Void>>() {
-                @Override
-                public Future<Void> applyE(Void in) throws Throwable {
-                    return scheduler.apply(lockFunction).flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
+            lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId)
+                    .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
                         @Override
-                        public Future<Void> applyE(DistributedLock lock) throws IOException {
+                        public Future<Void> applyE(DistributedLock lock) throws Throwable {
+                            BKLogReadHandler.this.readLock = lock;
+                            LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath());
                             return acquireLockOnExecutorThread(lock);
                         }
                     });
-                }
-            });
         }
         return lockAcquireFuture;
     }
@@ -292,14 +254,14 @@
         acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() {
             @Override
             public void onSuccess(DistributedLock lock) {
-                LOG.info("acquired readlock {} at {}", getLockClientId(), readLockPath);
+                LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath());
                 satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null));
             }
 
             @Override
             public void onFailure(Throwable cause) {
                 LOG.info("failed to acquire readlock {} at {}",
-                        new Object[]{getLockClientId(), readLockPath, cause});
+                        new Object[]{ getLockClientId(), getReadLockPath(), cause });
                 satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause));
             }
         });
@@ -438,46 +400,6 @@
         return handleCache;
     }
 
-    private Future<Void> ensureReadLockPathExist() {
-        final Promise<Void> promise = new Promise<Void>();
-        promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                FutureUtils.setException(promise, new LockCancelledException(readLockPath, "Could not ensure read lock path", t));
-                return null;
-            }
-        });
-        Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
-        Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
-                new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
-                new org.apache.zookeeper.AsyncCallback.StringCallback() {
-                    @Override
-                    public void processResult(final int rc, final String path, Object ctx, String name) {
-                        scheduler.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                if (KeeperException.Code.NONODE.intValue() == rc) {
-                                    FutureUtils.setException(promise, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
-                                } else if (KeeperException.Code.OK.intValue() == rc) {
-                                    FutureUtils.setValue(promise, null);
-                                    LOG.trace("Created path {}.", path);
-                                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                                    FutureUtils.setValue(promise, null);
-                                    LOG.trace("Path {} is already existed.", path);
-                                } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
-                                    FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
-                                } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
-                                    FutureUtils.setException(promise, new DLInterruptedException(path));
-                                } else {
-                                    FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
-                                }
-                            }
-                        });
-                    }
-                }, null);
-        return promise;
-    }
-
     public Entry.Reader getNextReadAheadEntry() throws IOException {
         return readAheadCache.getNextReadAheadEntry();
     }
@@ -560,12 +482,16 @@
     // Listener for log segments
     //
 
-    protected void registerListener(LogSegmentListener listener) {
-        listeners.add(listener);
+    protected void registerListener(@Nullable LogSegmentListener listener) {
+        if (null != listener) {
+            listeners.add(listener);
+        }
     }
 
-    protected void unregisterListener(LogSegmentListener listener) {
-        listeners.remove(listener);
+    protected void unregisterListener(@Nullable LogSegmentListener listener) {
+        if (null != listener) {
+            listeners.remove(listener);
+        }
     }
 
     protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 5d3be7d..f2e30ce 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -23,22 +23,21 @@
 import com.twitter.distributedlog.bk.LedgerAllocator;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
 import com.twitter.distributedlog.exceptions.LockingException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.function.GetLastTxIdFunction;
 import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.logsegment.RollingPolicy;
 import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy;
 import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.metadata.MetadataUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.util.DLUtils;
@@ -49,9 +48,6 @@
 import com.twitter.distributedlog.util.Transaction;
 import com.twitter.distributedlog.util.PermitLimiter;
 import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.zk.ZKOp;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.distributedlog.zk.ZKVersionedSetOp;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
@@ -60,19 +56,11 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction1;
@@ -84,7 +72,6 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.common.base.Charsets.UTF_8;
 import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
 
 /**
@@ -102,11 +89,11 @@
 class BKLogWriteHandler extends BKLogHandler {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
+    protected final ZKLogMetadataForWriter logMetadataForWriter;
     protected final DistributedLock lock;
     protected final LedgerAllocator ledgerAllocator;
     protected final MaxTxId maxTxId;
     protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
-    protected final boolean sanityCheckTxnId;
     protected final boolean validateLogSegmentSequenceNumber;
     protected final int regionId;
     protected final RollingPolicy rollingPolicy;
@@ -164,9 +151,8 @@
      */
     BKLogWriteHandler(ZKLogMetadataForWriter logMetadata,
                       DistributedLogConfiguration conf,
-                      ZooKeeperClientBuilder zkcBuilder,
                       BookKeeperClientBuilder bkcBuilder,
-                      LogSegmentMetadataStore metadataStore,
+                      LogStreamMetadataStore streamMetadataStore,
                       LogSegmentMetadataCache metadataCache,
                       OrderedScheduler scheduler,
                       LedgerAllocator allocator,
@@ -181,14 +167,14 @@
                       DistributedLock lock /** owned by handler **/) {
         super(logMetadata,
                 conf,
-                zkcBuilder,
                 bkcBuilder,
-                metadataStore,
+                streamMetadataStore,
                 metadataCache,
                 scheduler,
                 statsLogger,
                 alertStatsLogger,
                 clientId);
+        this.logMetadataForWriter = logMetadata;
         this.perLogStatsLogger = perLogStatsLogger;
         this.writeLimiter = writeLimiter;
         this.featureProvider = featureProvider;
@@ -202,15 +188,13 @@
         } else {
             this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
         }
-        this.sanityCheckTxnId = conf.getSanityCheckTxnID();
         this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled();
 
         // Construct the max sequence no
         maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData());
         inprogressLSSNs = new LinkedList<Long>();
         // Construct the max txn id.
-        maxTxId = new MaxTxId(zooKeeperClient, logMetadata.getMaxTxIdPath(),
-                conf.getSanityCheckTxnID(), logMetadata.getMaxTxIdData());
+        maxTxId = new MaxTxId(logMetadata.getMaxTxIdData());
 
         // Schedule fetching log segment list in background before we access it.
         // We don't need to watch the log segment list changes for writer, as it manages log segment list.
@@ -291,13 +275,12 @@
     }
 
     // Transactional operations for MaxLogSegmentSequenceNo
-    void storeMaxSequenceNumber(final Transaction txn,
+    void storeMaxSequenceNumber(final Transaction<Object> txn,
                                 final MaxLogSegmentSequenceNo maxSeqNo,
                                 final long seqNo,
                                 final boolean isInprogress) {
-        byte[] data = DLUtils.serializeLogSegmentSequenceNumber(seqNo);
-        Op zkOp = Op.setData(logMetadata.getLogSegmentsPath(), data, maxSeqNo.getZkVersion());
-        txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
+        metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo),
+                new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version version) {
                 if (validateLogSegmentSequenceNumber) {
@@ -309,69 +292,60 @@
                         }
                     }
                 }
-                maxSeqNo.update((ZkVersion) version, seqNo);
+                maxSeqNo.update(version, seqNo);
             }
 
             @Override
             public void onAbort(Throwable t) {
                 // no-op
             }
-        }));
+        });
     }
 
     // Transactional operations for MaxTxId
-    void storeMaxTxId(final ZKTransaction txn,
+    void storeMaxTxId(final Transaction<Object> txn,
                       final MaxTxId maxTxId,
                       final long txId) {
-        byte[] data = maxTxId.couldStore(txId);
-        if (null != data) {
-            Op zkOp = Op.setData(maxTxId.getZkPath(), data, -1);
-            txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
-                @Override
-                public void onCommit(Version version) {
-                    maxTxId.setMaxTxId(txId);
-                }
+        metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId),
+                new Transaction.OpListener<Version>() {
+                    @Override
+                    public void onCommit(Version version) {
+                        maxTxId.update(version, txId); // applied only if version is newer than the cached one
+                    }
 
-                @Override
-                public void onAbort(Throwable t) {
-
-                }
-            }));
-        }
+                    @Override
+                    public void onAbort(Throwable t) {
+                        // no-op
+                    }
+                });
     }
 
     // Transactional operations for logsegment
-    void writeLogSegment(final ZKTransaction txn,
-                         final List<ACL> acl,
-                         final String inprogressSegmentName,
-                         final LogSegmentMetadata metadata,
-                         final String path) {
-        byte[] finalisedData = metadata.getFinalisedData().getBytes(UTF_8);
-        Op zkOp = Op.create(path, finalisedData, acl, CreateMode.PERSISTENT);
-        txn.addOp(new ZKOp(zkOp) {
+    void writeLogSegment(final Transaction<Object> txn,
+                         final LogSegmentMetadata metadata) {
+        metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
             @Override
-            protected void commitOpResult(OpResult opResult) {
-                addLogSegmentToCache(inprogressSegmentName, metadata);
+            public void onCommit(Void r) {
+                addLogSegmentToCache(metadata.getSegmentName(), metadata);
             }
 
             @Override
-            protected void abortOpResult(Throwable t, OpResult opResult) {
+            public void onAbort(Throwable t) {
                 // no-op
             }
         });
     }
 
-    void deleteLogSegment(final ZKTransaction txn,
-                          final String logSegmentName,
-                          final String logSegmentPath) {
-        Op zkOp = Op.delete(logSegmentPath, -1);
-        txn.addOp(new ZKOp(zkOp) {
+    void deleteLogSegment(final Transaction<Object> txn,
+                          final LogSegmentMetadata metadata) {
+        metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
             @Override
-            protected void commitOpResult(OpResult opResult) {
-                removeLogSegmentFromCache(logSegmentName);
+            public void onCommit(Void r) {
+                removeLogSegmentFromCache(metadata.getSegmentName());
             }
+
             @Override
-            protected void abortOpResult(Throwable t, OpResult opResult) {
+            public void onAbort(Throwable t) {
                 // no-op
             }
         });
@@ -405,10 +379,6 @@
         }
     }
 
-    void register(Watcher watcher) {
-        this.zooKeeperClient.register(watcher);
-    }
-
     /**
      * Start a new log segment in a BookKeeper ledger.
      * First ensure that we have the write lock for this journal.
@@ -539,19 +509,17 @@
             FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId));
             return;
         }
-        if (this.sanityCheckTxnId) {
-            long highestTxIdWritten = maxTxId.get();
-            if (txId < highestTxIdWritten) {
-                if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
-                    LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
-                    FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
-                    return;
-                }
-                else {
-                    LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
-                    FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
-                    return;
-                }
+
+        long highestTxIdWritten = maxTxId.get();
+        if (txId < highestTxIdWritten) {
+            if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
+                LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
+                FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
+                return;
+            } else {
+                LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
+                FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
+                return;
             }
         }
 
@@ -564,7 +532,7 @@
         }
 
         // start the transaction from zookeeper
-        final ZKTransaction txn = new ZKTransaction(zooKeeperClient);
+        final Transaction<Object> txn = streamMetadataStore.newTransaction();
 
         // failpoint injected before creating ledger
         try {
@@ -617,7 +585,7 @@
     // once the ledger handle is obtained from allocator, this function should guarantee
     // either the transaction is executed or aborted. Otherwise, the ledger handle will
     // just leak from the allocation pool - hence cause "No Ledger Allocator"
-    private void createInprogressLogSegment(ZKTransaction txn,
+    private void createInprogressLogSegment(Transaction<Object> txn,
                                             final long txId,
                                             final LedgerHandle lh,
                                             boolean bestEffort,
@@ -634,7 +602,6 @@
             return;
         }
 
-        final String inprogressZnodeName = inprogressZNodeName(lh.getId(), txId, logSegmentSeqNo);
         final String inprogressZnodePath = inprogressZNode(lh.getId(), txId, logSegmentSeqNo);
         final LogSegmentMetadata l =
             new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath,
@@ -645,12 +612,7 @@
                     .build();
 
         // Create an inprogress segment
-        writeLogSegment(
-                txn,
-                zooKeeperClient.getDefaultACL(),
-                inprogressZnodeName,
-                l,
-                inprogressZnodePath);
+        writeLogSegment(txn, l);
 
         // Try storing max sequence number.
         LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo);
@@ -667,7 +629,7 @@
                 try {
                     FutureUtils.setValue(promise, new BKLogSegmentWriter(
                             getFullyQualifiedName(),
-                            inprogressZnodeName,
+                            l.getSegmentName(),
                             conf,
                             conf.getDLLedgerMetadataLayoutVersion(),
                             new BKLogSegmentEntryWriter(lh),
@@ -888,7 +850,6 @@
         }
 
         LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
-        final String inprogressZnodePath = inprogressZNode(inprogressZnodeName);
         LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);
 
         // validate log segment
@@ -936,7 +897,7 @@
             // ignore the case that a new inprogress log segment is pre-allocated
             // before completing current inprogress one
             LOG.info("Try storing max sequence number {} in completing {}.",
-                    new Object[] { logSegmentSeqNo, inprogressZnodePath });
+                    new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
         } else {
             LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
                     new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
@@ -949,7 +910,6 @@
         }
 
         // Prepare the completion
-        final String nameForCompletedLedger = completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo);
         final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
         long startSequenceId;
         try {
@@ -970,17 +930,12 @@
         setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());
 
         // prepare the transaction
-        ZKTransaction txn = new ZKTransaction(zooKeeperClient);
+        Transaction<Object> txn = streamMetadataStore.newTransaction();
 
         // create completed log segment
-        writeLogSegment(
-                txn,
-                zooKeeperClient.getDefaultACL(),
-                nameForCompletedLedger,
-                completedLogSegment,
-                pathForCompletedLedger);
+        writeLogSegment(txn, completedLogSegment);
         // delete inprogress log segment
-        deleteLogSegment(txn, inprogressZnodeName, inprogressZnodePath);
+        deleteLogSegment(txn, inprogressLogSegment);
         // store max sequence number
         storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
         // update max txn id.
@@ -991,7 +946,8 @@
             @Override
             public void onSuccess(Void value) {
                 LOG.info("Completed {} to {} for {} : {}",
-                        new Object[] { inprogressZnodeName, nameForCompletedLedger, getFullyQualifiedName(), completedLogSegment });
+                        new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
+                                getFullyQualifiedName(), completedLogSegment });
                 FutureUtils.setValue(promise, completedLogSegment);
             }
 
@@ -1072,27 +1028,6 @@
 
     }
 
-    public void deleteLog() throws IOException {
-        lock.checkOwnershipAndReacquire();
-        FutureUtils.result(purgeLogSegmentsOlderThanTxnId(-1));
-
-        try {
-            Utils.closeQuietly(lock);
-            zooKeeperClient.get().exists(logMetadata.getLogSegmentsPath(), false);
-            zooKeeperClient.get().exists(logMetadata.getMaxTxIdPath(), false);
-            if (logMetadata.getLogRootPath().toLowerCase().contains("distributedlog")) {
-                ZKUtil.deleteRecursive(zooKeeperClient.get(), logMetadata.getLogRootPath());
-            } else {
-                LOG.warn("Skip deletion of unrecognized ZK Path {}", logMetadata.getLogRootPath());
-            }
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while deleting log znodes", ie);
-            throw new DLInterruptedException("Interrupted while deleting " + logMetadata.getLogRootPath(), ie);
-        } catch (KeeperException ke) {
-            LOG.error("Error deleting" + logMetadata.getLogRootPath() + " in zookeeper", ke);
-        }
-    }
-
     Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
         if (DLSN.InvalidDLSN == dlsn) {
             List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
@@ -1321,33 +1256,29 @@
     private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
                                           final Promise<LogSegmentMetadata> promise) {
         Transaction<Object> deleteTxn = metadataStore.transaction();
-        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata);
-        deleteTxn.execute().addEventListener(new FutureEventListener<Void>() {
+        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
             @Override
-            public void onSuccess(Void result) {
+            public void onCommit(Void r) {
                 // purge log segment
                 removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                 promise.setValue(segmentMetadata);
             }
 
             @Override
-            public void onFailure(Throwable cause) {
-                if (cause instanceof ZKException) {
-                    ZKException zke = (ZKException) cause;
-                    if (KeeperException.Code.NONODE == zke.getKeeperExceptionCode()) {
-                        LOG.error("No log segment {} found for {}.",
-                                segmentMetadata, getFullyQualifiedName());
-                        // purge log segment
-                        removeLogSegmentFromCache(segmentMetadata.getZNodeName());
-                        promise.setValue(segmentMetadata);
-                        return;
-                    }
+            public void onAbort(Throwable t) {
+                if (t instanceof LogSegmentNotFoundException) {
+                    // purge log segment
+                    removeLogSegmentFromCache(segmentMetadata.getZNodeName());
+                    promise.setValue(segmentMetadata);
+                    return;
+                } else {
+                    LOG.error("Couldn't purge {} for {}: with error {}",
+                            new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
+                    promise.setException(t);
                 }
-                LOG.error("Couldn't purge {} for {}: with error {}",
-                        new Object[]{ segmentMetadata, getFullyQualifiedName(), cause });
-                promise.setException(cause);
             }
         });
+        deleteTxn.execute();
     }
 
     @Override
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index f4ca45e..0f6db75 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -62,7 +62,6 @@
                         StatsLogger statsLogger) {
         this.readHandler = bkdlm.createReadHandler(
                 Optional.<String>absent(),
-                bkdlm.getLockStateExecutor(true),
                 this,
                 conf.getDeserializeRecordSetOnReads(),
                 true);
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index 46a056b..6f37a59 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -1557,6 +1557,7 @@
      *
      * @return true if should check txn id with max txn id, otherwise false.
      */
+    @Deprecated
     public boolean getSanityCheckTxnID() {
         return getBoolean(BKDL_MAXID_SANITYCHECK, BKDL_MAXID_SANITYCHECK_DEFAULT);
     }
@@ -1569,6 +1570,7 @@
      * @return configuration.
      * @see #getSanityCheckTxnID()
      */
+    @Deprecated
     public DistributedLogConfiguration setSanityCheckTxnID(boolean enabled) {
         setProperty(BKDL_MAXID_SANITYCHECK, enabled);
         return this;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
index 80cf350..9bfaaba 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
@@ -17,26 +17,15 @@
  */
 package com.twitter.distributedlog;
 
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.util.DLUtils;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
 
 /**
  * Utility class for storing and reading max ledger sequence number
  */
 class MaxLogSegmentSequenceNo {
 
-    static final Logger LOG = LoggerFactory.getLogger(MaxLogSegmentSequenceNo.class);
-
     Version version;
     long maxSeqNo;
 
@@ -55,24 +44,20 @@
             if (null != logSegmentsData && null != logSegmentsData.getVersion()) {
                 version = logSegmentsData.getVersion();
             } else {
-                version = new ZkVersion(-1);
+                throw new IllegalStateException("Invalid MaxLogSegmentSequenceNo found - " + logSegmentsData);
             }
         }
     }
 
-    synchronized int getZkVersion() {
-        return ((ZkVersion) version).getZnodeVersion();
+    synchronized Version getVersion() {
+        return version;
     }
 
     synchronized long getSequenceNumber() {
         return maxSeqNo;
     }
 
-    synchronized MaxLogSegmentSequenceNo update(int zkVersion, long logSegmentSeqNo) {
-        return update(new ZkVersion(zkVersion), logSegmentSeqNo);
-    }
-
-    synchronized MaxLogSegmentSequenceNo update(ZkVersion version, long logSegmentSeqNo) {
+    synchronized MaxLogSegmentSequenceNo update(Version version, long logSegmentSeqNo) {
         if (version.compare(this.version) == Version.Occurred.AFTER) {
             this.version = version;
             this.maxSeqNo = logSegmentSeqNo;
@@ -80,21 +65,8 @@
         return this;
     }
 
-    synchronized void store(ZooKeeperClient zkc, String path, long logSegmentSeqNo) throws IOException {
-        try {
-            Stat stat = zkc.get().setData(path,
-                    DLUtils.serializeLogSegmentSequenceNumber(logSegmentSeqNo), getZkVersion());
-            update(stat.getVersion(), logSegmentSeqNo);
-        } catch (KeeperException ke) {
-            throw new ZKException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
-                                  + path + " : ", ke);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zce) {
-            throw new IOException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
-                    + path + " : ", zce);
-        } catch (InterruptedException e) {
-            throw new DLInterruptedException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
-                    + path + " : ", e);
-        }
+    public synchronized Versioned<Long> getVersionedData(long seqNo) {
+        return new Versioned<Long>(seqNo, version);
     }
 
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
index c446a8b..ed7218e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
@@ -18,13 +18,11 @@
 package com.twitter.distributedlog;
 
 import com.twitter.distributedlog.util.DLUtils;
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Utility class for storing and reading
  * the max seen txid in zookeeper
@@ -32,73 +30,43 @@
 class MaxTxId {
     static final Logger LOG = LoggerFactory.getLogger(MaxTxId.class);
 
-    private final ZooKeeperClient zkc;
-    private final String path;
-    private final boolean enabled;
-
+    private Version version;
     private long currentMax;
 
-    MaxTxId(ZooKeeperClient zkc, String path, boolean enabled,
-            Versioned<byte[]> maxTxIdData) {
-        this.zkc = zkc;
-        this.path = path;
-        this.enabled = enabled && null != maxTxIdData && null != maxTxIdData.getVersion()
-                && null != maxTxIdData.getValue();
-        if (this.enabled) {
+    MaxTxId(Versioned<byte[]> maxTxIdData) {
+        if (null != maxTxIdData
+                && null != maxTxIdData.getValue()
+                && null != maxTxIdData.getVersion()) {
+            this.version = maxTxIdData.getVersion();
             try {
                 this.currentMax = DLUtils.deserializeTransactionId(maxTxIdData.getValue());
             } catch (NumberFormatException e) {
-                LOG.warn("Invalid txn id stored in {}", path, e);
-                this.currentMax = 0L;
+                LOG.warn("Invalid txn id found in max txn id data", e); // no placeholder: the path is no longer known here
+                this.currentMax = DistributedLogConstants.INVALID_TXID; // unparsable value: fall back to INVALID_TXID
             }
         } else {
-            this.currentMax = -1L;
+            this.currentMax = DistributedLogConstants.INVALID_TXID;
+            if (null != maxTxIdData && null != maxTxIdData.getVersion()) {
+                this.version = maxTxIdData.getVersion();
+            } else {
+                throw new IllegalStateException("Invalid MaxTxId found - " + maxTxIdData);
+            }
         }
     }
 
-    String getZkPath() {
-        return path;
-    }
-
-    synchronized void setMaxTxId(long txId) {
-        if (enabled && this.currentMax < txId) {
+    synchronized void update(Version version, long txId) {
+        if (version.compare(this.version) == Version.Occurred.AFTER) {
+            this.version = version;
             this.currentMax = txId;
         }
     }
 
-    synchronized byte[] couldStore(long maxTxId) {
-        if (enabled && currentMax < maxTxId) {
-            return DLUtils.serializeTransactionId(maxTxId);
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Store the highest TxID encountered so far so that we
-     * can enforce the monotonically non-decreasing property
-     * This is best effort as this enforcement is only done
-     *
-     * @param maxTxId - the maximum transaction id seen so far
-     * @throws IOException
-     */
-    synchronized void store(long maxTxId) throws IOException {
-        if (enabled && currentMax < maxTxId) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Setting maxTxId to " + maxTxId);
-            }
-            String txidStr = Long.toString(maxTxId);
-            try {
-                zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1);
-                currentMax = maxTxId;
-            } catch (Exception e) {
-                LOG.error("Error writing new MaxTxId value {}", maxTxId, e);
-            }
-        }
-    }
-
     synchronized long get() {
         return currentMax;
     }
 
+    public synchronized Versioned<Long> getVersionedData(long txId) { // locked like update(): value and version read together
+        return new Versioned<Long>(Math.max(txId, currentMax), version);
+    }
+
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index f0d2797..1b831ea 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -23,12 +23,16 @@
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Transaction;
+import com.twitter.distributedlog.util.Transaction.OpListener;
 import com.twitter.distributedlog.zk.DefaultZKOp;
 import com.twitter.distributedlog.zk.ZKOp;
 import com.twitter.distributedlog.zk.ZKTransaction;
@@ -48,10 +52,8 @@
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -220,30 +222,28 @@
 
     @Override
     public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                                 String path,
+                                                 ZKLogMetadata logMetadata,
                                                  Versioned<Long> lssn,
                                                  Transaction.OpListener<Version> listener) {
         Version version = lssn.getVersion();
         assert(version instanceof ZkVersion);
-
         ZkVersion zkVersion = (ZkVersion) version;
         byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue());
-        Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion());
+        Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, zkVersion.getZnodeVersion());
         ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
         txn.addOp(zkOp);
     }
 
     @Override
     public void storeMaxTxnId(Transaction<Object> txn,
-                              String path,
+                              ZKLogMetadataForWriter logMetadata,
                               Versioned<Long> transactionId,
                               Transaction.OpListener<Version> listener) {
         Version version = transactionId.getVersion();
         assert(version instanceof ZkVersion);
-
         ZkVersion zkVersion = (ZkVersion) version;
         byte[] data = DLUtils.serializeTransactionId(transactionId.getValue());
-        Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion());
+        Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, zkVersion.getZnodeVersion());
         ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
         txn.addOp(zkOp);
     }
@@ -256,29 +256,66 @@
     }
 
     @Override
-    public void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
+    public void createLogSegment(Transaction<Object> txn,
+                                 LogSegmentMetadata segment,
+                                 OpListener<Void> listener) {
         byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
         Op createOp = Op.create(
                 segment.getZkPath(),
                 finalisedData,
                 zkc.getDefaultACL(),
                 CreateMode.PERSISTENT);
-        txn.addOp(DefaultZKOp.of(createOp));
+        txn.addOp(DefaultZKOp.of(createOp, listener));
     }
 
     @Override
-    public void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
+    public void deleteLogSegment(Transaction<Object> txn,
+                                 final LogSegmentMetadata segment,
+                                 final OpListener<Void> listener) {
         Op deleteOp = Op.delete(
                 segment.getZkPath(),
                 -1);
-        txn.addOp(DefaultZKOp.of(deleteOp));
+        logger.info("Delete segment : {}", segment);
+        txn.addOp(DefaultZKOp.of(deleteOp, new OpListener<Void>() {
+            @Override
+            public void onCommit(Void r) {
+                if (null != listener) {
+                    listener.onCommit(r);
+                }
+            }
+
+            @Override
+            public void onAbort(Throwable t) {
+                logger.info("Aborted transaction on deleting segment {}", segment);
+                KeeperException.Code kc;
+                if (t instanceof KeeperException) {
+                    kc = ((KeeperException) t).code();
+                } else if (t instanceof ZKException) {
+                    kc = ((ZKException) t).getKeeperExceptionCode();
+                } else {
+                    abortListener(t);
+                    return;
+                }
+                if (KeeperException.Code.NONODE == kc) {
+                    abortListener(new LogSegmentNotFoundException(segment.getZkPath()));
+                    return;
+                }
+                abortListener(t);
+            }
+
+            private void abortListener(Throwable t) {
+                if (null != listener) {
+                    listener.onAbort(t);
+                }
+            }
+        }));
     }
 
     @Override
     public void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
         byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
         Op updateOp = Op.setData(segment.getZkPath(), finalisedData, -1);
-        txn.addOp(DefaultZKOp.of(updateOp));
+        txn.addOp(DefaultZKOp.of(updateOp, null));
     }
 
     // reads
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
index 078c040..37beb16 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
@@ -29,6 +29,17 @@
     }
 
     /**
+     * Get the top stream path for a given log.
+     *
+     * @param uri namespace to store the log
+     * @param logName name of the log
+     * @return top stream path, i.e. {@code uri.getPath()} and {@code logName} joined by a single '/'
+     */
+    public static String getLogStreamPath(URI uri, String logName) {
+        return String.format("%s/%s", uri.getPath(), logName);
+    }
+
+    /**
      * Get the log root path for a given log.
      *
      * @param uri
@@ -59,14 +70,14 @@
     }
 
     protected static final int LAYOUT_VERSION = -1;
-    protected final static String LOGSEGMENTS_PATH = "/ledgers";
-    protected final static String VERSION_PATH = "/version";
+    public final static String LOGSEGMENTS_PATH = "/ledgers";
+    public final static String VERSION_PATH = "/version";
     // writer znodes
-    protected final static String MAX_TXID_PATH = "/maxtxid";
-    protected final static String LOCK_PATH = "/lock";
-    protected final static String ALLOCATION_PATH = "/allocation";
+    public final static String MAX_TXID_PATH = "/maxtxid";
+    public final static String LOCK_PATH = "/lock";
+    public final static String ALLOCATION_PATH = "/allocation";
     // reader znodes
-    protected final static String READ_LOCK_PATH = "/readLock";
+    public final static String READ_LOCK_PATH = "/readLock";
 
     protected final URI uri;
     protected final String logName;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
index 1de712f..9a1548c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
@@ -17,312 +17,15 @@
  */
 package com.twitter.distributedlog.impl.metadata;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.exceptions.LogExistsException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
-import java.io.File;
 import java.net.URI;
-import java.util.List;
 
 /**
  * Log Metadata for writer
  */
 public class ZKLogMetadataForWriter extends ZKLogMetadata {
 
-    static final Logger LOG = LoggerFactory.getLogger(ZKLogMetadataForWriter.class);
-
-    static class MetadataIndex {
-        static final int LOG_ROOT_PARENT = 0;
-        static final int LOG_ROOT = 1;
-        static final int MAX_TXID = 2;
-        static final int VERSION = 3;
-        static final int LOCK = 4;
-        static final int READ_LOCK = 5;
-        static final int LOGSEGMENTS = 6;
-        static final int ALLOCATION = 7;
-    }
-
-    static int bytesToInt(byte[] b) {
-        assert b.length >= 4;
-        return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
-    }
-
-    static byte[] intToBytes(int i) {
-        return new byte[]{
-            (byte) (i >> 24),
-            (byte) (i >> 16),
-            (byte) (i >> 8),
-            (byte) (i)};
-    }
-
-    static boolean pathExists(Versioned<byte[]> metadata) {
-        return null != metadata.getValue() && null != metadata.getVersion();
-    }
-
-    static void ensureMetadataExist(Versioned<byte[]> metadata) {
-        Preconditions.checkNotNull(metadata.getValue());
-        Preconditions.checkNotNull(metadata.getVersion());
-    }
-
-    public static Future<ZKLogMetadataForWriter> of(
-            final URI uri,
-            final String logName,
-            final String logIdentifier,
-            final ZooKeeper zk,
-            final List<ACL> acl,
-            final boolean ownAllocator,
-            final boolean createIfNotExists) {
-        final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier);
-        try {
-            PathUtils.validatePath(logRootPath);
-        } catch (IllegalArgumentException e) {
-            LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
-            return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
-        }
-
-        return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
-                .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
-                    @Override
-                    public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
-                        Promise<List<Versioned<byte[]>>> promise =
-                                new Promise<List<Versioned<byte[]>>>();
-                        createMissingMetadata(zk, logRootPath, metadatas, acl,
-                                ownAllocator, createIfNotExists, promise);
-                        return promise;
-                    }
-                }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() {
-                    @Override
-                    public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
-                        return processLogMetadatas(uri, logName, logIdentifier, metadatas, ownAllocator);
-                    }
-                });
-    }
-
-    static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
-                                                                 String logRootPath,
-                                                                 boolean ownAllocator) {
-        // Note re. persistent lock state initialization: the read lock persistent state (path) is
-        // initialized here but only used in the read handler. The reason is its more convenient and
-        // less error prone to manage all stream structure in one place.
-        final String logRootParentPath = new File(logRootPath).getParent();
-        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
-        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
-        final String lockPath = logRootPath + LOCK_PATH;
-        final String readLockPath = logRootPath + READ_LOCK_PATH;
-        final String versionPath = logRootPath + VERSION_PATH;
-        final String allocationPath = logRootPath + ALLOCATION_PATH;
-
-        int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
-        List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
-        checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
-        checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
-        checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
-        checkFutures.add(Utils.zkGetData(zk, versionPath, false));
-        checkFutures.add(Utils.zkGetData(zk, lockPath, false));
-        checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
-        checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
-        if (ownAllocator) {
-            checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
-        }
-
-        return Future.collect(checkFutures);
-    }
-
-    static void createMissingMetadata(final ZooKeeper zk,
-                                      final String logRootPath,
-                                      final List<Versioned<byte[]>> metadatas,
-                                      final List<ACL> acl,
-                                      final boolean ownAllocator,
-                                      final boolean createIfNotExists,
-                                      final Promise<List<Versioned<byte[]>>> promise) {
-        final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
-        final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
-        CreateMode createMode = CreateMode.PERSISTENT;
-
-        // log root parent path
-        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
-            pathsToCreate.add(null);
-        } else {
-            String logRootParentPath = new File(logRootPath).getParent();
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-
-        // log root path
-        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-
-        // max id
-        if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
-            pathsToCreate.add(zeroTxnIdData);
-            zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
-        }
-        // version
-        if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] versionData = intToBytes(LAYOUT_VERSION);
-            pathsToCreate.add(versionData);
-            zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
-        }
-        // lock path
-        if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-        // read lock path
-        if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-        // log segments path
-        if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
-                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
-            pathsToCreate.add(logSegmentsData);
-            zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
-        }
-        // allocation path
-        if (ownAllocator) {
-            if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
-                pathsToCreate.add(null);
-            } else {
-                pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-                zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
-                        DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-            }
-        }
-        if (zkOps.isEmpty()) {
-            // nothing missed
-            promise.setValue(metadatas);
-            return;
-        }
-        if (!createIfNotExists) {
-            promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
-            return;
-        }
-
-        zk.multi(zkOps, new AsyncCallback.MultiCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    List<Versioned<byte[]>> finalMetadatas =
-                            Lists.newArrayListWithExpectedSize(metadatas.size());
-                    for (int i = 0; i < pathsToCreate.size(); i++) {
-                        byte[] dataCreated = pathsToCreate.get(i);
-                        if (null == dataCreated) {
-                            finalMetadatas.add(metadatas.get(i));
-                        } else {
-                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
-                        }
-                    }
-                    promise.setValue(finalMetadatas);
-                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                    promise.setException(new LogExistsException("Someone just created log "
-                            + logRootPath));
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        StringBuilder builder = new StringBuilder();
-                        for (OpResult result : resultList) {
-                            if (result instanceof OpResult.ErrorResult) {
-                                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
-                                builder.append(errorResult.getErr()).append(",");
-                            } else {
-                                builder.append(0).append(",");
-                            }
-                        }
-                        String resultCodeList = builder.substring(0, builder.length() - 1);
-                        LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
-                    }
-
-                    promise.setException(new ZKException("Failed to create log " + logRootPath,
-                            KeeperException.Code.get(rc)));
-                }
-            }
-        }, null);
-    }
-
-    static ZKLogMetadataForWriter processLogMetadatas(URI uri,
-                                                      String logName,
-                                                      String logIdentifier,
-                                                      List<Versioned<byte[]>> metadatas,
-                                                      boolean ownAllocator)
-            throws UnexpectedException {
-        try {
-            // max id
-            Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
-            ensureMetadataExist(maxTxnIdData);
-            // version
-            Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
-            ensureMetadataExist(maxTxnIdData);
-            Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
-            // lock path
-            ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
-            // read lock path
-            ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
-            // max lssn
-            Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
-            ensureMetadataExist(maxLSSNData);
-            try {
-                DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
-            } catch (NumberFormatException nfe) {
-                throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
-            }
-            // allocation path
-            Versioned<byte[]>  allocationData;
-            if (ownAllocator) {
-                allocationData = metadatas.get(MetadataIndex.ALLOCATION);
-                ensureMetadataExist(allocationData);
-            } else {
-                allocationData = new Versioned<byte[]>(null, null);
-            }
-            return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
-                    maxLSSNData, maxTxnIdData, allocationData);
-        } catch (IllegalArgumentException iae) {
-            throw new UnexpectedException("Invalid log " + logName, iae);
-        } catch (NullPointerException npe) {
-            throw new UnexpectedException("Invalid log " + logName, npe);
-        }
-    }
-
     private final Versioned<byte[]> maxLSSNData;
     private final Versioned<byte[]> maxTxIdData;
     private final Versioned<byte[]> allocationData;
@@ -334,12 +37,12 @@
      * @param logName       name of the log
      * @param logIdentifier identifier of the log
      */
-    private ZKLogMetadataForWriter(URI uri,
-                                   String logName,
-                                   String logIdentifier,
-                                   Versioned<byte[]> maxLSSNData,
-                                   Versioned<byte[]> maxTxIdData,
-                                   Versioned<byte[]> allocationData) {
+    public ZKLogMetadataForWriter(URI uri,
+                                  String logName,
+                                  String logIdentifier,
+                                  Versioned<byte[]> maxLSSNData,
+                                  Versioned<byte[]> maxTxIdData,
+                                  Versioned<byte[]> allocationData) {
         super(uri, logName, logIdentifier);
         this.maxLSSNData = maxLSSNData;
         this.maxTxIdData = maxTxIdData;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
new file mode 100644
index 0000000..d89dddb
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
+import com.twitter.distributedlog.exceptions.LockCancelledException;
+import com.twitter.distributedlog.exceptions.LogExistsException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
+import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.lock.SessionLockFactory;
+import com.twitter.distributedlog.lock.ZKDistributedLock;
+import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.SchedulerUtils;
+import com.twitter.distributedlog.zk.LimitedPermitManager;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.PermitManager;
+import com.twitter.distributedlog.util.Transaction;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.distributedlog.zk.ZKTransaction;
+import com.twitter.util.ExceptionalFunction;
+import com.twitter.util.ExceptionalFunction0;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+
+/**
+ * ZooKeeper-based implementation of {@link LogStreamMetadataStore}.
+ */
+public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
+
+    private final static Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
+
+    private final String clientId;
+    private final DistributedLogConfiguration conf;
+    private final ZooKeeperClient zooKeeperClient;
+    private final OrderedScheduler scheduler;
+    private final StatsLogger statsLogger;
+    private final LogSegmentMetadataStore logSegmentStore;
+    private final LimitedPermitManager permitManager;
+    // lock
+    private SessionLockFactory lockFactory;
+    private OrderedScheduler lockStateExecutor;
+
+    public ZKLogStreamMetadataStore(String clientId,
+                                    DistributedLogConfiguration conf,
+                                    ZooKeeperClient zkc,
+                                    OrderedScheduler scheduler,
+                                    StatsLogger statsLogger) {
+        this.clientId = clientId;
+        this.conf = conf;
+        this.zooKeeperClient = zkc;
+        this.scheduler = scheduler;
+        this.statsLogger = statsLogger;
+        // create the log segment metadata store and the permit manager (used for log segment rolling)
+        this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler);
+        this.permitManager = new LimitedPermitManager(
+                conf.getLogSegmentRollingConcurrency(),
+                1, // NOTE(review): 1 + MINUTES presumably form a time period for the permit manager — confirm
+                TimeUnit.MINUTES,
+                scheduler);
+        this.zooKeeperClient.register(permitManager); // paired with unregister(permitManager) in close()
+    }
+
+    private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) { // lazy accessor for lockStateExecutor
+        if (createIfNull && null == lockStateExecutor) { // build at most once, and only when the caller asks for it
+            StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler");
+            lockStateExecutor = OrderedScheduler.newBuilder()
+                    .name("DLM-LockState")
+                    .corePoolSize(conf.getNumLockStateThreads())
+                    .statsLogger(lockStateStatsLogger)
+                    .perExecutorStatsLogger(lockStateStatsLogger)
+                    .traceTaskExecution(conf.getEnableTaskExecutionStats())
+                    .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
+                    .build();
+        }
+        return lockStateExecutor; // still null if createIfNull is false and no executor was built earlier
+    }
+
+    private synchronized SessionLockFactory getLockFactory(boolean createIfNull) { // lazy accessor for lockFactory
+        if (createIfNull && null == lockFactory) { // build at most once, and only when the caller asks for it
+            lockFactory = new ZKSessionLockFactory(
+                    zooKeeperClient,
+                    clientId,
+                    getLockStateExecutor(createIfNull), // createIfNull is true here, so the executor is created too
+                    conf.getZKNumRetries(),
+                    conf.getLockTimeoutMilliSeconds(),
+                    conf.getZKRetryBackoffStartMillis(),
+                    statsLogger);
+        }
+        return lockFactory; // still null if createIfNull is false and no factory was built earlier
+    }
+
+    @Override
+    public void close() throws IOException { // releases what the constructor and the lazy lock accessors created
+        this.zooKeeperClient.unregister(permitManager); // undo the register(permitManager) done in the constructor
+        this.permitManager.close();
+        this.logSegmentStore.close();
+        SchedulerUtils.shutdownScheduler(
+                getLockStateExecutor(false), // false: never build it just to shut it down; NOTE(review): may be null — confirm shutdownScheduler accepts null
+                conf.getSchedulerShutdownTimeoutMs(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public LogSegmentMetadataStore getLogSegmentMetadataStore() {
+        return logSegmentStore; // the ZKLogSegmentMetadataStore created in the constructor; closed by close()
+    }
+
+    @Override
+    public PermitManager getPermitManager() {
+        return this.permitManager; // the LimitedPermitManager created in the constructor (used for log segment rolling)
+    }
+
+    @Override
+    public Transaction<Object> newTransaction() {
+        return new ZKTransaction(zooKeeperClient); // a new transaction object per call, built on this store's ZooKeeper client
+    }
+
+    @Override
+    public Future<Void> logExists(URI uri, final String logName) { // succeeds with null if the log segments path exists
+        final String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath(
+                uri, logName, conf.getUnpartitionedStreamName());
+        final Promise<Void> promise = new Promise<Void>();
+        try {
+            final ZooKeeper zk = zooKeeperClient.get();
+            zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() { // sync first; exists() runs only after sync returns OK
+                @Override
+                public void processResult(int syncRc, String path, Object syncCtx) {
+                    if (KeeperException.Code.NONODE.intValue() == syncRc) {
+                        promise.setException(new LogNotFoundException(
+                                String.format("Log %s does not exist or has been deleted", logName)));
+                        return;
+                    } else if (KeeperException.Code.OK.intValue() != syncRc){
+                        promise.setException(new ZKException("Error on checking log existence for " + logName,
+                                KeeperException.create(KeeperException.Code.get(syncRc))));
+                        return;
+                    }
+                    zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() { // false: do not set a watch
+                        @Override
+                        public void processResult(int rc, String path, Object ctx, Stat stat) {
+                            if (KeeperException.Code.OK.intValue() == rc) {
+                                promise.setValue(null);
+                            } else if (KeeperException.Code.NONODE.intValue() == rc) {
+                                promise.setException(new LogNotFoundException(
+                                        String.format("Log %s does not exist or has been deleted", logName)));
+                            } else {
+                                promise.setException(new ZKException("Error on checking log existence for " + logName,
+                                        KeeperException.create(KeeperException.Code.get(rc))));
+                            }
+                        }
+                    }, null);
+                }
+            }, null);
+
+        } catch (InterruptedException ie) { // raised inside the try block before any callback can fire
+            LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
+            promise.setException(new DLInterruptedException("Interrupted while checking "
+                    + logSegmentsPath, ie));
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e); // connection failures are reported through the future, not thrown
+        }
+        return promise;
+    }
+
+    //
+    // Create Write Lock
+    //
+
+    @Override
+    public DistributedLock createWriteLock(ZKLogMetadataForWriter metadata) { // builds a lock object on the writer's lock path
+        return new ZKDistributedLock(
+                getLockStateExecutor(true), // true: create the lock-state executor on first use
+                getLockFactory(true), // true: create the session lock factory on first use
+                metadata.getLockPath(),
+                conf.getLockTimeoutMilliSeconds(),
+                statsLogger);
+    }
+
+    //
+    // Create Read Lock
+    //
+
+    private Future<Void> ensureReadLockPathExist(final ZKLogMetadata logMetadata, // creates readLockPath if it is missing
+                                                 final String readLockPath) {
+        final Promise<Void> promise = new Promise<Void>();
+        promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() { // interrupt => fail as cancelled
+            @Override
+            public BoxedUnit apply(Throwable t) {
+                FutureUtils.setException(promise, new LockCancelledException(readLockPath,
+                        "Could not ensure read lock path", t));
+                return null;
+            }
+        });
+        Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath()); // presumably: helper must not create the log root itself — confirm in Utils
+        Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
+                new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
+                new org.apache.zookeeper.AsyncCallback.StringCallback() {
+                    @Override
+                    public void processResult(final int rc, final String path, Object ctx, String name) {
+                        if (KeeperException.Code.NONODE.intValue() == rc) { // NONODE is reported as "log not found"
+                            FutureUtils.setException(promise, new LogNotFoundException(
+                                    String.format("Log %s does not exist or has been deleted",
+                                            logMetadata.getFullyQualifiedName())));
+                        } else if (KeeperException.Code.OK.intValue() == rc) {
+                            FutureUtils.setValue(promise, null);
+                            LOG.trace("Created path {}.", path);
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { // an existing path also counts as success
+                            FutureUtils.setValue(promise, null);
+                            LOG.trace("Path {} is already existed.", path);
+                        } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
+                            FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
+                        } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
+                            FutureUtils.setException(promise, new DLInterruptedException(path));
+                        } else { // any other result code is surfaced as the matching KeeperException
+                            FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
+                        }
+                    }
+                }, null);
+        return promise;
+    }
+
+    @Override
+    public Future<DistributedLock> createReadLock(final ZKLogMetadataForReader metadata, // path first, then the lock object
+                                                  Optional<String> readerId) {
+        final String readLockPath = metadata.getReadLockPath(readerId);
+        return ensureReadLockPathExist(metadata, readLockPath).flatMap( // the lock is built only if the path step succeeds
+                new ExceptionalFunction<Void, Future<DistributedLock>>() {
+            @Override
+            public Future<DistributedLock> applyE(Void value) throws Throwable {
+                // Unfortunately this has a blocking call which we should not execute on the
+                // ZK completion thread
+                return scheduler.apply(new ExceptionalFunction0<DistributedLock>() { // so run it on the store's scheduler
+                    @Override
+                    public DistributedLock applyE() throws Throwable {
+                        return new ZKDistributedLock(
+                            getLockStateExecutor(true), // true: create the lock-state executor on first use
+                            getLockFactory(true), // true: create the session lock factory on first use
+                            readLockPath,
+                            conf.getLockTimeoutMilliSeconds(),
+                            statsLogger.scope("read_lock"));
+                    }
+                });
+            }
+        });
+    }
+
+    //
+    // Create Log
+    //
+
+    /**
+     * Positions of the individual metadata entries inside the list produced by
+     * <code>checkLogMetadataPaths</code> and consumed by <code>createMissingMetadata</code>
+     * and <code>processLogMetadatas</code>. The entries are consecutive; the
+     * <code>ALLOCATION</code> slot is only populated when the log owns its allocator.
+     */
+    static class MetadataIndex {
+        static final int LOG_ROOT_PARENT = 0;
+        static final int LOG_ROOT = LOG_ROOT_PARENT + 1;
+        static final int MAX_TXID = LOG_ROOT + 1;
+        static final int VERSION = MAX_TXID + 1;
+        static final int LOCK = VERSION + 1;
+        static final int READ_LOCK = LOCK + 1;
+        static final int LOGSEGMENTS = READ_LOCK + 1;
+        static final int ALLOCATION = LOGSEGMENTS + 1;
+    }
+
+    /**
+     * Decode the first four bytes of <code>b</code> as a big-endian 32-bit integer
+     * (the inverse of <code>intToBytes</code>).
+     *
+     * @param b buffer holding at least four bytes
+     * @return the decoded integer
+     */
+    static int bytesToInt(byte[] b) {
+        assert b.length >= 4;
+        // A Java byte is signed: without the 0xff mask a byte >= 0x80 is sign-extended when it is
+        // promoted to int, and its leading 1-bits would overwrite the higher-order bytes.
+        // b[0] needs no mask because the shift by 24 pushes the extension bits out.
+        return b[0] << 24 | (b[1] & 0xff) << 16 | (b[2] & 0xff) << 8 | (b[3] & 0xff);
+    }
+
+    /**
+     * Encode <code>i</code> as four bytes, most significant byte first.
+     *
+     * @param i the integer to encode
+     * @return a new four-byte array holding the big-endian encoding of <code>i</code>
+     */
+    static byte[] intToBytes(int i) {
+        final byte[] encoded = new byte[4];
+        for (int pos = 0; pos < encoded.length; pos++) {
+            // position 0 takes bits 31..24, position 3 takes bits 7..0
+            encoded[pos] = (byte) (i >> (24 - 8 * pos));
+        }
+        return encoded;
+    }
+
+    /**
+     * Read every metadata path of a log stream and collect the results into one list.
+     *
+     * <p>The reads are issued in <code>MetadataIndex</code> order, so the resulting list can be
+     * indexed with those constants. The allocation path is only read when
+     * <code>ownAllocator</code> is set.
+     *
+     * @param zk zookeeper handle used to read the paths
+     * @param logRootPath root path of the log stream
+     * @param ownAllocator whether the allocation path is part of the layout
+     * @return future of the data read from each path, in <code>MetadataIndex</code> order
+     */
+    static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
+                                                                 String logRootPath,
+                                                                 boolean ownAllocator) {
+        // Note re. persistent lock state initialization: the read lock persistent state (path) is
+        // initialized here but only used in the read handler. The reason is its more convenient and
+        // less error prone to manage all stream structure in one place.
+        final String[] mandatoryPaths = {
+            new File(logRootPath).getParent(),  // MetadataIndex.LOG_ROOT_PARENT
+            logRootPath,                        // MetadataIndex.LOG_ROOT
+            logRootPath + MAX_TXID_PATH,        // MetadataIndex.MAX_TXID
+            logRootPath + VERSION_PATH,         // MetadataIndex.VERSION
+            logRootPath + LOCK_PATH,            // MetadataIndex.LOCK
+            logRootPath + READ_LOCK_PATH,       // MetadataIndex.READ_LOCK
+            logRootPath + LOGSEGMENTS_PATH      // MetadataIndex.LOGSEGMENTS
+        };
+
+        final int numPaths = ownAllocator ? mandatoryPaths.length + 1 : mandatoryPaths.length;
+        List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
+        for (String path : mandatoryPaths) {
+            checkFutures.add(Utils.zkGetData(zk, path, false));
+        }
+        if (ownAllocator) {
+            // MetadataIndex.ALLOCATION
+            checkFutures.add(Utils.zkGetData(zk, logRootPath + ALLOCATION_PATH, false));
+        }
+
+        return Future.collect(checkFutures);
+    }
+
+    /**
+     * Tell whether a metadata entry carries both a value and a version.
+     *
+     * @param metadata the entry read for a metadata path
+     * @return true only if neither the value nor the version is null
+     */
+    static boolean pathExists(Versioned<byte[]> metadata) {
+        if (null == metadata.getValue()) {
+            return false;
+        }
+        return null != metadata.getVersion();
+    }
+
+    /**
+     * Fail with a {@link NullPointerException} unless the metadata entry carries both a value
+     * and a version.
+     *
+     * @param metadata the entry read for a metadata path
+     */
+    static void ensureMetadataExist(Versioned<byte[]> metadata) {
+        final byte[] value = metadata.getValue();
+        Preconditions.checkNotNull(value);
+        final Object version = metadata.getVersion();
+        Preconditions.checkNotNull(version);
+    }
+
+    /**
+     * Create, in a single zookeeper multi operation, every metadata path of the log stream that
+     * the previous read found missing, and complete <code>promise</code> with the outcome.
+     *
+     * <ul>
+     * <li>nothing missing: the promise is completed with <code>metadatas</code> unchanged;</li>
+     * <li>something missing and <code>createIfNotExists</code> is false: the promise fails with
+     *     {@link LogNotFoundException};</li>
+     * <li>creation succeeded: the promise is completed with a list in which each created entry
+     *     holds the data just written, at version 0;</li>
+     * <li>creation hit an existing node: the promise fails with {@link LogExistsException};</li>
+     * <li>any other failure: the promise fails with {@link ZKException}.</li>
+     * </ul>
+     *
+     * @param zk zookeeper handle used to run the multi operation
+     * @param logRootPath root path of the log stream
+     * @param metadatas data read for each metadata path, indexed by <code>MetadataIndex</code>
+     * @param acl acl applied to the created paths
+     * @param ownAllocator whether the allocation path is part of the layout
+     * @param createIfNotExists whether missing paths may be created
+     * @param promise promise to complete with the final metadata list or the failure
+     */
+    static void createMissingMetadata(final ZooKeeper zk,
+                                      final String logRootPath,
+                                      final List<Versioned<byte[]>> metadatas,
+                                      final List<ACL> acl,
+                                      final boolean ownAllocator,
+                                      final boolean createIfNotExists,
+                                      final Promise<List<Versioned<byte[]>>> promise) {
+        // pathsToCreate is kept parallel to metadatas: null marks an existing path, a non-null
+        // entry is the data that is going to be written for a missing path.
+        final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
+        final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
+        CreateMode createMode = CreateMode.PERSISTENT;
+
+        // log root parent path
+        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
+            pathsToCreate.add(null);
+        } else {
+            String logRootParentPath = new File(logRootPath).getParent();
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+
+        // log root path
+        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
+            pathsToCreate.add(null);
+        } else {
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+
+        // max id
+        if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
+            pathsToCreate.add(null);
+        } else {
+            byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
+            pathsToCreate.add(zeroTxnIdData);
+            zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
+        }
+        // version
+        if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
+            pathsToCreate.add(null);
+        } else {
+            byte[] versionData = intToBytes(LAYOUT_VERSION);
+            pathsToCreate.add(versionData);
+            zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
+        }
+        // lock path
+        if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
+            pathsToCreate.add(null);
+        } else {
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+        // read lock path
+        if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
+            pathsToCreate.add(null);
+        } else {
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+        // log segments path
+        if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
+            pathsToCreate.add(null);
+        } else {
+            byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
+                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
+            pathsToCreate.add(logSegmentsData);
+            zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
+        }
+        // allocation path
+        if (ownAllocator) {
+            if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
+                pathsToCreate.add(null);
+            } else {
+                pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+                zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
+                        DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            }
+        }
+        if (zkOps.isEmpty()) {
+            // nothing missed
+            promise.setValue(metadatas);
+            return;
+        }
+        if (!createIfNotExists) {
+            promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
+            return;
+        }
+
+        zk.multi(zkOps, new AsyncCallback.MultiCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
+                if (KeeperException.Code.OK.intValue() == rc) {
+                    List<Versioned<byte[]>> finalMetadatas =
+                            Lists.newArrayListWithExpectedSize(metadatas.size());
+                    for (int i = 0; i < pathsToCreate.size(); i++) {
+                        byte[] dataCreated = pathsToCreate.get(i);
+                        if (null == dataCreated) {
+                            // the path was already there: keep what was read
+                            finalMetadatas.add(metadatas.get(i));
+                        } else {
+                            // the path was just created with this data
+                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
+                        }
+                    }
+                    promise.setValue(finalMetadatas);
+                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                    promise.setException(new LogExistsException("Someone just created log "
+                            + logRootPath));
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        // resultList can be null here (the zookeeper client passes no results when
+                        // the request failed as a whole, e.g. on connection loss); the formatting
+                        // must never throw, otherwise the promise below would never be completed.
+                        LOG.debug("Failed to create log, full rc list = {}", formatResultCodes(resultList));
+                    }
+
+                    promise.setException(new ZKException("Failed to create log " + logRootPath,
+                            KeeperException.Code.get(rc)));
+                }
+            }
+        }, null);
+    }
+
+    /**
+     * Render the per-operation result codes of a multi operation as a comma separated list.
+     *
+     * @param resultList results of the multi operation; may be null or empty
+     * @return the error code of each error result (0 for any other result), joined by commas;
+     *         an empty string when there are no results
+     */
+    private static String formatResultCodes(List<OpResult> resultList) {
+        if (null == resultList || resultList.isEmpty()) {
+            return "";
+        }
+        StringBuilder builder = new StringBuilder();
+        for (OpResult result : resultList) {
+            if (builder.length() > 0) {
+                builder.append(",");
+            }
+            if (result instanceof OpResult.ErrorResult) {
+                builder.append(((OpResult.ErrorResult) result).getErr());
+            } else {
+                builder.append(0);
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Validate the metadata entries read for a log stream and assemble the writer metadata.
+     *
+     * <p>Every required entry must carry a value and a version, the stored layout version must
+     * equal <code>LAYOUT_VERSION</code>, and the max log segment sequence number must be
+     * deserializable.
+     *
+     * @param uri the location of the log stream
+     * @param logName the name of the log stream
+     * @param logIdentifier the log identifier handed to the resulting metadata
+     * @param metadatas data of each metadata path, indexed by <code>MetadataIndex</code>
+     * @param ownAllocator whether the allocation entry is expected in <code>metadatas</code>
+     * @return the metadata used by the writer
+     * @throws UnexpectedException if any of the validations above fails
+     */
+    static ZKLogMetadataForWriter processLogMetadatas(URI uri,
+                                                      String logName,
+                                                      String logIdentifier,
+                                                      List<Versioned<byte[]>> metadatas,
+                                                      boolean ownAllocator)
+            throws UnexpectedException {
+        try {
+            // max id
+            Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
+            ensureMetadataExist(maxTxnIdData);
+            // version: validate the version entry itself before decoding it
+            Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
+            ensureMetadataExist(versionData);
+            Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
+            // lock path
+            ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
+            // read lock path
+            ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
+            // max lssn
+            Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
+            ensureMetadataExist(maxLSSNData);
+            try {
+                // only checks that the stored value can be parsed; the result is not needed here
+                DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
+            } catch (NumberFormatException nfe) {
+                throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
+            }
+            // allocation path
+            Versioned<byte[]>  allocationData;
+            if (ownAllocator) {
+                allocationData = metadatas.get(MetadataIndex.ALLOCATION);
+                ensureMetadataExist(allocationData);
+            } else {
+                // no allocation path in the layout: hand over an empty placeholder
+                allocationData = new Versioned<byte[]>(null, null);
+            }
+            return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
+                    maxLSSNData, maxTxnIdData, allocationData);
+        } catch (IllegalArgumentException iae) {
+            // raised by the layout version check
+            throw new UnexpectedException("Invalid log " + logName, iae);
+        } catch (NullPointerException npe) {
+            // raised by ensureMetadataExist when an entry has no value or no version
+            throw new UnexpectedException("Invalid log " + logName, npe);
+        }
+    }
+
+    /**
+     * Read the metadata of a log stream, creating the missing parts of its layout when allowed.
+     *
+     * <p>The log root path is validated first; the metadata paths are then read, missing ones
+     * are handled by <code>createMissingMetadata</code>, and the resulting entries are turned
+     * into writer metadata by <code>processLogMetadatas</code>.
+     *
+     * @param uri the location of the log stream
+     * @param logName the name of the log stream
+     * @param logIdentifier the log identifier used to build the log root path
+     * @param zooKeeperClient client providing the zookeeper handle and the default acl
+     * @param ownAllocator whether the allocation path is part of the layout
+     * @param createIfNotExists whether missing metadata paths may be created
+     * @return future of the writer metadata; a failed future if the log name is invalid or the
+     *         zookeeper handle cannot be obtained
+     */
+    static Future<ZKLogMetadataForWriter> getLog(final URI uri,
+                                                 final String logName,
+                                                 final String logIdentifier,
+                                                 final ZooKeeperClient zooKeeperClient,
+                                                 final boolean ownAllocator,
+                                                 final boolean createIfNotExists) {
+        final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier);
+        try {
+            PathUtils.validatePath(logRootPath);
+        } catch (IllegalArgumentException e) {
+            LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
+            return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
+        }
+
+        try {
+            final ZooKeeper zk = zooKeeperClient.get();
+
+            // step 1: read whatever is currently stored under each metadata path
+            final Future<List<Versioned<byte[]>>> probed =
+                    checkLogMetadataPaths(zk, logRootPath, ownAllocator);
+
+            // step 2: fill in (or refuse to fill in) the paths that turned out to be missing
+            final Future<List<Versioned<byte[]>>> completed = probed.flatMap(
+                    new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
+                        @Override
+                        public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
+                            Promise<List<Versioned<byte[]>>> result =
+                                    new Promise<List<Versioned<byte[]>>>();
+                            createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
+                                    ownAllocator, createIfNotExists, result);
+                            return result;
+                        }
+                    });
+
+            // step 3: validate the entries and build the writer metadata
+            return completed.map(
+                    new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() {
+                        @Override
+                        public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
+                            return processLogMetadatas(uri, logName, logIdentifier, metadatas, ownAllocator);
+                        }
+                    });
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
+                    KeeperException.Code.CONNECTIONLOSS));
+        } catch (InterruptedException e) {
+            return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to the static <code>getLog</code>, using the unpartitioned stream name from
+     * the configuration as the log identifier and this store's zookeeper client.
+     */
+    @Override
+    public Future<ZKLogMetadataForWriter> getLog(final URI uri,
+                                                 final String logName,
+                                                 final boolean ownAllocator,
+                                                 final boolean createIfNotExists) {
+        final String logIdentifier = conf.getUnpartitionedStreamName();
+        return getLog(uri, logName, logIdentifier, zooKeeperClient, ownAllocator, createIfNotExists);
+    }
+
+    //
+    // Delete Log
+    //
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Recursively deletes the stream path of the log. The returned future is completed from
+     * the zookeeper callback: with <code>null</code> when the result code is OK, with a
+     * {@link ZKException} otherwise. Failures raised while issuing the deletion are reported
+     * through the same future rather than thrown.
+     */
+    @Override
+    public Future<Void> deleteLog(URI uri, final String logName) {
+        final Promise<Void> promise = new Promise<Void>();
+        try {
+            String streamPath = ZKLogMetadata.getLogStreamPath(uri, logName);
+            ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    if (KeeperException.Code.OK.intValue() != rc) {
+                        FutureUtils.setException(promise,
+                                new ZKException("Encountered zookeeper issue on deleting log stream "
+                                        + logName, KeeperException.Code.get(rc)));
+                        return;
+                    }
+                    FutureUtils.setValue(promise, null);
+                }
+            }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+                    + logName, KeeperException.Code.CONNECTIONLOSS));
+        } catch (InterruptedException e) {
+            // keep the original interruption as the cause, as getLog does, so that the stack
+            // trace of the interrupted call is not lost
+            FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream "
+                    + logName, e));
+        } catch (KeeperException e) {
+            FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+                    + logName, e));
+        }
+        return promise;
+    }
+}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
index 2ea1671..5144634 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
@@ -20,6 +20,8 @@
 import com.google.common.annotations.Beta;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.util.Transaction;
 import com.twitter.distributedlog.util.Transaction.OpListener;
 import com.twitter.util.Future;
@@ -52,15 +54,15 @@
      *
      * @param txn
      *          transaction to execute for storing log segment sequence number.
-     * @param path
-     *          path to store sequence number
+     * @param logMetadata
+     *          metadata of the log stream
      * @param sequenceNumber
      *          log segment sequence number to store
      * @param listener
      *          listener on the result to this operation
      */
     void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                          String path,
+                                          ZKLogMetadata logMetadata,
                                           Versioned<Long> sequenceNumber,
                                           OpListener<Version> listener);
 
@@ -69,15 +71,15 @@
      *
      * @param txn
      *          transaction to execute for storing transaction id
-     * @param path
-     *          path to store sequence number
+     * @param logMetadata
+     *          metadata of the log stream
      * @param transactionId
      *          transaction id to store
      * @param listener
      *          listener on the result to this operation
      */
     void storeMaxTxnId(Transaction<Object> txn,
-                       String path,
+                       ZKLogMetadataForWriter logMetadata,
                        Versioned<Long> transactionId,
                        OpListener<Version> listener);
 
@@ -91,8 +93,12 @@
      *          transaction to execute for this operation
      * @param segment
      *          segment to create
+     * @param opListener
+     *          the listener on the operation result
      */
-    void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
+    void createLogSegment(Transaction<Object> txn,
+                          LogSegmentMetadata segment,
+                          OpListener<Void> opListener);
 
     /**
      * Delete a log segment <code>segment</code> under transaction <code>txn</code>.
@@ -105,7 +111,9 @@
      * @param segment
      *          segment to delete
      */
-    void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
+    void deleteLogSegment(Transaction<Object> txn,
+                          LogSegmentMetadata segment,
+                          OpListener<Void> opListener);
 
     /**
      * Update a log segment <code>segment</code> under transaction <code>txn</code>.
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
index baa3240..ac36ef2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
@@ -51,16 +51,15 @@
             new ConcurrentHashMap<URI, DLConfig>();
 
     public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
-        dlConf.setSanityCheckTxnID(bkdlConfig.getSanityCheckTxnID());
         dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
         dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
         if (bkdlConfig.isFederatedNamespace()) {
             dlConf.setCreateStreamIfNotExists(false);
             LOG.info("Disabled createIfNotExists for federated namespace.");
         }
-        LOG.info("Propagate BKDLConfig to DLConfig : sanityCheckTxnID = {}, encodeRegionID = {}," +
+        LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
                         " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
-                new Object[] { dlConf.getSanityCheckTxnID(), dlConf.getEncodeRegionIDInLogSegmentMetadata(),
+                new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
                         dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
                         bkdlConfig.isFederatedNamespace() });
     }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
index d205b3a..0e5e6d4 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
@@ -177,8 +177,8 @@
     protected void addNewSegmentAndDeleteOldSegment(Transaction<Object> txn,
                                                     LogSegmentMetadata newSegment,
                                                     LogSegmentMetadata oldSegment) {
-        metadataStore.deleteLogSegment(txn, oldSegment);
-        metadataStore.createLogSegment(txn, newSegment);
+        metadataStore.deleteLogSegment(txn, oldSegment, null);
+        metadataStore.createLogSegment(txn, newSegment, null);
     }
 
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
new file mode 100644
index 0000000..db7812e
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.metadata;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.util.PermitManager;
+import com.twitter.distributedlog.util.Transaction;
+import com.twitter.util.Future;
+
+import java.io.Closeable;
+import java.net.URI;
+
+/**
+ * The interface to manage the metadata of log streams. The implementation is responsible
+ * for creating the metadata layout.
+ */
+@Beta
+public interface LogStreamMetadataStore extends Closeable {
+
+    /**
+     * Create a transaction for the metadata operations happening in the metadata store.
+     *
+     * @return transaction for the metadata operations
+     */
+    Transaction<Object> newTransaction();
+
+    /**
+     * Check the existence of a log stream.
+     *
+     * @param uri the location of the log stream
+     * @param logName the name of the log stream
+     * @return future representing the existence of the log stream. {@link com.twitter.distributedlog.LogNotFoundException}
+     *         is thrown if the log doesn't exist
+     */
+    Future<Void> logExists(URI uri, String logName);
+
+    /**
+     * Create the read lock for the log stream.
+     *
+     * @param metadata the metadata for a log stream
+     * @param readerId the reader id used for the lock
+     * @return future of the read lock
+     */
+    Future<DistributedLock> createReadLock(ZKLogMetadataForReader metadata,
+                                           Optional<String> readerId);
+
+    /**
+     * Create the write lock for the log stream.
+     *
+     * @param metadata the metadata for a log stream
+     * @return the write lock
+     */
+    DistributedLock createWriteLock(ZKLogMetadataForWriter metadata);
+
+    /**
+     * Get the metadata of a log, optionally creating it if the log doesn't exist.
+     *
+     * @param uri the location where the metadata of the log is stored
+     * @param streamName the name of the log stream
+     * @param ownAllocator whether to use its own allocator or external allocator
+     * @param createIfNotExists flag to create the stream if it doesn't exist
+     * @return future of the metadata of the log
+     */
+    Future<ZKLogMetadataForWriter> getLog(URI uri,
+                                          String streamName,
+                                          boolean ownAllocator,
+                                          boolean createIfNotExists);
+
+    /**
+     * Delete the metadata of a log.
+     *
+     * @param uri the location where the metadata of the log is stored
+     * @param streamName the name of the log stream
+     * @return future representing the result of the deletion
+     */
+    Future<Void> deleteLog(URI uri, String streamName);
+
+    /**
+     * Get the log segment metadata store.
+     *
+     * @return the log segment metadata store
+     */
+    LogSegmentMetadataStore getLogSegmentMetadataStore();
+
+    /**
+     * Get the permit manager for this metadata store. It can be used for limiting the concurrent
+     * metadata operations. The implementation can stop handing out permits when the metadata
+     * store is unavailable (for example when the zookeeper session has expired).
+     *
+     * @return the permit manager
+     */
+    PermitManager getPermitManager();
+
+
+
+}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index bed2fcd..23d8e40 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -267,7 +267,8 @@
         protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException {
             DistributedLogNamespace namespace = getFactory().getNamespace();
             assert(namespace instanceof BKDistributedLogNamespace);
-            return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore();
+            return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+                    .getLogSegmentMetadataStore();
         }
 
         protected ZooKeeperClient getZooKeeperClient() throws IOException {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
index 7d76f29..78292e9 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
@@ -17,29 +17,39 @@
  */
 package com.twitter.distributedlog.zk;
 
+import com.twitter.distributedlog.util.Transaction.OpListener;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
 
+import javax.annotation.Nullable;
+
 /**
  * Default zookeeper operation. No action on commiting or aborting.
  */
 public class DefaultZKOp extends ZKOp {
 
-    public static DefaultZKOp of(Op op) {
-        return new DefaultZKOp(op);
+    public static DefaultZKOp of(Op op, OpListener<Void> listener) {
+        return new DefaultZKOp(op, listener);
     }
 
-    private DefaultZKOp(Op op) {
+    private final OpListener<Void> listener;
+
+    private DefaultZKOp(Op op, @Nullable OpListener<Void> opListener) {
         super(op);
+        this.listener = opListener;
     }
 
     @Override
     protected void commitOpResult(OpResult opResult) {
-        // no-op
+        if (null != listener) {
+            listener.onCommit(null);
+        }
     }
 
     @Override
     protected void abortOpResult(Throwable t, OpResult opResult) {
-        // no-op
+        if (null != listener) {
+            listener.onAbort(t);
+        }
     }
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
similarity index 98%
rename from distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
rename to distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
index dc25023..78ff0a2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
@@ -15,8 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.twitter.distributedlog.util;
+package com.twitter.distributedlog.zk;
 
+import com.twitter.distributedlog.util.PermitManager;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
index d885593..5b788e2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
@@ -33,7 +33,8 @@
 
     private final OpListener<Version> listener;
 
-    public ZKVersionedSetOp(Op op, OpListener<Version> opListener) {
+    public ZKVersionedSetOp(Op op,
+                            @Nullable OpListener<Version> opListener) {
         super(op);
         this.listener = opListener;
     }
@@ -42,7 +43,9 @@
     protected void commitOpResult(OpResult opResult) {
         assert(opResult instanceof OpResult.SetDataResult);
         OpResult.SetDataResult setDataResult = (OpResult.SetDataResult) opResult;
-        listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
+        if (null != listener) {
+            listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
+        }
     }
 
     @Override
@@ -60,7 +63,9 @@
                 cause = KeeperException.create(KeeperException.Code.get(errorResult.getErr()));
             }
         }
-        listener.onAbort(cause);
+        if (null != listener) {
+            listener.onAbort(cause);
+        }
     }
 
 }
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
index c588cd7..1485ae6 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
@@ -36,6 +36,7 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -429,7 +430,7 @@
                 .setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion))
                 .build();
         l.write(dlm.writerZKC);
-        writeHandler.maxTxId.store(startTxID);
+        writeHandler.maxTxId.update(Version.ANY, startTxID);
         writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
         BKLogSegmentWriter writer = new BKLogSegmentWriter(
                 writeHandler.getFullyQualifiedName(),
@@ -479,7 +480,7 @@
             .setInprogress(false)
             .build();
         l.write(dlm.writerZKC);
-        writeHandler.maxTxId.store(startTxID);
+        writeHandler.maxTxId.update(Version.ANY, startTxID);
         writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
         BKLogSegmentWriter writer = new BKLogSegmentWriter(
                 writeHandler.getFullyQualifiedName(),
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
index d45a727..6aa38c3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
@@ -21,6 +21,7 @@
 import java.net.URI;
 
 import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.twitter.distributedlog.util.FutureUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -203,7 +204,7 @@
         BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
 
         URI uri = createDLMURI("/" + name);
-        BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name);
+        FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
 
         // Log exists but is empty, better not throw.
         AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
@@ -264,7 +265,7 @@
         BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
 
         URI uri = createDLMURI("/" + name);
-        BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name);
+        FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
 
         // Log exists but is empty, better not throw.
         AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
index 6ad9950..c8a1c74 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
@@ -148,61 +148,6 @@
     }
 
     @Test(timeout = 60000)
-    public void testSanityCheckTxnID() throws Exception {
-        String name = "distrlog-sanity-check-txnid";
-        BKDistributedLogManager dlm = createNewDLM(conf, name);
-        BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
-        long txid = 1;
-        for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
-            LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-            out.write(op);
-        }
-        out.closeAndComplete();
-
-        BKSyncLogWriter out1 = dlm.startLogSegmentNonPartitioned();
-        LogRecord op1 = DLMTestUtil.getLogRecordInstance(1);
-        try {
-            out1.write(op1);
-            fail("Should fail writing lower txn id if sanityCheckTxnID is enabled.");
-        } catch (TransactionIdOutOfOrderException tioooe) {
-            // expected
-        }
-        out1.closeAndComplete();
-        dlm.close();
-
-        DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), false);
-        LOG.info("Disable sanity check txn id.");
-        BKDLConfig.clearCachedDLConfigs();
-
-        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
-        newConf.addConfiguration(conf);
-        BKDistributedLogManager newDLM = createNewDLM(newConf, name);
-        BKSyncLogWriter out2 = newDLM.startLogSegmentNonPartitioned();
-        LogRecord op2 = DLMTestUtil.getLogRecordInstance(1);
-        out2.write(op2);
-        out2.closeAndComplete();
-        newDLM.close();
-
-        DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), true);
-        LOG.info("Enable sanity check txn id.");
-        BKDLConfig.clearCachedDLConfigs();
-
-        DistributedLogConfiguration conf3 = new DistributedLogConfiguration();
-        conf3.addConfiguration(conf);
-        BKDistributedLogManager dlm3 = createNewDLM(newConf, name);
-        BKSyncLogWriter out3 = dlm3.startLogSegmentNonPartitioned();
-        LogRecord op3 = DLMTestUtil.getLogRecordInstance(1);
-        try {
-            out3.write(op3);
-            fail("Should fail writing lower txn id if sanityCheckTxnID is enabled.");
-        } catch (TransactionIdOutOfOrderException tioooe) {
-            // expected
-        }
-        out3.closeAndComplete();
-        dlm3.close();
-    }
-
-    @Test(timeout = 60000)
     public void testContinuousReaders() throws Exception {
         String name = "distrlog-continuous";
         BKDistributedLogManager dlm = createNewDLM(conf, name);
@@ -958,12 +903,9 @@
         final AtomicReference<Collection<LogSegmentMetadata>> receivedStreams =
                 new AtomicReference<Collection<LogSegmentMetadata>>();
 
-        DistributedLogManager dlm = createNewDLM(conf, name);
-        ZooKeeperClient zkClient = TestZooKeeperClientBuilder.newBuilder()
-                .uri(createDLMURI("/"))
-                .build();
+        BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
 
-        BKDistributedLogManager.createLog(conf, zkClient, ((BKDistributedLogManager) dlm).uri, name);
+        FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
         dlm.registerListener(new LogSegmentListener() {
             @Override
             public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
@@ -992,7 +934,6 @@
                 // no-op
             }
         });
-        LOG.info("Registered listener for stream {}.", name);
         long txid = 1;
         for (int i = 0; i < numSegments; i++) {
             LOG.info("Waiting for creating log segment {}.", i);
@@ -1018,6 +959,8 @@
             assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId());
             ++seqno;
         }
+
+        dlm.close();
     }
 
     @Test(timeout = 60000)
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
index e68b916..ecc20e0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
@@ -102,7 +102,7 @@
         dlm.close();
 
         // create the stream
-        BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, streamName);
+        namespace.createLog(streamName);
 
         DistributedLogManager newDLM = namespace.openLog(streamName);
         LogWriter newWriter = newDLM.startLogSegmentNonPartitioned();
@@ -273,9 +273,9 @@
             }
         });
         latches[0].await();
-        BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test1");
+        namespace.createLog("test1");
         latches[1].await();
-        BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test2");
+        namespace.createLog("test2");
         latches[2].await();
         assertEquals(0, numFailures.get());
         assertNotNull(receivedStreams.get());
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
index 604be0e..8a734b5 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
@@ -191,7 +191,8 @@
     protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogManagerFactory factory) {
         DistributedLogNamespace namespace = factory.getNamespace();
         assertTrue(namespace instanceof BKDistributedLogNamespace);
-        return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore();
+        return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+                .getLogSegmentMetadataStore();
     }
 
     @SuppressWarnings("deprecation")
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
index 54b1ab8..027b012 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
@@ -253,6 +253,7 @@
         confLocal.setOutputBufferSize(0);
         confLocal.setLogSegmentRollingIntervalMinutes(0);
         confLocal.setMaxLogSegmentBytes(1);
+        confLocal.setLogSegmentRollingConcurrency(Integer.MAX_VALUE);
 
         int numLogSegments = 10;
 
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
index 625742e..66b97be 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
@@ -140,7 +140,7 @@
         logger.info("Try obtaining ledger handle {}", lh.getId());
         byte[] data = zkc.get().getData(allocationPath, false, null);
         assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
-        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1)));
+        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
         try {
             FutureUtils.result(txn.execute());
             fail("Should fail the transaction when setting unexisted path");
@@ -337,7 +337,7 @@
         ZKTransaction txn = newTxn();
         // close during obtaining ledger.
         LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
-        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1)));
+        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
         try {
             FutureUtils.result(txn.execute());
             fail("Should fail the transaction when setting unexisted path");
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index d874274..2a8c83b 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -27,6 +27,8 @@
 import com.twitter.distributedlog.ZooKeeperClientUtils;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
@@ -57,6 +59,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 /**
  * Test ZK based log segment metadata store.
@@ -133,14 +136,14 @@
     public void testCreateLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
         LogSegmentMetadata segment2 = createLogSegment(1L);
         Transaction<Object> createTxn2 = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn2, segment2);
+        lsmStore.createLogSegment(createTxn2, segment2, null);
         try {
             FutureUtils.result(createTxn2.execute());
             fail("Should fail if log segment exists");
@@ -158,13 +161,13 @@
     public void testDeleteLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
         Transaction<Object> deleteTxn = lsmStore.transaction();
-        lsmStore.deleteLogSegment(deleteTxn, segment);
+        lsmStore.deleteLogSegment(deleteTxn, segment, null);
         FutureUtils.result(deleteTxn.execute());
         assertNull("LogSegment " + segment + " should be deleted",
                 zkc.get().exists(segment.getZkPath(), false));
@@ -174,7 +177,7 @@
     public void testDeleteNonExistentLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> deleteTxn = lsmStore.transaction();
-        lsmStore.deleteLogSegment(deleteTxn, segment);
+        lsmStore.deleteLogSegment(deleteTxn, segment, null);
         try {
             FutureUtils.result(deleteTxn.execute());
             fail("Should fail deletion if log segment doesn't exist");
@@ -208,7 +211,7 @@
     public void testUpdateLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L, 99L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
@@ -230,15 +233,15 @@
         LogSegmentMetadata segment2 = createLogSegment(2L);
         // create log segment 1
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment1);
+        lsmStore.createLogSegment(createTxn, segment1, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment1 + " should be created",
                 zkc.get().exists(segment1.getZkPath(), false));
         // delete log segment 1 and create log segment 2
         Transaction<Object> createDeleteTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createDeleteTxn, segment2);
-        lsmStore.deleteLogSegment(createDeleteTxn, segment1);
+        lsmStore.createLogSegment(createDeleteTxn, segment2, null);
+        lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
         FutureUtils.result(createDeleteTxn.execute());
         // segment 1 should be deleted, segment 2 should be created
         assertNull("LogSegment " + segment1 + " should be deleted",
@@ -254,16 +257,16 @@
         LogSegmentMetadata segment3 = createLogSegment(3L);
         // create log segment 1
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment1);
+        lsmStore.createLogSegment(createTxn, segment1, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment1 + " should be created",
                 zkc.get().exists(segment1.getZkPath(), false));
         // delete log segment 1 and delete log segment 2
         Transaction<Object> createDeleteTxn = lsmStore.transaction();
-        lsmStore.deleteLogSegment(createDeleteTxn, segment1);
-        lsmStore.deleteLogSegment(createDeleteTxn, segment2);
-        lsmStore.createLogSegment(createDeleteTxn, segment3);
+        lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
+        lsmStore.deleteLogSegment(createDeleteTxn, segment2, null);
+        lsmStore.createLogSegment(createDeleteTxn, segment3, null);
         try {
             FutureUtils.result(createDeleteTxn.execute());
             fail("Should fail transaction if one operation failed");
@@ -286,7 +289,7 @@
     public void testGetLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L, 99L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
@@ -304,7 +307,7 @@
         for (int i = 0; i < 10; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
             createdSegments.add(segment);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -353,7 +356,7 @@
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -394,7 +397,7 @@
         Transaction<Object> anotherCreateTxn = lsmStore.transaction();
         for (int i = numSegments; i < 2 * numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(anotherCreateTxn, segment);
+            lsmStore.createLogSegment(anotherCreateTxn, segment, null);
         }
         FutureUtils.result(anotherCreateTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -419,7 +422,7 @@
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -459,7 +462,7 @@
         Transaction<Object> deleteTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.deleteLogSegment(deleteTxn, segment);
+            lsmStore.deleteLogSegment(deleteTxn, segment, null);
         }
         FutureUtils.result(deleteTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -491,7 +494,7 @@
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -536,7 +539,7 @@
         Transaction<Object> anotherCreateTxn = lsmStore.transaction();
         for (int i = numSegments; i < 2 * numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(anotherCreateTxn, segment);
+            lsmStore.createLogSegment(anotherCreateTxn, segment, null);
         }
         FutureUtils.result(anotherCreateTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -561,7 +564,7 @@
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -602,7 +605,7 @@
         Transaction<Object> deleteTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.deleteLogSegment(deleteTxn, segment);
+            lsmStore.deleteLogSegment(deleteTxn, segment, null);
         }
         FutureUtils.result(deleteTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -634,7 +637,9 @@
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version r) {
@@ -659,7 +664,9 @@
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
@@ -695,7 +702,9 @@
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath);
+        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
@@ -726,7 +735,9 @@
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version r) {
@@ -751,7 +762,9 @@
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
@@ -787,7 +800,9 @@
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
+        lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
similarity index 93%
rename from distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
rename to distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index 648b828..9a08aa0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -52,14 +52,15 @@
 import java.util.List;
 
 import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
 import static org.junit.Assert.*;
 
 /**
- * Test {@link ZKLogMetadataForWriter}
+ * Test {@link ZKLogStreamMetadataStore}
  */
-public class TestZKLogMetadataForWriter extends ZooKeeperClusterTestCase {
+public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
 
-    private static final Logger logger = LoggerFactory.getLogger(TestZKLogMetadataForWriter.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestZKLogStreamMetadataStore.class);
 
     private final static int sessionTimeoutMs = 30000;
 
@@ -91,7 +92,7 @@
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(versionPath, ZKLogMetadataForWriter.intToBytes(LAYOUT_VERSION),
+        txn.create(versionPath, intToBytes(LAYOUT_VERSION),
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
@@ -127,7 +128,7 @@
     public void testCheckLogMetadataPathsWithAllocator() throws Exception {
         String logRootPath = "/" + testName.getMethodName();
         List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
+                FutureUtils.result(checkLogMetadataPaths(
                         zkc.get(), logRootPath, true));
         assertEquals("Should have 8 paths",
                 8, metadatas.size());
@@ -141,7 +142,7 @@
     public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
         String logRootPath = "/" + testName.getMethodName();
         List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
+                FutureUtils.result(checkLogMetadataPaths(
                         zkc.get(), logRootPath, false));
         assertEquals("Should have 7 paths",
                 7, metadatas.size());
@@ -167,13 +168,12 @@
         }
 
         ZKLogMetadataForWriter logMetadata =
-                FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
-                        zkc.get(), zkc.getDefaultACL(), ownAllocator, true));
+                FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true));
 
         final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
 
         List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
+                FutureUtils.result(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
 
         if (ownAllocator) {
             assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator,
@@ -184,7 +184,7 @@
         }
 
         for (Versioned<byte[]> metadata : metadatas) {
-            assertTrue(ZKLogMetadataForWriter.pathExists(metadata));
+            assertTrue(pathExists(metadata));
             assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0);
         }
 
@@ -300,8 +300,7 @@
     public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception {
         String logName = testName.getMethodName();
         String logIdentifier = "<default>";
-        FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
-                        zkc.get(), zkc.getDefaultACL(), true, false));
+        FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, true, false));
     }
 
     @Test(timeout = 60000)
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
similarity index 83%
rename from distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
rename to distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
index ce67c30..f14a217 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
@@ -28,9 +28,10 @@
 import java.net.URI;
 import java.util.List;
 
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
 import static org.junit.Assert.*;
 
-public class TestZKLogMetadataForWriterUtilFunctions {
+public class TestZKLogStreamMetadataStoreUtils {
 
     @SuppressWarnings("unchecked")
     @Test(timeout = 60000, expected = UnexpectedException.class)
@@ -43,7 +44,7 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
 
     @SuppressWarnings("unchecked")
@@ -58,7 +59,7 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
                 new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
 
     @SuppressWarnings("unchecked")
@@ -72,8 +73,8 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(9999), null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+                new Versioned<byte[]>(intToBytes(9999), null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
 
     @SuppressWarnings("unchecked")
@@ -87,9 +88,9 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
                 new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
 
     @SuppressWarnings("unchecked")
@@ -103,10 +104,10 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
 
     @SuppressWarnings("unchecked")
@@ -120,11 +121,11 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
     }
 
     @SuppressWarnings("unchecked")
@@ -138,12 +139,12 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)),
                 new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
     }
 
     @SuppressWarnings("unchecked")
@@ -161,12 +162,12 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 maxTxnIdData,
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 logSegmentsData);
         ZKLogMetadataForWriter metadata =
-                ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+                processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
         assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
         assertTrue(logSegmentsData == metadata.getMaxLSSNData());
         assertNull(metadata.getAllocationData().getValue());
@@ -190,15 +191,16 @@
                 new Versioned<byte[]>(null, null),
                 new Versioned<byte[]>(null, null),
                 maxTxnIdData,
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
                 logSegmentsData,
                 allocationData);
         ZKLogMetadataForWriter metadata =
-                ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
+                processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
         assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
         assertTrue(logSegmentsData == metadata.getMaxLSSNData());
         assertTrue(allocationData == metadata.getAllocationData());
     }
+
 }
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
index 8899c0e..db87a65 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.util;
 
+import com.twitter.distributedlog.zk.LimitedPermitManager;
 import org.junit.Test;
 
 import java.util.ArrayList;