ISSUE #211: Support listing logs by prefix

Descriptions of the changes in this PR:

- extend `getLogs` to `getLogs(prefix)`, so it provides a filesystem `listFiles`-like semantic.

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>

This closes #212 from sijie/5_support_list_logs_by_prefix_pr, closes #211
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index b756feb..cd5a17a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -214,7 +214,14 @@
     @Override
     public Iterator<String> getLogs() throws IOException {
         checkState();
-        return Utils.ioResult(driver.getLogMetadataStore().getLogs());
+        return Utils.ioResult(driver.getLogMetadataStore().getLogs(""));
+    }
+
+    @Override
+    public Iterator<String> getLogs(String logNamePrefix) throws IOException {
+        checkState();
+        logNamePrefix = validateAndNormalizeName(logNamePrefix);
+        return Utils.ioResult(driver.getLogMetadataStore().getLogs(logNamePrefix));
     }
 
     @Override
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
index 8766f17..dafc099 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -163,6 +163,16 @@
     Iterator<String> getLogs()
             throws IOException;
 
+    /**
+     * Retrieve the logs under a given <i>logNamePrefix</i>.
+     *
+     * @param logNamePrefix log name prefix
+     * @return iterator of the logs under the log name prefix
+     * @throws IOException when encountered issues with backend.
+     */
+    Iterator<String> getLogs(String logNamePrefix)
+            throws IOException;
+
     //
     // Methods for namespace
     //
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
index 43fed26..32e7ab7 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang.StringUtils;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.callback.NamespaceListener;
@@ -70,9 +71,14 @@
     }
 
     @Override
-    public CompletableFuture<Iterator<String>> getLogs() {
+    public CompletableFuture<Iterator<String>> getLogs(String logNamePrefix) {
         final CompletableFuture<Iterator<String>> promise = new CompletableFuture<Iterator<String>>();
-        final String nsRootPath = namespace.getPath();
+        final String nsRootPath;
+        if (StringUtils.isEmpty(logNamePrefix)) {
+            nsRootPath = namespace.getPath();
+        } else {
+            nsRootPath = namespace.getPath() + "/" + logNamePrefix;
+        }
         try {
             final ZooKeeper zk = zkc.get();
             zk.sync(nsRootPath, new AsyncCallback.VoidCallback() {
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
index 5187dfc..84f5ac7 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
@@ -715,7 +715,12 @@
     }
 
     @Override
-    public CompletableFuture<Iterator<String>> getLogs() {
+    public CompletableFuture<Iterator<String>> getLogs(String logNamePrefix) {
+        if (!"".equals(logNamePrefix)) {
+            return FutureUtils.exception(
+                new UnexpectedException("Get logs by prefix is not supported by federated metadata store"));
+        }
+
         if (duplicatedLogFound.get()) {
             return duplicatedLogException(duplicatedLogName.get());
         }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
index 8135678..62b75a4 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
@@ -51,9 +51,11 @@
     /**
      * Retrieves logs from the namespace.
      *
+     * @param logNamePrefix
+     *          log name prefix.
      * @return iterator of logs of the namespace.
      */
-    CompletableFuture<Iterator<String>> getLogs();
+    CompletableFuture<Iterator<String>> getLogs(String logNamePrefix);
 
     /**
      * Register a namespace listener on streams changes.
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
index 93e2802..9eac193 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog.impl;
 
 import static org.junit.Assert.*;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import java.net.URI;
@@ -36,8 +37,6 @@
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-
-
 /**
  * Test ZK based metadata store.
  */
@@ -106,7 +105,20 @@
             logs.add(logName);
             createLogInNamespace(uri, logName);
         }
-        Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs()));
+        Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs("")));
+        assertEquals(10, result.size());
+        assertTrue(Sets.difference(logs, result).isEmpty());
+    }
+
+    @Test(timeout = 60000)
+    public void testGetLogsPrefix() throws Exception {
+        Set<String> logs = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            String logName = "test-" + i;
+            logs.add(logName);
+            createLogInNamespace(uri, "test/" + logName);
+        }
+        Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs("test")));
         assertEquals(10, result.size());
         assertTrue(Sets.difference(logs, result).isEmpty());
     }
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
index 190c9d9..2faf131 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
@@ -177,7 +177,7 @@
         assertEquals(logName, logsIter.next());
         assertFalse(logsIter.hasNext());
         // get logs should return the log
-        Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs());
+        Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs(""));
         assertTrue(newLogsIter.hasNext());
         assertEquals(logName, newLogsIter.next());
         assertFalse(newLogsIter.hasNext());
@@ -274,7 +274,7 @@
             assertTrue(metadataStore.duplicatedLogFound.get());
         }
         try {
-            Utils.ioResult(metadataStore.getLogs());
+            Utils.ioResult(metadataStore.getLogs(""));
             fail("should throw exception when duplicated log found");
         } catch (UnexpectedException ue) {
             // should throw unexpected exception
@@ -338,7 +338,7 @@
         do {
             TimeUnit.MILLISECONDS.sleep(20);
             receivedLogs = new TreeSet<String>();
-            Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
+            Iterator<String> logs = Utils.ioResult(metadataStore.getLogs(""));
             receivedLogs.addAll(Lists.newArrayList(logs));
         } while (receivedLogs.size() < numLogs);
         assertEquals(numLogs, receivedLogs.size());
@@ -387,7 +387,7 @@
         do {
             TimeUnit.MILLISECONDS.sleep(20);
             receivedLogs = new TreeSet<String>();
-            Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
+            Iterator<String> logs = Utils.ioResult(metadataStore.getLogs(""));
             receivedLogs.addAll(Lists.newArrayList(logs));
         } while (receivedLogs.size() < 3 * maxLogsPerSubnamespace - 1);