Fix logic in Bookkeeper forceAllowCompaction to run only when force is set or configured interval (#2675)
Without the attached fix, when forceAllowCompaction and isForceGCAllowWhenNoSpace are set. The majorCompaction runs with every garbage collection. This results due to the fact that enableMajorCompaction which takes a negative interval into account to disable compaction always returns true since the difference in time will always be larger than -1. The time check must be "and" with the enableMajorCompaciton for the time check to be valid. And the force flag may also be valid when the isForce(Major/Minor)CompactionAllow is set to true which will validate the threshold in the force case is set.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 3d6f132..fb54890 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -212,10 +212,10 @@
}
if (isForceAllowCompaction) {
- if (minorCompactionThreshold > 0 || minorCompactionThreshold < 1.0f) {
+ if (minorCompactionThreshold > 0 && minorCompactionThreshold < 1.0f) {
isForceMinorCompactionAllow = true;
}
- if (majorCompactionThreshold > 0 || majorCompactionThreshold < 1.0f) {
+ if (majorCompactionThreshold > 0 && majorCompactionThreshold < 1.0f) {
isForceMajorCompactionAllow = true;
}
}
@@ -361,8 +361,9 @@
}
long curTime = System.currentTimeMillis();
- if ((isForceMajorCompactionAllow || enableMajorCompaction) && (!suspendMajor)
- && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
+ if (((isForceMajorCompactionAllow && force)
+ || (enableMajorCompaction && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)))
+ && (!suspendMajor)) {
// enter major compaction
LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
majorCompacting.set(true);
@@ -372,8 +373,9 @@
lastMinorCompactionTime = lastMajorCompactionTime;
gcStats.getMajorCompactionCounter().inc();
majorCompacting.set(false);
- } else if ((isForceMinorCompactionAllow || enableMinorCompaction) && (!suspendMinor)
- && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
+ } else if (((isForceMinorCompactionAllow && force)
+ || (enableMinorCompaction && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
+ && (!suspendMinor)) {
// enter minor compaction
LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
minorCompacting.set(true);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index fef31a0..e361421 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -468,6 +468,77 @@
@Test
+ public void testForceMinorCompaction() throws Exception {
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // disable major compaction
+ baseConf.setMajorCompactionThreshold(0.0f);
+ baseConf.setGcWaitTime(60000);
+ baseConf.setMinorCompactionInterval(-1);
+ baseConf.setMajorCompactionInterval(-1);
+ baseConf.setForceAllowCompaction(true);
+
+ // restart bookies
+ restartBookies(baseConf);
+
+ getGCThread().enableForceGC();
+ getGCThread().triggerGC().get();
+ assertTrue(
+ "ACTIVE_ENTRY_LOG_COUNT should have been updated",
+ getStatsProvider(0)
+ .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_COUNT)
+ .getSample().intValue() > 0);
+ assertTrue(
+ "ACTIVE_ENTRY_LOG_SPACE_BYTES should have been updated",
+ getStatsProvider(0)
+ .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_SPACE_BYTES)
+ .getSample().intValue() > 0);
+
+ long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+ long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+ assertFalse(getGCThread().enableMajorCompaction);
+ assertFalse(getGCThread().enableMinorCompaction);
+
+ // remove ledger2 and ledger3
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ getGCThread().enableForceGC();
+ getGCThread().triggerGC().get();
+
+ // after garbage collection, major compaction should not be executed
+ assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
+ assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
+
+ // entry logs ([0,1,2].log) should be compacted.
+ for (File ledgerDirectory : tmpDirs) {
+ assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
+ + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
+ }
+
+ // even though entry log files are removed, we still can access entries for ledger1
+ // since those entries have been compacted to a new entry log
+ verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+
+ assertTrue(
+ "RECLAIMED_COMPACTION_SPACE_BYTES should have been updated",
+ getStatsProvider(0)
+ .getCounter("bookie.gc." + RECLAIMED_COMPACTION_SPACE_BYTES)
+ .get().intValue() > 0);
+ assertTrue(
+ "RECLAIMED_DELETION_SPACE_BYTES should have been updated",
+ getStatsProvider(0)
+ .getCounter("bookie.gc." + RECLAIMED_DELETION_SPACE_BYTES)
+ .get().intValue() > 0);
+ }
+
+ @Test
public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception {
// prepare data
LedgerHandle[] lhs = prepareData(3, false);
@@ -657,6 +728,55 @@
}
@Test
+ public void testForceMajorCompaction() throws Exception {
+
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, true);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // disable minor compaction
+ baseConf.setMinorCompactionThreshold(0.0f);
+ baseConf.setGcWaitTime(60000);
+ baseConf.setMinorCompactionInterval(-1);
+ baseConf.setMajorCompactionInterval(-1);
+ baseConf.setForceAllowCompaction(true);
+
+ // restart bookies
+ restartBookies(baseConf);
+
+ long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+ long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+ assertFalse(getGCThread().enableMajorCompaction);
+ assertFalse(getGCThread().enableMinorCompaction);
+ assertTrue(getGCThread().isForceMajorCompactionAllow);
+ assertFalse(getGCThread().isForceMinorCompactionAllow);
+
+ // remove ledger1 and ledger3
+ bkc.deleteLedger(lhs[0].getId());
+ bkc.deleteLedger(lhs[2].getId());
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ getGCThread().enableForceGC();
+ getGCThread().triggerGC().get();
+
+ // after garbage collection, minor compaction should not be executed
+ assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
+ assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime);
+
+ // entry logs ([0,1,2].log) should be compacted
+ for (File ledgerDirectory : tmpDirs) {
+ assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
+ + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
+ }
+
+ // even entry log files are removed, we still can access entries for ledger2
+ // since those entries has been compacted to new entry log
+ verifyLedger(lhs[1].getId(), 0, lhs[1].getLastAddConfirmed());
+ }
+
+ @Test
public void testCompactionPersistence() throws Exception {
/*
* for this test scenario we are assuming that there will be only one