Issue 225: Create log should create missing path components

Descriptions of the changes in this PR:

reuse the methods used by `rename` to create missing path components.

(the test is covered by #227)

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>

This closes #228 from sijie/fix_create_log_pr
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index b3250fa..c046fc6 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -35,7 +35,7 @@
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -363,6 +363,7 @@
     }
 
     static void createMissingMetadata(final ZooKeeper zk,
+                                      final String basePath,
                                       final String logRootPath,
                                       final List<Versioned<byte[]>> metadatas,
                                       final List<ACL> acl,
@@ -374,10 +375,10 @@
         CreateMode createMode = CreateMode.PERSISTENT;
 
         // log root parent path
+        String logRootParentPath = Utils.getParent(logRootPath);
         if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
             pathsToCreate.add(null);
         } else {
-            String logRootParentPath = Utils.getParent(logRootPath);
             pathsToCreate.add(EMPTY_BYTES);
             zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
         }
@@ -425,7 +426,7 @@
             pathsToCreate.add(null);
         } else {
             byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
-                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
+                DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
             pathsToCreate.add(logSegmentsData);
             zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
         }
@@ -436,7 +437,7 @@
             } else {
                 pathsToCreate.add(EMPTY_BYTES);
                 zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
-                        EMPTY_BYTES, acl, createMode));
+                    EMPTY_BYTES, acl, createMode));
             }
         }
         if (zkOps.isEmpty()) {
@@ -449,6 +450,41 @@
             return;
         }
 
+        getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
+            .whenComplete(new FutureEventListener<List<String>>() {
+                @Override
+                public void onSuccess(List<String> paths) {
+                    for (String path : paths) {
+                        pathsToCreate.add(EMPTY_BYTES);
+                        zkOps.add(
+                            0, Op.create(path, EMPTY_BYTES, acl, createMode));
+                    }
+                    executeCreateMissingPathTxn(
+                        zk,
+                        zkOps,
+                        pathsToCreate,
+                        metadatas,
+                        logRootPath,
+                        promise
+                    );
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    promise.completeExceptionally(cause);
+                    return;
+                }
+            });
+
+    }
+
+    private static void executeCreateMissingPathTxn(ZooKeeper zk,
+                                                    List<Op> zkOps,
+                                                    List<byte[]> pathsToCreate,
+                                                    List<Versioned<byte[]>> metadatas,
+                                                    String logRootPath,
+                                                    CompletableFuture<List<Versioned<byte[]>>> promise) {
+
         zk.multi(zkOps, new AsyncCallback.MultiCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
@@ -549,29 +585,30 @@
         try {
             final ZooKeeper zk = zooKeeperClient.get();
             return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
-                    .thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<List<Versioned<byte[]>>>>() {
-                        @Override
-                        public CompletableFuture<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
-                            CompletableFuture<List<Versioned<byte[]>>> promise =
-                                    new CompletableFuture<List<Versioned<byte[]>>>();
-                            createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
-                                    ownAllocator, createIfNotExists, promise);
-                            return promise;
-                        }
-                    }).thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<LogMetadataForWriter>>() {
-                        @Override
-                        public CompletableFuture<LogMetadataForWriter> apply(List<Versioned<byte[]>> metadatas) {
-                            try {
-                                return FutureUtils.value(
-                                    processLogMetadatas(
-                                        uri,
-                                        logName,
-                                        logIdentifier,
-                                        metadatas,
-                                        ownAllocator));
-                            } catch (UnexpectedException e) {
-                                return FutureUtils.exception(e);
-                            }
+                    .thenCompose(metadatas -> {
+                        CompletableFuture<List<Versioned<byte[]>>> promise =
+                                new CompletableFuture<List<Versioned<byte[]>>>();
+                        createMissingMetadata(
+                            zk,
+                            uri.getPath(),
+                            logRootPath,
+                            metadatas,
+                            zooKeeperClient.getDefaultACL(),
+                            ownAllocator,
+                            createIfNotExists,
+                            promise);
+                        return promise;
+                    }).thenCompose(metadatas -> {
+                        try {
+                            return FutureUtils.value(
+                                processLogMetadatas(
+                                    uri,
+                                    logName,
+                                    logIdentifier,
+                                    metadatas,
+                                    ownAllocator));
+                        } catch (UnexpectedException e) {
+                            return FutureUtils.exception(e);
                         }
                     });
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -749,16 +786,22 @@
 
     @VisibleForTesting
     static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
+        ZooKeeper zk;
+        try {
+            zk = zkc.get();
+        } catch (ZooKeeperConnectionException | InterruptedException e) {
+            return FutureUtils.exception(e);
+        }
         String basePath = uri.getPath();
         String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
-        LinkedList<String> missingPaths = Lists.newLinkedList();
+        return getMissingPaths(zk, basePath, logStreamPath);
+    }
 
+    @VisibleForTesting
+    static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
+        LinkedList<String> missingPaths = Lists.newLinkedList();
         CompletableFuture<List<String>> future = FutureUtils.createFuture();
-        try {
-            existPath(zkc.get(), logStreamPath, basePath, missingPaths, future);
-        } catch (ZooKeeperConnectionException | InterruptedException e) {
-            future.completeExceptionally(e);
-        }
+        existPath(zk, logStreamPath, basePath, missingPaths, future);
         return future;
     }