Issue 225: Create log should create missing path components
Descriptions of the changes in this PR:
reuse the methods used by `rename` to create missing path components.
(the test is covered by #227)
Author: Sijie Guo <sijie@apache.org>
Reviewers: Jia Zhai <None>
This closes #228 from sijie/fix_create_log_pr
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index b3250fa..c046fc6 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -35,7 +35,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
@@ -363,6 +363,7 @@
}
static void createMissingMetadata(final ZooKeeper zk,
+ final String basePath,
final String logRootPath,
final List<Versioned<byte[]>> metadatas,
final List<ACL> acl,
@@ -374,10 +375,10 @@
CreateMode createMode = CreateMode.PERSISTENT;
// log root parent path
+ String logRootParentPath = Utils.getParent(logRootPath);
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
pathsToCreate.add(null);
} else {
- String logRootParentPath = Utils.getParent(logRootPath);
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
}
@@ -425,7 +426,7 @@
pathsToCreate.add(null);
} else {
byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
- DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
+ DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
pathsToCreate.add(logSegmentsData);
zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
}
@@ -436,7 +437,7 @@
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
- EMPTY_BYTES, acl, createMode));
+ EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
@@ -449,6 +450,41 @@
return;
}
+ getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
+ .whenComplete(new FutureEventListener<List<String>>() {
+ @Override
+ public void onSuccess(List<String> paths) {
+ for (String path : paths) {
+ pathsToCreate.add(EMPTY_BYTES);
+ zkOps.add(
+ 0, Op.create(path, EMPTY_BYTES, acl, createMode));
+ }
+ executeCreateMissingPathTxn(
+ zk,
+ zkOps,
+ pathsToCreate,
+ metadatas,
+ logRootPath,
+ promise
+ );
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ promise.completeExceptionally(cause);
+ return;
+ }
+ });
+
+ }
+
+ private static void executeCreateMissingPathTxn(ZooKeeper zk,
+ List<Op> zkOps,
+ List<byte[]> pathsToCreate,
+ List<Versioned<byte[]>> metadatas,
+ String logRootPath,
+ CompletableFuture<List<Versioned<byte[]>>> promise) {
+
zk.multi(zkOps, new AsyncCallback.MultiCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
@@ -549,29 +585,30 @@
try {
final ZooKeeper zk = zooKeeperClient.get();
return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
- .thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<List<Versioned<byte[]>>>>() {
- @Override
- public CompletableFuture<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
- CompletableFuture<List<Versioned<byte[]>>> promise =
- new CompletableFuture<List<Versioned<byte[]>>>();
- createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
- ownAllocator, createIfNotExists, promise);
- return promise;
- }
- }).thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<LogMetadataForWriter>>() {
- @Override
- public CompletableFuture<LogMetadataForWriter> apply(List<Versioned<byte[]>> metadatas) {
- try {
- return FutureUtils.value(
- processLogMetadatas(
- uri,
- logName,
- logIdentifier,
- metadatas,
- ownAllocator));
- } catch (UnexpectedException e) {
- return FutureUtils.exception(e);
- }
+ .thenCompose(metadatas -> {
+ CompletableFuture<List<Versioned<byte[]>>> promise =
+ new CompletableFuture<List<Versioned<byte[]>>>();
+ createMissingMetadata(
+ zk,
+ uri.getPath(),
+ logRootPath,
+ metadatas,
+ zooKeeperClient.getDefaultACL(),
+ ownAllocator,
+ createIfNotExists,
+ promise);
+ return promise;
+ }).thenCompose(metadatas -> {
+ try {
+ return FutureUtils.value(
+ processLogMetadatas(
+ uri,
+ logName,
+ logIdentifier,
+ metadatas,
+ ownAllocator));
+ } catch (UnexpectedException e) {
+ return FutureUtils.exception(e);
}
});
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -749,16 +786,22 @@
@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
+ ZooKeeper zk;
+ try {
+ zk = zkc.get();
+ } catch (ZooKeeperConnectionException | InterruptedException e) {
+ return FutureUtils.exception(e);
+ }
String basePath = uri.getPath();
String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
- LinkedList<String> missingPaths = Lists.newLinkedList();
+ return getMissingPaths(zk, basePath, logStreamPath);
+ }
+ @VisibleForTesting
+ static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
+ LinkedList<String> missingPaths = Lists.newLinkedList();
CompletableFuture<List<String>> future = FutureUtils.createFuture();
- try {
- existPath(zkc.get(), logStreamPath, basePath, missingPaths, future);
- } catch (ZooKeeperConnectionException | InterruptedException e) {
- future.completeExceptionally(e);
- }
+ existPath(zk, logStreamPath, basePath, missingPaths, future);
return future;
}