DL-206: Delete the log should also delete the underline ledgers
Problem:
We're not deleting the ledgers when we delete the whole log stream using dlm.delete() API. This would cause a lot of garbage/orphan ledgers in Bookkeeper.
The fix is to delete the ledger when we delete the log stream. Also added a test to validate.
Author: Yiming Zang <yzang@twitter.com>
Reviewers: Sijie Guo <sijie@apache.org>
This closes #152 from yzang/yzang/DL-206
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index cffe500..c837ad2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -950,6 +950,10 @@
*/
@Override
public void delete() throws IOException {
+ // delete the actual log stream and log segments
+ BKLogWriteHandler ledgerHandler = createWriteHandler(true);
+ ledgerHandler.deleteLog();
+ // delete the log stream metadata
Utils.ioResult(driver.getLogStreamMetadataStore(WRITER)
.deleteLog(uri, getStreamName()));
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index a310ea5..4c0019e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -362,6 +362,15 @@
}
/**
+ * Delete the whole log and all log segments under the log
+ */
+ void deleteLog() throws IOException {
+ lock.checkOwnershipAndReacquire();
+ Utils.ioResult(purgeLogSegmentsOlderThanTxnId(-1));
+ Utils.closeQuietly(lock);
+ }
+
+ /**
* The caller could call this before any actions, which to hold the lock for
* the write handler of its whole lifecycle. The lock will only be released
* when closing the write handler.
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index 18e097f..d353894 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog;
+import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
@@ -29,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
@@ -41,6 +43,7 @@
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.LogReadException;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
@@ -65,6 +68,7 @@
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
@@ -1202,4 +1206,43 @@
zookeeperClient.close();
}
+
+ @Test(timeout = 60000)
+ public void testDeleteLog() throws Exception {
+ String name = "delete-log-should-delete-ledgers";
+ DistributedLogManager dlm = createNewDLM(conf, name);
+ long txid = 1;
+ // Create the log and write some records
+ BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
+ for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
+ writer.write(DLMTestUtil.getLogRecordInstance(txid++));
+ }
+ BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter();
+ writer.closeAndComplete();
+ BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
+ assertNotNull(zkc.exists(blplm.completedLedgerZNode(txid, txid - 1,
+ perStreamLogWriter.getLogSegmentSequenceNumber()), false));
+ Utils.ioResult(blplm.asyncClose());
+
+ // Should be able to open the underline ledger using BK client
+ long ledgerId = perStreamLogWriter.getLogSegmentId();
+ BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();
+ driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
+ BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
+ // Delete the log and we shouldn't be able the open the ledger
+ dlm.delete();
+ try {
+ driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
+ BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
+ fail("Should fail to open ledger after we delete the log");
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ // ignore
+ }
+ // delete again should not throw any exception
+ try {
+ dlm.delete();
+ } catch (IOException ioe) {
+ fail("Delete log twice should not throw any exception");
+ }
+ }
}