BOOKKEEPER-926: Compacted entries are not properly synced before updating index

Author: Matteo Merli <mmerli@apache.org>

Reviewers: Guo Sijie <sijie@apache.org>

Closes #41 from merlimat/bk-926
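
The race this fixes: compaction appended the surviving entries to the current
entry log and queued their new offsets, but relied on an asynchronous
onRotateEntryLog listener and waitEntrylogFlushed() to decide when those
entries were durable before updating the index. The wait condition could let
the method return while the compacted entries were still unflushed, so the
index could be updated before the data was synced, and an entry log rollover
after the last compacted entry left the wait in an inconsistent state. The
patch makes the ordering explicit and synchronous: flush the entry log, then
update the index, then remove the old entry log. A minimal sketch of that
flow, assuming BookKeeper-like types (rewriteLiveEntries and removeEntryLog
here are hypothetical stand-ins for the scanner and GC plumbing, not the
actual GarbageCollectorThread code):

    // Sketch only; illustrates the ordering the patch enforces.
    void compactAndFlush(EntryLogMetadata meta) throws IOException {
        // 1. Re-copy live entries into the active entry log, collecting
        //    their new locations (what the compaction scanner does).
        List<EntryLocation> offsets = rewriteLiveEntries(meta);

        // 2. Sync the compacted entries to durable storage BEFORE the
        //    index is allowed to reference them.
        entryLogger.flush();

        // 3. Only now point the index at the new locations (the real code
        //    clears its shared offsets list in a finally block here).
        ledgerStorage.updateEntriesLocations(offsets);

        // 4. The old entry log is no longer referenced and can be removed
        //    immediately, per log instead of batched at the end.
        removeEntryLog(meta.getEntryLogId());
    }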
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 2821ec8..e5ee8d7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -126,7 +126,7 @@
     /**
      * A scanner wrapper to check whether a ledger is alive in an entry log file
      */
-    class CompactionScannerFactory implements EntryLogger.EntryLogListener {
+    class CompactionScannerFactory {
         List<EntryLocation> offsets = new ArrayList<EntryLocation>();
 
         EntryLogScanner newScanner(final EntryLogMetadata meta) {
@@ -141,66 +141,38 @@
                 }
 
                 @Override
-                public void process(final long ledgerId, long offset, ByteBuffer entry)
-                        throws IOException {
+                public void process(final long ledgerId, long offset, ByteBuffer entry) throws IOException {
                     throttler.acquire(entry.remaining());
-                    synchronized (CompactionScannerFactory.this) {
-                        if (offsets.size() > maxOutstandingRequests) {
-                            waitEntrylogFlushed();
-                        }
-                        entry.getLong(); // discard ledger id, we already have it
-                        long entryId = entry.getLong();
-                        entry.rewind();
 
-                        long newoffset = entryLogger.addEntry(ledgerId, entry);
-                        offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
+                    if (offsets.size() > maxOutstandingRequests) {
+                        flush();
                     }
+                    entry.getLong(); // discard ledger id, we already have it
+                    long entryId = entry.getLong();
+                    entry.rewind();
+
+                    long newoffset = entryLogger.addEntry(ledgerId, entry);
+                    offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
+
                 }
             };
         }
 
-        final Object flushLock = new Object();
-
-        @Override
-        public void onRotateEntryLog() {
-            synchronized (flushLock) {
-                flushLock.notifyAll();
+        void flush() throws IOException {
+            if (offsets.isEmpty()) {
+                LOG.debug("Skipping entry log flushing, as there are no offset!");
+                return;
             }
-        }
 
-        synchronized private void waitEntrylogFlushed() throws IOException {
+            // Before updating the index, we want to wait until all the compacted entries are flushed into the
+            // entryLog
             try {
-                if (offsets.size() <= 0) {
-                    LOG.debug("Skipping entry log flushing, as there is no offset!");
-                    return;
-                }
+                entryLogger.flush();
 
-                EntryLocation lastOffset = offsets.get(offsets.size()-1);
-                long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
-                while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
-                    synchronized (flushLock) {
-                        flushLock.wait(1000);
-                    }
-
-                    lastOffset = offsets.get(offsets.size()-1);
-                    lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
-                }
-                if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
-                    throw new IOException("Shutdown before flushed");
-                }
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                throw new IOException("Interrupted waiting for flush", ie);
+                ledgerStorage.updateEntriesLocations(offsets);
+            } finally {
+                offsets.clear();
             }
-
-            ledgerStorage.updateEntriesLocations(offsets);
-            offsets.clear();
-        }
-
-        synchronized void flush() throws IOException {
-            waitEntrylogFlushed();
-
-            ledgerStorage.flushEntriesLocationsIndex();
         }
     }
 
@@ -227,7 +199,6 @@
         this.compactionRateByEntries  = conf.getCompactionRateByEntries();
         this.compactionRateByBytes = conf.getCompactionRateByBytes();
         this.scannerFactory = new CompactionScannerFactory();
-        entryLogger.addListener(this.scannerFactory);
 
         this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
             @Override
@@ -456,7 +427,6 @@
         List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
         logsToCompact.addAll(entryLogMetaMap.values());
         Collections.sort(logsToCompact, sizeComparator);
-        List<Long> toRemove = new ArrayList<Long>();
 
         for (EntryLogMetadata meta : logsToCompact) {
             if (meta.getUsage() >= threshold) {
@@ -464,11 +434,15 @@
             }
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Compacting entry log {} below threshold {}.", meta.getEntryLogId(), threshold);
+                LOG.debug("Compacting entry log {} below threshold {}", meta.getEntryLogId(), threshold);
             }
             try {
                 compactEntryLog(scannerFactory, meta);
-                toRemove.add(meta.getEntryLogId());
+                scannerFactory.flush();
+
+                LOG.info("Removing entry log {} after compaction", meta.getEntryLogId());
+                removeEntryLog(meta.getEntryLogId());
+
             } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
                 LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
                 break;
@@ -483,18 +457,6 @@
                 return;
             }
         }
-        try {
-            // compaction finished, flush any outstanding offsets
-            scannerFactory.flush();
-        } catch (IOException ioe) {
-            LOG.error("Cannot flush compacted entries, skip removal", ioe);
-            return;
-        }
-
-        // offsets have been flushed, its now safe to remove the old entrylogs
-        for (Long l : toRemove) {
-            removeEntryLog(l);
-        }
     }
 
     /**
@@ -545,7 +507,7 @@
             return;
         }
 
-        LOG.info("Compacting entry log : {}", entryLogMeta.getEntryLogId());
+        LOG.info("Compacting entry log : {} - Usage: {} %", entryLogMeta.getEntryLogId(), entryLogMeta.getUsage());
 
         try {
             entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(),
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 4f3bb87..5d384ba 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -33,6 +33,8 @@
 import java.util.Collection;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactionScannerFactory;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -49,7 +51,7 @@
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
-
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -114,6 +116,7 @@
         baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE);
         // Disable skip list for compaction
         baseConf.setGcWaitTime(gcWaitTime);
+        baseConf.setFlushInterval(100);
         baseConf.setMinorCompactionThreshold(minorCompactionThreshold);
         baseConf.setMajorCompactionThreshold(majorCompactionThreshold);
         baseConf.setMinorCompactionInterval(minorCompactionInterval);
@@ -631,4 +634,62 @@
         storage.gcThread.resumeMinorGC();
         storage.gcThread.resumeMajorGC();
     }
+
+    @Test(timeout = 60000)
+    public void testCompactionWithEntryLogRollover() throws Exception {
+        // Disable bookie gc during this test
+        baseConf.setGcWaitTime(60000);
+        baseConf.setMinorCompactionInterval(0);
+        baseConf.setMajorCompactionInterval(0);
+        restartBookies();
+
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        // remove ledger2 and ledger3
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) bs.get(0).getBookie().ledgerStorage;
+        GarbageCollectorThread garbageCollectorThread = ledgerStorage.gcThread;
+        CompactionScannerFactory compactionScannerFactory = garbageCollectorThread.scannerFactory;
+        long entryLogId = 0;
+        EntryLogger entryLogger = ledgerStorage.entryLogger;
+
+        LOG.info("Before compaction -- Least unflushed log id: {}", entryLogger.getLeastUnflushedLogId());
+
+        // Compact entryLog 0
+        EntryLogScanner scanner = compactionScannerFactory.newScanner(entryLogger.getEntryLogMetadata(entryLogId));
+
+        entryLogger.scanEntryLog(entryLogId, scanner);
+
+        long entryLogIdAfterCompaction = entryLogger.getLeastUnflushedLogId();
+        LOG.info("After compaction -- Least unflushed log id: {}", entryLogIdAfterCompaction);
+
+        // Add more entries to trigger entrylog roll over
+        LedgerHandle[] lhs2 = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs2) {
+            lh.close();
+        }
+
+        // Wait for entry logger to move forward
+        while (entryLogger.getLeastUnflushedLogId() <= entryLogIdAfterCompaction) {
+            Thread.sleep(100);
+        }
+
+        long entryLogIdBeforeFlushing = entryLogger.getLeastUnflushedLogId();
+        LOG.info("Added more data -- Least unflushed log id: {}", entryLogIdBeforeFlushing);
+
+        Assert.assertTrue(entryLogIdAfterCompaction < entryLogIdBeforeFlushing);
+
+        // Wait for entries to be flushed on entry logs and update index
+        // This operation should succeed even if the entry log rolls over after the last entry was compacted
+        compactionScannerFactory.flush();
+    }
 }