BOOKKEEPER-926: Compacted entries are not properly synced before updating index
Author: Matteo Merli <mmerli@apache.org>
Reviewers: Guo Sijie <sijie@apache.org>
Closes #41 from merlimat/bk-926
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 2821ec8..e5ee8d7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -126,7 +126,7 @@
/**
* A scanner wrapper to check whether a ledger is alive in an entry log file
*/
- class CompactionScannerFactory implements EntryLogger.EntryLogListener {
+ class CompactionScannerFactory {
List<EntryLocation> offsets = new ArrayList<EntryLocation>();
EntryLogScanner newScanner(final EntryLogMetadata meta) {
@@ -141,66 +141,38 @@
}
@Override
- public void process(final long ledgerId, long offset, ByteBuffer entry)
- throws IOException {
+ public void process(final long ledgerId, long offset, ByteBuffer entry) throws IOException {
throttler.acquire(entry.remaining());
- synchronized (CompactionScannerFactory.this) {
- if (offsets.size() > maxOutstandingRequests) {
- waitEntrylogFlushed();
- }
- entry.getLong(); // discard ledger id, we already have it
- long entryId = entry.getLong();
- entry.rewind();
- long newoffset = entryLogger.addEntry(ledgerId, entry);
- offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
+ if (offsets.size() > maxOutstandingRequests) {
+ flush();
}
+ entry.getLong(); // discard ledger id, we already have it
+ long entryId = entry.getLong();
+ entry.rewind();
+
+ long newoffset = entryLogger.addEntry(ledgerId, entry);
+ offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
+
}
};
}
- final Object flushLock = new Object();
-
- @Override
- public void onRotateEntryLog() {
- synchronized (flushLock) {
- flushLock.notifyAll();
+ void flush() throws IOException {
+ if (offsets.isEmpty()) {
+ LOG.debug("Skipping entry log flushing, as there are no offset!");
+ return;
}
- }
- synchronized private void waitEntrylogFlushed() throws IOException {
+ // Before updating the index, we want to wait until all the compacted entries are flushed into the
+ // entryLog
try {
- if (offsets.size() <= 0) {
- LOG.debug("Skipping entry log flushing, as there is no offset!");
- return;
- }
+ entryLogger.flush();
- EntryLocation lastOffset = offsets.get(offsets.size()-1);
- long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
- while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
- synchronized (flushLock) {
- flushLock.wait(1000);
- }
-
- lastOffset = offsets.get(offsets.size()-1);
- lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
- }
- if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
- throw new IOException("Shutdown before flushed");
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted waiting for flush", ie);
+ ledgerStorage.updateEntriesLocations(offsets);
+ } finally {
+ offsets.clear();
}
-
- ledgerStorage.updateEntriesLocations(offsets);
- offsets.clear();
- }
-
- synchronized void flush() throws IOException {
- waitEntrylogFlushed();
-
- ledgerStorage.flushEntriesLocationsIndex();
}
}
@@ -227,7 +199,6 @@
this.compactionRateByEntries = conf.getCompactionRateByEntries();
this.compactionRateByBytes = conf.getCompactionRateByBytes();
this.scannerFactory = new CompactionScannerFactory();
- entryLogger.addListener(this.scannerFactory);
this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
@Override
@@ -456,7 +427,6 @@
List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
logsToCompact.addAll(entryLogMetaMap.values());
Collections.sort(logsToCompact, sizeComparator);
- List<Long> toRemove = new ArrayList<Long>();
for (EntryLogMetadata meta : logsToCompact) {
if (meta.getUsage() >= threshold) {
@@ -464,11 +434,15 @@
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Compacting entry log {} below threshold {}.", meta.getEntryLogId(), threshold);
+ LOG.debug("Compacting entry log {} below threshold {}", meta.getEntryLogId(), threshold);
}
try {
compactEntryLog(scannerFactory, meta);
- toRemove.add(meta.getEntryLogId());
+ scannerFactory.flush();
+
+ LOG.info("Removing entry log {} after compaction", meta.getEntryLogId());
+ removeEntryLog(meta.getEntryLogId());
+
} catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
break;
@@ -483,18 +457,6 @@
return;
}
}
- try {
- // compaction finished, flush any outstanding offsets
- scannerFactory.flush();
- } catch (IOException ioe) {
- LOG.error("Cannot flush compacted entries, skip removal", ioe);
- return;
- }
-
- // offsets have been flushed, its now safe to remove the old entrylogs
- for (Long l : toRemove) {
- removeEntryLog(l);
- }
}
/**
@@ -545,7 +507,7 @@
return;
}
- LOG.info("Compacting entry log : {}", entryLogMeta.getEntryLogId());
+ LOG.info("Compacting entry log : {} - Usage: {} %", entryLogMeta.getEntryLogId(), entryLogMeta.getUsage());
try {
entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(),
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 4f3bb87..5d384ba 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -33,6 +33,8 @@
import java.util.Collection;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactionScannerFactory;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
@@ -49,7 +51,7 @@
import org.apache.bookkeeper.util.TestUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
-
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -114,6 +116,7 @@
baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE);
// Disable skip list for compaction
baseConf.setGcWaitTime(gcWaitTime);
+ baseConf.setFlushInterval(100);
baseConf.setMinorCompactionThreshold(minorCompactionThreshold);
baseConf.setMajorCompactionThreshold(majorCompactionThreshold);
baseConf.setMinorCompactionInterval(minorCompactionInterval);
@@ -631,4 +634,62 @@
storage.gcThread.resumeMinorGC();
storage.gcThread.resumeMajorGC();
}
+
+ @Test(timeout = 60000)
+ public void testCompactionWithEntryLogRollover() throws Exception {
+ // Disable bookie gc during this test
+ baseConf.setGcWaitTime(60000);
+ baseConf.setMinorCompactionInterval(0);
+ baseConf.setMajorCompactionInterval(0);
+ restartBookies();
+
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // remove ledger2 and ledger3
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+ LOG.info("Finished deleting the ledgers contains most entries.");
+
+ InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) bs.get(0).getBookie().ledgerStorage;
+ GarbageCollectorThread garbageCollectorThread = ledgerStorage.gcThread;
+ CompactionScannerFactory compactionScannerFactory = garbageCollectorThread.scannerFactory;
+ long entryLogId = 0;
+ EntryLogger entryLogger = ledgerStorage.entryLogger;
+
+ LOG.info("Before compaction -- Least unflushed log id: {}", entryLogger.getLeastUnflushedLogId());
+
+ // Compact entryLog 0
+ EntryLogScanner scanner = compactionScannerFactory.newScanner(entryLogger.getEntryLogMetadata(entryLogId));
+
+ entryLogger.scanEntryLog(entryLogId, scanner);
+
+ long entryLogIdAfterCompaction = entryLogger.getLeastUnflushedLogId();
+ LOG.info("After compaction -- Least unflushed log id: {}", entryLogIdAfterCompaction);
+
+ // Add more entries to trigger entrylog roll over
+ LedgerHandle[] lhs2 = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs2) {
+ lh.close();
+ }
+
+ // Wait for entry logger to move forward
+ while (entryLogger.getLeastUnflushedLogId() <= entryLogIdAfterCompaction) {
+ Thread.sleep(100);
+ }
+
+ long entryLogIdBeforeFlushing = entryLogger.getLeastUnflushedLogId();
+ LOG.info("Added more data -- Least unflushed log id: {}", entryLogIdBeforeFlushing);
+
+ Assert.assertTrue(entryLogIdAfterCompaction < entryLogIdBeforeFlushing);
+
+ // Wait for entries to be flushed on entry logs and update index
+ // This operation should succeed even if the entry log rolls over after the last entry was compacted
+ compactionScannerFactory.flush();
+ }
}