DL-206: Delete the log should also delete the underline ledgers

Problem:
We're not deleting the ledgers when we delete the whole log stream using dlm.delete() API. This would cause a lot of garbage/orphan ledgers in Bookkeeper.

The fix is to delete the ledger when we delete the log stream. Also added a test to validate.

Author: Yiming Zang <yzang@twitter.com>

Reviewers: Sijie Guo <sijie@apache.org>

This closes #152 from yzang/yzang/DL-206
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index cffe500..c837ad2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -950,6 +950,10 @@
      */
     @Override
     public void delete() throws IOException {
+        // delete the actual log stream and log segments
+        BKLogWriteHandler ledgerHandler = createWriteHandler(true);
+        ledgerHandler.deleteLog();
+        // delete the log stream metadata
         Utils.ioResult(driver.getLogStreamMetadataStore(WRITER)
                 .deleteLog(uri, getStreamName()));
     }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index a310ea5..4c0019e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -362,6 +362,15 @@
     }
 
     /**
+     * Delete the whole log and all log segments under the log
+     */
+    void deleteLog() throws IOException {
+        lock.checkOwnershipAndReacquire();
+        Utils.ioResult(purgeLogSegmentsOlderThanTxnId(-1));
+        Utils.closeQuietly(lock);
+    }
+
+    /**
      * The caller could call this before any actions, which to hold the lock for
      * the write handler of its whole lifecycle. The lock will only be released
      * when closing the write handler.
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index 18e097f..d353894 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
@@ -29,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -41,6 +43,7 @@
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.LogReadException;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
 import org.apache.distributedlog.io.Abortables;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
@@ -65,6 +68,7 @@
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 
@@ -1202,4 +1206,43 @@
 
         zookeeperClient.close();
     }
+
+    @Test(timeout = 60000)
+    public void testDeleteLog() throws Exception {
+        String name = "delete-log-should-delete-ledgers";
+        DistributedLogManager dlm = createNewDLM(conf, name);
+        long txid = 1;
+        // Create the log and write some records
+        BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
+        for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
+            writer.write(DLMTestUtil.getLogRecordInstance(txid++));
+        }
+        BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter();
+        writer.closeAndComplete();
+        BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
+        assertNotNull(zkc.exists(blplm.completedLedgerZNode(txid, txid - 1,
+            perStreamLogWriter.getLogSegmentSequenceNumber()), false));
+        Utils.ioResult(blplm.asyncClose());
+
+        // Should be able to open the underline ledger using BK client
+        long ledgerId = perStreamLogWriter.getLogSegmentId();
+        BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();
+        driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
+            BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
+        // Delete the log and we shouldn't be able the open the ledger
+        dlm.delete();
+        try {
+            driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
+                BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
+            fail("Should fail to open ledger after we delete the log");
+        } catch (BKException.BKNoSuchLedgerExistsException e) {
+            // ignore
+        }
+        // delete again should not throw any exception
+        try {
+            dlm.delete();
+        } catch (IOException ioe) {
+            fail("Delete log twice should not throw any exception");
+        }
+    }
 }