DL-116: Add tool for deleting subscriber from subscription store

Test Plan:

1. manually create znode for subscribers resume point
2. use the tool to delete the subscriberId
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java
index 9905cea..27d5c1d 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java
@@ -56,4 +56,14 @@
      */
     public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition);
 
+    /**
+     * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the
+     * data stored under this subscriber will be lost.
+     * @param subscriberId subscriber id
+     * @return future represent success or failure.
+     * return true only if there's such subscriber and we removed it successfully.
+     * return false if there's no such subscriber, or we failed to remove.
+     */
+    public Future<Boolean> deleteSubscriber(String subscriberId);
+
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
index fb154c1..f1e6251 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
@@ -20,9 +20,12 @@
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.Promise;
+
+import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
@@ -59,7 +62,7 @@
         ZKSubscriptionStateStore ss = subscribers.get(subscriberId);
         if (ss == null) {
             ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc,
-                    String.format("%s/%s", zkPath, subscriberId));
+                getSubscriberZKPath(subscriberId));
             ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS);
             if (oldSS == null) {
                 ss = newSS;
@@ -75,6 +78,10 @@
         return ss;
     }
 
+    private String getSubscriberZKPath(String subscriberId) {
+        return String.format("%s/%s", zkPath, subscriberId);
+    }
+
     @Override
     public Future<DLSN> getLastCommitPosition(String subscriberId) {
         return getSubscriber(subscriberId).getLastCommitPosition();
@@ -141,6 +148,13 @@
     }
 
     @Override
+    public Future<Boolean> deleteSubscriber(String subscriberId) {
+        subscribers.remove(subscriberId);
+        String path = getSubscriberZKPath(subscriberId);
+        return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
+    }
+
+    @Override
     public void close() throws IOException {
         // no-op
         for (SubscriptionStateStore store : subscribers.values()) {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index 0862d54..bed2fcd 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -105,6 +105,8 @@
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.util.SchedulerUtils;
 import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -802,7 +804,7 @@
                 return 0;
             }
             numThreads = Math.min(streams.size(), numThreads);
-            final int numStreamsPerThreads = streams.size() / numThreads;
+            final int numStreamsPerThreads = streams.size() / numThreads + 1;
             Thread[] threads = new Thread[numThreads];
             for (int i = 0; i < numThreads; i++) {
                 final int tid = i;
@@ -2723,6 +2725,120 @@
         }
     }
 
+    public static class DeleteSubscriberCommand extends PerDLCommand {
+
+        int numThreads = 1;
+        String streamPrefix = null;
+        String subscriberId = null;
+        AtomicInteger streamIndex = new AtomicInteger();
+
+        DeleteSubscriberCommand() {
+            super("delete_subscriber", "Delete the subscriber in subscription store. ");
+            options.addOption("s", "subscriberId", true, "SubscriberId to remove from the stream");
+            options.addOption("t", "threads", true, "Number of threads");
+            options.addOption("ft", "filter", true, "Stream filter by prefix");
+        }
+
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            if (!cmdline.hasOption("s")) {
+                throw new ParseException("No subscriberId provided.");
+            } else {
+                subscriberId = cmdline.getOptionValue("s");
+            }
+            if (cmdline.hasOption("t")) {
+                numThreads = Integer.parseInt(cmdline.getOptionValue("t"));
+            }
+            if (cmdline.hasOption("ft")) {
+                streamPrefix = cmdline.getOptionValue("ft");
+            }
+        }
+
+        @Override
+        protected String getUsage() {
+            return "delete_subscriber [options]";
+        }
+
+        @Override
+        protected int runCmd() throws Exception {
+            getConf().setZkAclId(getZkAclId());
+            return deleteSubscriber(getFactory());
+        }
+
+        private int deleteSubscriber(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
+            Collection<String> streamCollection = factory.enumerateAllLogsInNamespace();
+            final List<String> streams = new ArrayList<String>();
+            if (null != streamPrefix) {
+                for (String s : streamCollection) {
+                    if (s.startsWith(streamPrefix)) {
+                        streams.add(s);
+                    }
+                }
+            } else {
+                streams.addAll(streamCollection);
+            }
+            if (0 == streams.size()) {
+                return 0;
+            }
+            System.out.println("Streams : " + streams);
+            if (!getForce() && !IOUtils.confirmPrompt("Do you want to delete subscriber "
+                + subscriberId + " for " + streams.size() + " streams ?")) {
+                return 0;
+            }
+            numThreads = Math.min(streams.size(), numThreads);
+            final int numStreamsPerThreads = streams.size() / numThreads + 1;
+            Thread[] threads = new Thread[numThreads];
+            for (int i = 0; i < numThreads; i++) {
+                final int tid = i;
+                threads[i] = new Thread("RemoveSubscriberThread-" + i) {
+                    @Override
+                    public void run() {
+                        try {
+                            deleteSubscriber(factory, streams, tid, numStreamsPerThreads);
+                            System.out.println("Thread " + tid + " finished.");
+                        } catch (Exception e) {
+                            System.err.println("Thread " + tid + " quits with exception : " + e.getMessage());
+                        }
+                    }
+                };
+                threads[i].start();
+            }
+            for (int i = 0; i < numThreads; i++) {
+                threads[i].join();
+            }
+            return 0;
+        }
+
+        private void deleteSubscriber(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams,
+                                      int tid, int numStreamsPerThreads) throws Exception {
+            int startIdx = tid * numStreamsPerThreads;
+            int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
+            for (int i = startIdx; i < endIdx; i++) {
+                final String s = streams.get(i);
+                DistributedLogManager dlm =
+                    factory.createDistributedLogManagerWithSharedClients(s);
+                final CountDownLatch countDownLatch = new CountDownLatch(1);
+                dlm.getSubscriptionsStore().deleteSubscriber(subscriberId)
+                    .addEventListener(new FutureEventListener<Boolean>() {
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            System.out.println("Failed to delete subscriber for stream " + s);
+                            cause.printStackTrace();
+                            countDownLatch.countDown();
+                        }
+
+                        @Override
+                        public void onSuccess(Boolean value) {
+                            countDownLatch.countDown();
+                        }
+                    });
+                countDownLatch.await();
+                dlm.close();
+            }
+        }
+    }
+
     public DistributedLogTool() {
         super();
         addCommand(new AuditBKSpaceCommand());
@@ -2748,6 +2864,7 @@
         addCommand(new DeserializeDLSNCommand());
         addCommand(new SerializeDLSNCommand());
         addCommand(new WatchNamespaceCommand());
+        addCommand(new DeleteSubscriberCommand());
     }
 
     @Override
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
index 0731117..fce9bcd 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
@@ -432,6 +432,44 @@
         return promise;
     }
 
+    /**
+     * Delete the given <i>path</i> from zookeeper.
+     *
+     * @param zkc
+     *          zookeeper client
+     * @param path
+     *          path to delete
+     * @param version
+     *          version used to set data
+     * @return future representing if the delete is successful. Return true if the node is deleted,
+     * false if the node doesn't exist, otherwise future will throw exception
+     *
+     */
+    public static Future<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) {
+        ZooKeeper zk;
+        try {
+            zk = zkc.get();
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            return Future.exception(FutureUtils.zkException(e, path));
+        } catch (InterruptedException e) {
+            return Future.exception(FutureUtils.zkException(e, path));
+        }
+        final Promise<Boolean> promise = new Promise<Boolean>();
+        zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (KeeperException.Code.OK.intValue() == rc ) {
+                    promise.setValue(true);
+                } else if (KeeperException.Code.NONODE.intValue() == rc) {
+                    promise.setValue(false);
+                } else {
+                    promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                }
+            }
+        }, null);
+        return promise;
+    }
+
     public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable,
                                           boolean swallowIOException) {
         if (null == closeable) {