DL-116: Add tool for deleting subscriber from subscription store
Test Plan:
1. manually create znode for subscribers resume point
2. use the tool to delete the subscriberId
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java
index 9905cea..27d5c1d 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java
@@ -56,4 +56,14 @@
*/
public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition);
+ /**
+ * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the
+ * data stored under this subscriber will be lost.
+ * @param subscriberId subscriber id
+ * @return future represent success or failure.
+ * return true only if there's such subscriber and we removed it successfully.
+ * return false if there's no such subscriber, or we failed to remove.
+ */
+ public Future<Boolean> deleteSubscriber(String subscriberId);
+
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
index fb154c1..f1e6251 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java
@@ -20,9 +20,12 @@
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.Promise;
+
+import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
@@ -59,7 +62,7 @@
ZKSubscriptionStateStore ss = subscribers.get(subscriberId);
if (ss == null) {
ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc,
- String.format("%s/%s", zkPath, subscriberId));
+ getSubscriberZKPath(subscriberId));
ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS);
if (oldSS == null) {
ss = newSS;
@@ -75,6 +78,10 @@
return ss;
}
+ private String getSubscriberZKPath(String subscriberId) {
+ return String.format("%s/%s", zkPath, subscriberId);
+ }
+
@Override
public Future<DLSN> getLastCommitPosition(String subscriberId) {
return getSubscriber(subscriberId).getLastCommitPosition();
@@ -141,6 +148,13 @@
}
@Override
+ public Future<Boolean> deleteSubscriber(String subscriberId) {
+ subscribers.remove(subscriberId);
+ String path = getSubscriberZKPath(subscriberId);
+ return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
+ }
+
+ @Override
public void close() throws IOException {
// no-op
for (SubscriptionStateStore store : subscribers.values()) {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index 0862d54..bed2fcd 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -105,6 +105,8 @@
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
import static com.google.common.base.Charsets.UTF_8;
@@ -802,7 +804,7 @@
return 0;
}
numThreads = Math.min(streams.size(), numThreads);
- final int numStreamsPerThreads = streams.size() / numThreads;
+ final int numStreamsPerThreads = streams.size() / numThreads + 1;
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
final int tid = i;
@@ -2723,6 +2725,120 @@
}
}
+ public static class DeleteSubscriberCommand extends PerDLCommand {
+
+ int numThreads = 1;
+ String streamPrefix = null;
+ String subscriberId = null;
+ AtomicInteger streamIndex = new AtomicInteger();
+
+ DeleteSubscriberCommand() {
+ super("delete_subscriber", "Delete the subscriber in subscription store. ");
+ options.addOption("s", "subscriberId", true, "SubscriberId to remove from the stream");
+ options.addOption("t", "threads", true, "Number of threads");
+ options.addOption("ft", "filter", true, "Stream filter by prefix");
+ }
+
+ @Override
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ super.parseCommandLine(cmdline);
+ if (!cmdline.hasOption("s")) {
+ throw new ParseException("No subscriberId provided.");
+ } else {
+ subscriberId = cmdline.getOptionValue("s");
+ }
+ if (cmdline.hasOption("t")) {
+ numThreads = Integer.parseInt(cmdline.getOptionValue("t"));
+ }
+ if (cmdline.hasOption("ft")) {
+ streamPrefix = cmdline.getOptionValue("ft");
+ }
+ }
+
+ @Override
+ protected String getUsage() {
+ return "delete_subscriber [options]";
+ }
+
+ @Override
+ protected int runCmd() throws Exception {
+ getConf().setZkAclId(getZkAclId());
+ return deleteSubscriber(getFactory());
+ }
+
+ private int deleteSubscriber(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception {
+ Collection<String> streamCollection = factory.enumerateAllLogsInNamespace();
+ final List<String> streams = new ArrayList<String>();
+ if (null != streamPrefix) {
+ for (String s : streamCollection) {
+ if (s.startsWith(streamPrefix)) {
+ streams.add(s);
+ }
+ }
+ } else {
+ streams.addAll(streamCollection);
+ }
+ if (0 == streams.size()) {
+ return 0;
+ }
+ System.out.println("Streams : " + streams);
+ if (!getForce() && !IOUtils.confirmPrompt("Do you want to delete subscriber "
+ + subscriberId + " for " + streams.size() + " streams ?")) {
+ return 0;
+ }
+ numThreads = Math.min(streams.size(), numThreads);
+ final int numStreamsPerThreads = streams.size() / numThreads + 1;
+ Thread[] threads = new Thread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ final int tid = i;
+ threads[i] = new Thread("RemoveSubscriberThread-" + i) {
+ @Override
+ public void run() {
+ try {
+ deleteSubscriber(factory, streams, tid, numStreamsPerThreads);
+ System.out.println("Thread " + tid + " finished.");
+ } catch (Exception e) {
+ System.err.println("Thread " + tid + " quits with exception : " + e.getMessage());
+ }
+ }
+ };
+ threads[i].start();
+ }
+ for (int i = 0; i < numThreads; i++) {
+ threads[i].join();
+ }
+ return 0;
+ }
+
+ private void deleteSubscriber(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams,
+ int tid, int numStreamsPerThreads) throws Exception {
+ int startIdx = tid * numStreamsPerThreads;
+ int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
+ for (int i = startIdx; i < endIdx; i++) {
+ final String s = streams.get(i);
+ DistributedLogManager dlm =
+ factory.createDistributedLogManagerWithSharedClients(s);
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ dlm.getSubscriptionsStore().deleteSubscriber(subscriberId)
+ .addEventListener(new FutureEventListener<Boolean>() {
+ @Override
+ public void onFailure(Throwable cause) {
+ System.out.println("Failed to delete subscriber for stream " + s);
+ cause.printStackTrace();
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onSuccess(Boolean value) {
+ countDownLatch.countDown();
+ }
+ });
+ countDownLatch.await();
+ dlm.close();
+ }
+ }
+ }
+
public DistributedLogTool() {
super();
addCommand(new AuditBKSpaceCommand());
@@ -2748,6 +2864,7 @@
addCommand(new DeserializeDLSNCommand());
addCommand(new SerializeDLSNCommand());
addCommand(new WatchNamespaceCommand());
+ addCommand(new DeleteSubscriberCommand());
}
@Override
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
index 0731117..fce9bcd 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
@@ -432,6 +432,44 @@
return promise;
}
+ /**
+ * Delete the given <i>path</i> from zookeeper.
+ *
+ * @param zkc
+ * zookeeper client
+ * @param path
+ * path to delete
+ * @param version
+ * version used to set data
+ * @return future representing if the delete is successful. Return true if the node is deleted,
+ * false if the node doesn't exist, otherwise future will throw exception
+ *
+ */
+ public static Future<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) {
+ ZooKeeper zk;
+ try {
+ zk = zkc.get();
+ } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+ return Future.exception(FutureUtils.zkException(e, path));
+ } catch (InterruptedException e) {
+ return Future.exception(FutureUtils.zkException(e, path));
+ }
+ final Promise<Boolean> promise = new Promise<Boolean>();
+ zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (KeeperException.Code.OK.intValue() == rc ) {
+ promise.setValue(true);
+ } else if (KeeperException.Code.NONODE.intValue() == rc) {
+ promise.setValue(false);
+ } else {
+ promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+ }
+ }
+ }, null);
+ return promise;
+ }
+
public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable,
boolean swallowIOException) {
if (null == closeable) {