ISSUE #209: Support rename log

Descriptions of the changes in this PR:

- add rename operation in `Namespace`
- add rename operation in `LogStreamMetadataStore`
- implement the rename operation use zookeeper `multi` operation

Author: Sijie Guo <sijie@apache.org>
Author: Shoukun Huai <shoukunhuai@gmail.com>
Author: Arvin <arvindevel@gmail.com>

Reviewers: Jia Zhai <None>

This closes #210 from sijie/4_support_rename_pr, closes #209
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index cd5a17a..d2e2169 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -25,6 +25,7 @@
 import java.net.URI;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.feature.FeatureProvider;
@@ -33,6 +34,7 @@
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.common.util.PermitLimiter;
 import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -194,6 +196,28 @@
     }
 
     @Override
+    public CompletableFuture<Void> renameLog(String oldName, String newName) {
+        try {
+            checkState();
+            final String oldLogName = validateAndNormalizeName(oldName);
+            final String newLogName = validateAndNormalizeName(newName);
+
+            return driver.getLogMetadataStore().getLogLocation(oldName)
+                .thenCompose(uriOptional -> {
+                    if (uriOptional.isPresent()) {
+                        return driver.getLogStreamMetadataStore(WRITER)
+                            .renameLog(uriOptional.get(), oldLogName, newLogName);
+                    } else {
+                        return FutureUtils.exception(
+                            new LogNotFoundException("Log " + oldLogName + " isn't found."));
+                    }
+                });
+        } catch (IOException ioe) {
+            return FutureUtils.exception(ioe);
+        }
+    }
+
+    @Override
     public boolean logExists(String logName)
         throws IOException, IllegalArgumentException {
         checkState();
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
index dafc099..712295d 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -107,6 +108,19 @@
             throws InvalidStreamNameException, LogNotFoundException, IOException;
 
     /**
+     * Rename a log from <i>oldName</i> to <i>newName</i>.
+     *
+     * @param oldName old log name
+     * @param newName new log name
+     * @return a future represents the rename result.
+     * @throws InvalidStreamNameException if log name is invalid
+     * @throws LogNotFoundException if old log doesn't exist
+     * @throws org.apache.distributedlog.exceptions.LogExistsException if the new log exists
+     * @throws IOException when encountered issues with backend.
+     */
+    CompletableFuture<Void> renameLog(String oldName, String newName);
+
+    /**
      * Open a log named <i>logName</i>.
      * A distributedlog manager is returned to access log <i>logName</i>.
      *
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 3c55edc..b3250fa 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -17,14 +17,20 @@
  */
 package org.apache.distributedlog.impl.metadata;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
+import static org.apache.distributedlog.DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
 import static org.apache.distributedlog.metadata.LogMetadata.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -35,13 +41,16 @@
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.ZooKeeperClient.ZooKeeperConnectionException;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.LockCancelledException;
+import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.exceptions.LogExistsException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
@@ -65,7 +74,10 @@
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Op.Create;
+import org.apache.zookeeper.Op.Delete;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooKeeper;
@@ -366,16 +378,16 @@
             pathsToCreate.add(null);
         } else {
             String logRootParentPath = Utils.getParent(logRootPath);
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
         }
 
         // log root path
         if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
             pathsToCreate.add(null);
         } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath, EMPTY_BYTES, acl, createMode));
         }
 
         // max id
@@ -398,15 +410,15 @@
         if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
             pathsToCreate.add(null);
         } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + LOCK_PATH, EMPTY_BYTES, acl, createMode));
         }
         // read lock path
         if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
             pathsToCreate.add(null);
         } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, EMPTY_BYTES, acl, createMode));
         }
         // log segments path
         if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
@@ -422,9 +434,9 @@
             if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
                 pathsToCreate.add(null);
             } else {
-                pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+                pathsToCreate.add(EMPTY_BYTES);
                 zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
-                        DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+                        EMPTY_BYTES, acl, createMode));
             }
         }
         if (zkOps.isEmpty()) {
@@ -620,4 +632,274 @@
         }
         return promise;
     }
+
+    //
+    // Rename Log
+    //
+
+    @Override
+    public CompletableFuture<Void> renameLog(URI uri, String oldStreamName, String newStreamName) {
+        return getLog(
+            uri,
+            oldStreamName,
+            true,
+            false
+        ).thenCompose(metadata -> renameLogMetadata(uri, metadata, newStreamName));
+    }
+
+    private CompletableFuture<Void> renameLogMetadata(URI uri,
+                                                      LogMetadataForWriter oldMetadata,
+                                                      String newStreamName) {
+
+
+        final LinkedList<Op> createOps = Lists.newLinkedList();
+        final LinkedList<Op> deleteOps = Lists.newLinkedList();
+
+        List<ACL> acls = zooKeeperClient.getDefaultACL();
+
+        // get the root path
+        String oldRootPath = oldMetadata.getLogRootPath();
+        String newRootPath = LogMetadata.getLogRootPath(
+            uri, newStreamName, conf.getUnpartitionedStreamName());
+
+        // 0. the log path
+        deleteOps.addFirst(Op.delete(
+            LogMetadata.getLogStreamPath(uri, oldMetadata.getLogName()), -1));
+
+        // 1. the root path
+        createOps.addLast(Op.create(
+            newRootPath, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath, -1));
+
+        // 2. max id
+        Versioned<byte[]> maxTxIdData = oldMetadata.getMaxTxIdData();
+        deleteOldPathAndCreateNewPath(
+            oldRootPath, MAX_TXID_PATH, maxTxIdData,
+            newRootPath, DLUtils.serializeTransactionId(0L), acls,
+            createOps, deleteOps
+        );
+
+        // 3. version
+        createOps.addLast(Op.create(
+            newRootPath + VERSION_PATH, intToBytes(LAYOUT_VERSION), acls, CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath + VERSION_PATH, -1));
+
+        // 4. lock path (NOTE: if the stream is locked by a writer, then the delete will fail as you can not
+        //    delete the lock path if children is not empty.
+        createOps.addLast(Op.create(
+            newRootPath + LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath + LOCK_PATH, -1));
+
+        // 5. read lock path (NOTE: same reason as the write lock)
+        createOps.addLast(Op.create(
+            newRootPath + READ_LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath + READ_LOCK_PATH, -1));
+
+        // 6. allocation path
+        Versioned<byte[]> allocationData = oldMetadata.getAllocationData();
+        deleteOldPathAndCreateNewPath(
+            oldRootPath, ALLOCATION_PATH, allocationData,
+            newRootPath, EMPTY_BYTES, acls,
+            createOps, deleteOps);
+
+        // 7. log segments
+        Versioned<byte[]> maxLSSNData = oldMetadata.getMaxLSSNData();
+        deleteOldPathAndCreateNewPath(
+            oldRootPath, LOGSEGMENTS_PATH, maxLSSNData,
+            newRootPath, DLUtils.serializeLogSegmentSequenceNumber(UNASSIGNED_LOGSEGMENT_SEQNO), acls,
+            createOps, deleteOps);
+
+        // 8. copy the log segments
+        CompletableFuture<List<LogSegmentMetadata>> segmentsFuture;
+        if (pathExists(maxLSSNData)) {
+            segmentsFuture = getLogSegments(zooKeeperClient, oldRootPath + LOGSEGMENTS_PATH);
+        } else {
+            segmentsFuture = FutureUtils.value(Collections.emptyList());
+        }
+        return segmentsFuture
+            // copy the segments
+            .thenApply(segments -> {
+                for (LogSegmentMetadata segment : segments) {
+                    deleteOldSegmentAndCreateNewSegment(
+                        segment,
+                        newRootPath + LOGSEGMENTS_PATH,
+                        acls,
+                        createOps,
+                        deleteOps);
+                }
+                return null;
+            })
+            // get the missing paths
+            .thenCompose(ignored ->
+                getMissingPaths(zooKeeperClient, uri, newStreamName)
+            )
+            // create the missing paths and execute the rename transaction
+            .thenCompose(paths -> {
+                for (String path : paths) {
+                    createOps.addFirst(Op.create(
+                        path, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+                }
+                return executeRenameTxn(oldRootPath, newRootPath, createOps, deleteOps);
+            });
+    }
+
+    @VisibleForTesting
+    static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
+        String basePath = uri.getPath();
+        String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
+        LinkedList<String> missingPaths = Lists.newLinkedList();
+
+        CompletableFuture<List<String>> future = FutureUtils.createFuture();
+        try {
+            existPath(zkc.get(), logStreamPath, basePath, missingPaths, future);
+        } catch (ZooKeeperConnectionException | InterruptedException e) {
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
+    private static void existPath(ZooKeeper zk,
+                                  String path,
+                                  String basePath,
+                                  LinkedList<String> missingPaths,
+                                  CompletableFuture<List<String>> future) {
+        if (basePath.equals(path)) {
+            future.complete(missingPaths);
+            return;
+        }
+        zk.exists(path, false, (rc, path1, ctx, stat) -> {
+            if (Code.OK.intValue() != rc && Code.NONODE.intValue() != rc) {
+                future.completeExceptionally(new ZKException("Failed to check existence of path " + path1,
+                    Code.get(rc)));
+                return;
+            }
+
+            if (Code.OK.intValue() == rc) {
+                future.complete(missingPaths);
+                return;
+            }
+
+            missingPaths.addLast(path);
+            String parentPath = Utils.getParent(path);
+            existPath(zk, parentPath, basePath, missingPaths, future);
+        }, null);
+    }
+
+    private CompletableFuture<Void> executeRenameTxn(String oldLogPath,
+                                                     String newLogPath,
+                                                     LinkedList<Op> createOps,
+                                                     LinkedList<Op> deleteOps) {
+        CompletableFuture<Void> future = FutureUtils.createFuture();
+        List<Op> zkOps = Lists.newArrayListWithExpectedSize(createOps.size() + deleteOps.size());
+        zkOps.addAll(createOps);
+        zkOps.addAll(deleteOps);
+
+        if (LOG.isDebugEnabled()) {
+            for (Op op : zkOps) {
+                if (op instanceof Create) {
+                    Create create = (Create) op;
+                    LOG.debug("op : create {}", create.getPath());
+                } else if (op instanceof Delete) {
+                    Delete delete = (Delete) op;
+                    LOG.debug("op : delete {}, record = {}", delete.getPath(), op.toRequestRecord());
+                } else {
+                    LOG.debug("op : {}", op);
+                }
+            }
+        }
+
+        try {
+            zooKeeperClient.get().multi(zkOps, (rc, path, ctx, opResults) -> {
+                if (Code.OK.intValue() == rc) {
+                    future.complete(null);
+                } else if (Code.NODEEXISTS.intValue() == rc) {
+                    future.completeExceptionally(new LogExistsException("Someone just created new log " + newLogPath));
+                } else if (Code.NOTEMPTY.intValue() == rc) {
+                    future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
+                        "Someone is holding a lock on log " + oldLogPath));
+                } else {
+                    future.completeExceptionally(new ZKException("Failed to rename log "
+                        + oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
+                }
+            }, null);
+        } catch (ZooKeeperConnectionException e) {
+            future.completeExceptionally(e);
+        } catch (InterruptedException e) {
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
+    private static void deleteOldSegmentAndCreateNewSegment(LogSegmentMetadata oldMetadata,
+                                                            String newSegmentsPath,
+                                                            List<ACL> acls,
+                                                            LinkedList<Op> createOps,
+                                                            LinkedList<Op> deleteOps) {
+        createOps.addLast(Op.create(
+            newSegmentsPath + "/" + oldMetadata.getZNodeName(),
+            oldMetadata.getFinalisedData().getBytes(UTF_8),
+            acls,
+            CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldMetadata.getZkPath(),
+            -1));
+    }
+
+    private static void deleteOldPathAndCreateNewPath(String oldRootPath,
+                                                      String nodePath,
+                                                      Versioned<byte[]> pathData,
+                                                      String newRootPath,
+                                                      byte[] initData,
+                                                      List<ACL> acls,
+                                                      LinkedList<Op> createOps,
+                                                      LinkedList<Op> deleteOps) {
+        if (pathExists(pathData)) {
+            createOps.addLast(Op.create(
+                newRootPath + nodePath, pathData.getValue(), acls, CreateMode.PERSISTENT));
+            deleteOps.addFirst(Op.delete(
+                oldRootPath + nodePath, (int) ((LongVersion) pathData.getVersion()).getLongVersion()));
+        } else {
+            createOps.addLast(Op.create(
+                newRootPath + nodePath, initData, acls, CreateMode.PERSISTENT));
+        }
+    }
+
+    @VisibleForTesting
+    static CompletableFuture<List<LogSegmentMetadata>> getLogSegments(ZooKeeperClient zk,
+                                                                      String logSegmentsPath) {
+        CompletableFuture<List<LogSegmentMetadata>> future = FutureUtils.createFuture();
+        try {
+            zk.get().getChildren(logSegmentsPath, false, (rc, path, ctx, children, stat) -> {
+                if (Code.OK.intValue() != rc) {
+                    if (Code.NONODE.intValue() == rc) {
+                        future.completeExceptionally(new LogNotFoundException("Log " + path + " not found"));
+                    } else {
+                        future.completeExceptionally(new ZKException("Failed to get log segments from " + path,
+                            Code.get(rc)));
+                    }
+                    return;
+                }
+
+                // get all the segments
+                List<CompletableFuture<LogSegmentMetadata>> futures =
+                    Lists.newArrayListWithExpectedSize(children.size());
+                for (String child : children) {
+                    futures.add(LogSegmentMetadata.read(zk, logSegmentsPath + "/" + child));
+                }
+                FutureUtils.proxyTo(
+                    FutureUtils.collect(futures),
+                    future);
+            }, null);
+        } catch (ZooKeeperConnectionException e) {
+            future.completeExceptionally(e);
+        } catch (InterruptedException e) {
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
index 41dc500..712619a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
@@ -95,6 +95,18 @@
     CompletableFuture<Void> deleteLog(URI uri, String streamName);
 
     /**
+     * Rename the log from <i>oldStreamName</i> to <i>newStreamName</i>.
+     *
+     * @param uri the location to store the metadata of the log
+     * @param oldStreamName the old name of the log stream
+     * @param newStreamName the new name of the log stream
+     * @return future represents the result of the rename operation.
+     */
+    CompletableFuture<Void> renameLog(URI uri,
+                                      String oldStreamName,
+                                      String newStreamName);
+
+    /**
      * Get the log segment metadata store.
      *
      * @return the log segment metadata store.
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index f1cec9d..4aa832a 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -17,36 +17,60 @@
  */
 package org.apache.distributedlog.impl.metadata;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
 import static org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
 import static org.apache.distributedlog.metadata.LogMetadata.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import java.net.URI;
+import java.util.Collections;
 import java.util.List;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClusterTestCase;
 import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.LogExistsException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Transaction;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -69,6 +93,8 @@
 
     private ZooKeeperClient zkc;
     private URI uri;
+    private OrderedScheduler scheduler;
+    private ZKLogStreamMetadataStore metadataStore;
 
     private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
             throws Exception {
@@ -88,17 +114,68 @@
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES,
+        txn.create(lockPath, EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
+        txn.create(readLockPath, EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.create(versionPath, intToBytes(LAYOUT_VERSION),
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
+        txn.create(allocationPath, EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.commit();
     }
 
+    private static void createLog(ZooKeeperClient zk,
+                                  URI uri,
+                                  String logName,
+                                  String logIdentifier,
+                                  int numSegments)
+            throws Exception {
+        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
+        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
+        final String lockPath = logRootPath + LOCK_PATH;
+        final String readLockPath = logRootPath + READ_LOCK_PATH;
+        final String versionPath = logRootPath + VERSION_PATH;
+        final String allocationPath = logRootPath + ALLOCATION_PATH;
+
+        Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0],
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        Transaction txn = zk.get().transaction();
+        txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(
+                        DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(lockPath, EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(readLockPath, EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(versionPath, intToBytes(LAYOUT_VERSION),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(allocationPath, EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+
+        for (int i = 0; i < numSegments; i++) {
+            LogSegmentMetadata segment = DLMTestUtil.completedLogSegment(
+                logSegmentsPath,
+                i + 1L,
+                1L + i * 1000L,
+                (i + 1) * 1000L,
+                1000,
+                i + 1L,
+                999L,
+                0L);
+            txn.create(
+                segment.getZkPath(),
+                segment.getFinalisedData().getBytes(UTF_8),
+                zk.getDefaultACL(),
+                CreateMode.PERSISTENT);
+        }
+
+        txn.commit();
+    }
+
     @Before
     public void setup() throws Exception {
         zkc = TestZooKeeperClientBuilder.newBuilder()
@@ -117,10 +194,26 @@
         } catch (KeeperException.NodeExistsException nee) {
             logger.debug("The namespace uri already exists.");
         }
+        scheduler = OrderedScheduler.newBuilder()
+            .name("test-scheduler")
+            .corePoolSize(1)
+            .build();
+        metadataStore = new ZKLogStreamMetadataStore(
+            "test-logstream-metadata-store",
+            new DistributedLogConfiguration(),
+            zkc,
+            scheduler,
+            NullStatsLogger.INSTANCE);
     }
 
     @After
     public void teardown() throws Exception {
+        if (null != metadataStore) {
+            metadataStore.close();
+        }
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
         zkc.close();
     }
 
@@ -323,4 +416,167 @@
         testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
     }
 
+    @Test(timeout = 60000, expected = LogNotFoundException.class)
+    public void testGetLogSegmentsLogNotFound() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+
+        String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier);
+        FutureUtils.result(getLogSegments(zkc, logSegmentsPath));
+    }
+
+    @Test(timeout = 60000)
+    public void testGetLogSegmentsZKExceptions() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+
+        ZooKeeper mockZk = mock(ZooKeeper.class);
+        ZooKeeperClient mockZkc = mock(ZooKeeperClient.class);
+        when(mockZkc.get()).thenReturn(mockZk);
+        doAnswer(invocationOnMock -> {
+            String path = (String) invocationOnMock.getArguments()[0];
+            Children2Callback callback = (Children2Callback) invocationOnMock.getArguments()[2];
+            callback.processResult(Code.BADVERSION.intValue(), path, null, null, null);
+            return null;
+        }).when(mockZk).getChildren(anyString(), anyBoolean(), any(Children2Callback.class), anyObject());
+
+        String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier);
+        try {
+            FutureUtils.result(getLogSegments(mockZkc, logSegmentsPath));
+            fail("Should fail to get log segments when encountering zk exceptions");
+        } catch (ZKException zke) {
+            assertEquals(Code.BADVERSION, zke.getKeeperExceptionCode());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testGetLogSegments() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+
+        // create log
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            5);
+
+        List<LogSegmentMetadata> segments = FutureUtils.result(
+            getLogSegments(zkc, LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier)));
+        assertEquals(5, segments.size());
+        for (int i = 0; i < 5; i++) {
+            assertEquals(1L + i, segments.get(i).getLogSegmentSequenceNumber());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testGetMissingPathsRecursive() throws Exception {
+        List<String> missingPaths = FutureUtils.result(
+            getMissingPaths(zkc, uri, "path/to/log"));
+
+        assertEquals(
+            Lists.newArrayList(
+                uri.getPath() + "/path/to/log",
+                uri.getPath() + "/path/to",
+                uri.getPath() + "/path"
+            ),
+            missingPaths);
+    }
+
+    @Test(timeout = 60000)
+    public void testGetMissingPathsRecursive2() throws Exception {
+        String path = uri.getPath() + "/path/to/log";
+        ZkUtils.createFullPathOptimistic(
+            zkc.get(), path, EMPTY_BYTES, zkc.getDefaultACL(), CreateMode.PERSISTENT);
+
+        List<String> missingPaths = FutureUtils.result(
+            getMissingPaths(zkc, uri, "path/to/log"));
+
+        assertEquals(
+            Collections.emptyList(),
+            missingPaths);
+    }
+
+    @Test(timeout = 60000)
+    public void testGetMissingPathsFailure() throws Exception {
+        ZooKeeper mockZk = mock(ZooKeeper.class);
+        ZooKeeperClient mockZkc = mock(ZooKeeperClient.class);
+        when(mockZkc.get()).thenReturn(mockZk);
+        doAnswer(invocationOnMock -> {
+            String path = (String) invocationOnMock.getArguments()[0];
+            StatCallback callback = (StatCallback) invocationOnMock.getArguments()[2];
+            callback.processResult(Code.BADVERSION.intValue(), path, null, null);
+            return null;
+        }).when(mockZk).exists(anyString(), anyBoolean(), any(StatCallback.class), anyObject());
+
+        try {
+            FutureUtils.result(getMissingPaths(mockZkc, uri, "path/to/log"));
+            fail("Should fail on getting missing paths on zookeeper exceptions.");
+        } catch (ZKException zke) {
+            assertEquals(Code.BADVERSION, zke.getKeeperExceptionCode());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testRenameLog() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        int numSegments = 5;
+
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            numSegments);
+
+        String newLogName = "path/to/new/" + logName;
+        FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+    }
+
+    @Test(timeout = 60000, expected = LogExistsException.class)
+    public void testRenameLogExists() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        int numSegments = 5;
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            numSegments);
+
+        String newLogName = "path/to/new/" + logName;
+        createLog(
+            zkc,
+            uri,
+            newLogName,
+            logIdentifier,
+            3);
+
+        FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+    }
+
+    @Test(timeout = 60000, expected = LockingException.class)
+    public void testRenameLockedLog() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        int numSegments = 5;
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            numSegments);
+
+        // create a lock
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        String lockPath = logRootPath + LOCK_PATH;
+        zkc.get().create(lockPath + "/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+        String newLogName = "path/to/new/" + logName;
+        FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+    }
+
 }