ISSUE #211: Support listing logs by prefix
Descriptions of the changes in this PR:
- extend `getLogs` to `getLogs(prefix)`, so it provides a filesystem `listFiles`-like semantic.
Author: Sijie Guo <sijie@apache.org>
Reviewers: Jia Zhai <None>
This closes #212 from sijie/5_support_list_logs_by_prefix_pr, closes #211
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index b756feb..cd5a17a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -214,7 +214,14 @@
@Override
public Iterator<String> getLogs() throws IOException {
checkState();
- return Utils.ioResult(driver.getLogMetadataStore().getLogs());
+ return Utils.ioResult(driver.getLogMetadataStore().getLogs(""));
+ }
+
+ @Override
+ public Iterator<String> getLogs(String logNamePrefix) throws IOException {
+ checkState();
+ logNamePrefix = validateAndNormalizeName(logNamePrefix);
+ return Utils.ioResult(driver.getLogMetadataStore().getLogs(logNamePrefix));
}
@Override
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
index 8766f17..dafc099 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -163,6 +163,16 @@
Iterator<String> getLogs()
throws IOException;
+ /**
+ * Retrieve the logs under a given <i>logNamePrefix</i>.
+ *
+ * @param logNamePrefix log name prefix
+ * @return iterator of the logs under the log name prefix
+ * @throws IOException when encountered issues with backend.
+ */
+ Iterator<String> getLogs(String logNamePrefix)
+ throws IOException;
+
//
// Methods for namespace
//
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
index 43fed26..32e7ab7 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.callback.NamespaceListener;
@@ -70,9 +71,14 @@
}
@Override
- public CompletableFuture<Iterator<String>> getLogs() {
+ public CompletableFuture<Iterator<String>> getLogs(String logNamePrefix) {
final CompletableFuture<Iterator<String>> promise = new CompletableFuture<Iterator<String>>();
- final String nsRootPath = namespace.getPath();
+ final String nsRootPath;
+ if (StringUtils.isEmpty(logNamePrefix)) {
+ nsRootPath = namespace.getPath();
+ } else {
+ nsRootPath = namespace.getPath() + "/" + logNamePrefix;
+ }
try {
final ZooKeeper zk = zkc.get();
zk.sync(nsRootPath, new AsyncCallback.VoidCallback() {
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
index 5187dfc..84f5ac7 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
@@ -715,7 +715,12 @@
}
@Override
- public CompletableFuture<Iterator<String>> getLogs() {
+ public CompletableFuture<Iterator<String>> getLogs(String logNamePrefix) {
+ if (!"".equals(logNamePrefix)) {
+ return FutureUtils.exception(
+ new UnexpectedException("Get logs by prefix is not supported by federated metadata store"));
+ }
+
if (duplicatedLogFound.get()) {
return duplicatedLogException(duplicatedLogName.get());
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
index 8135678..62b75a4 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
@@ -51,9 +51,11 @@
/**
* Retrieves logs from the namespace.
*
+ * @param logNamePrefix
+ * log name prefix.
* @return iterator of logs of the namespace.
*/
- CompletableFuture<Iterator<String>> getLogs();
+ CompletableFuture<Iterator<String>> getLogs(String logNamePrefix);
/**
* Register a namespace listener on streams changes.
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
index 93e2802..9eac193 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog.impl;
import static org.junit.Assert.*;
+
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import java.net.URI;
@@ -36,8 +37,6 @@
import org.junit.Test;
import org.junit.rules.TestName;
-
-
/**
* Test ZK based metadata store.
*/
@@ -106,7 +105,20 @@
logs.add(logName);
createLogInNamespace(uri, logName);
}
- Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs()));
+ Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs("")));
+ assertEquals(10, result.size());
+ assertTrue(Sets.difference(logs, result).isEmpty());
+ }
+
+ @Test(timeout = 60000)
+ public void testGetLogsPrefix() throws Exception {
+ Set<String> logs = Sets.newHashSet();
+ for (int i = 0; i < 10; i++) {
+ String logName = "test-" + i;
+ logs.add(logName);
+ createLogInNamespace(uri, "test/" + logName);
+ }
+ Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs("test")));
assertEquals(10, result.size());
assertTrue(Sets.difference(logs, result).isEmpty());
}
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
index 190c9d9..2faf131 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
@@ -177,7 +177,7 @@
assertEquals(logName, logsIter.next());
assertFalse(logsIter.hasNext());
// get logs should return the log
- Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs());
+ Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs(""));
assertTrue(newLogsIter.hasNext());
assertEquals(logName, newLogsIter.next());
assertFalse(newLogsIter.hasNext());
@@ -274,7 +274,7 @@
assertTrue(metadataStore.duplicatedLogFound.get());
}
try {
- Utils.ioResult(metadataStore.getLogs());
+ Utils.ioResult(metadataStore.getLogs(""));
fail("should throw exception when duplicated log found");
} catch (UnexpectedException ue) {
// should throw unexpected exception
@@ -338,7 +338,7 @@
do {
TimeUnit.MILLISECONDS.sleep(20);
receivedLogs = new TreeSet<String>();
- Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
+ Iterator<String> logs = Utils.ioResult(metadataStore.getLogs(""));
receivedLogs.addAll(Lists.newArrayList(logs));
} while (receivedLogs.size() < numLogs);
assertEquals(numLogs, receivedLogs.size());
@@ -387,7 +387,7 @@
do {
TimeUnit.MILLISECONDS.sleep(20);
receivedLogs = new TreeSet<String>();
- Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
+ Iterator<String> logs = Utils.ioResult(metadataStore.getLogs(""));
receivedLogs.addAll(Lists.newArrayList(logs));
} while (receivedLogs.size() < 3 * maxLogsPerSubnamespace - 1);