| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.bookie; |
| |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ACTIVE_ENTRY_LOG_COUNT; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ACTIVE_ENTRY_LOG_SPACE_BYTES; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MAJOR_COMPACTION_COUNT; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MINOR_COMPACTION_COUNT; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_DELETION_SPACE_BYTES; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.THREAD_RUNTIME; |
| import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX; |
| import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import com.google.common.util.concurrent.UncheckedExecutionException; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import io.netty.buffer.UnpooledByteBufAllocator; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Supplier; |
| import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException; |
| import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; |
| import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; |
| import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.BookKeeper.DigestType; |
| import org.apache.bookkeeper.client.LedgerEntry; |
| import org.apache.bookkeeper.client.LedgerHandle; |
| import org.apache.bookkeeper.client.api.LedgerMetadata; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.conf.TestBKConfiguration; |
| import org.apache.bookkeeper.meta.LedgerManager; |
| import org.apache.bookkeeper.proto.BookieServer; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; |
| import org.apache.bookkeeper.proto.checksum.DigestManager; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.test.BookKeeperClusterTestCase; |
| import org.apache.bookkeeper.test.TestStatsProvider; |
| import org.apache.bookkeeper.util.DiskChecker; |
| import org.apache.bookkeeper.util.TestUtils; |
| import org.apache.bookkeeper.versioning.Version; |
| import org.apache.bookkeeper.versioning.Versioned; |
| import org.apache.zookeeper.AsyncCallback; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class tests the entry log compaction functionality. |
| */ |
| public abstract class CompactionTest extends BookKeeperClusterTestCase { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(CompactionTest.class); |
| |
| private static final int ENTRY_SIZE = 1024; |
| private static final int NUM_BOOKIES = 1; |
| |
| private final boolean isThrottleByBytes; |
| private final DigestType digestType; |
| private final byte[] passwdBytes; |
| private final int numEntries; |
| private final int gcWaitTime; |
| private final double minorCompactionThreshold; |
| private final double majorCompactionThreshold; |
| private final long minorCompactionInterval; |
| private final long majorCompactionInterval; |
| private final String msg; |
| private final boolean useMetadataCache; |
| |
| public CompactionTest(boolean isByBytes, boolean useMetadataCache) { |
| super(NUM_BOOKIES); |
| |
| this.isThrottleByBytes = isByBytes; |
| this.useMetadataCache = useMetadataCache; |
| this.digestType = DigestType.CRC32; |
| this.passwdBytes = "".getBytes(); |
| numEntries = 100; |
| gcWaitTime = 1000; |
| minorCompactionThreshold = 0.1f; |
| majorCompactionThreshold = 0.5f; |
| minorCompactionInterval = 2 * gcWaitTime / 1000; |
| majorCompactionInterval = 4 * gcWaitTime / 1000; |
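        // the compaction intervals are configured in seconds, hence dividing gcWaitTime (milliseconds) by 1000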
| |
| // a dummy message |
| StringBuilder msgSB = new StringBuilder(); |
| for (int i = 0; i < ENTRY_SIZE; i++) { |
| msgSB.append("a"); |
| } |
| msg = msgSB.toString(); |
| } |
| |
| @Before |
| @Override |
| public void setUp() throws Exception { |
| // Set up the configuration properties needed. |
| baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE); |
| // Disable skip list for compaction |
| baseConf.setGcWaitTime(gcWaitTime); |
| baseConf.setFlushInterval(100); |
| baseConf.setMinorCompactionThreshold(minorCompactionThreshold); |
| baseConf.setMajorCompactionThreshold(majorCompactionThreshold); |
| baseConf.setMinorCompactionInterval(minorCompactionInterval); |
| baseConf.setMajorCompactionInterval(majorCompactionInterval); |
| baseConf.setEntryLogFilePreAllocationEnabled(false); |
| baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); |
| baseConf.setIsThrottleByBytes(this.isThrottleByBytes); |
| baseConf.setIsForceGCAllowWhenNoSpace(false); |
| baseConf.setGcEntryLogMetadataCacheEnabled(useMetadataCache); |
| super.setUp(); |
| } |
| |
| private GarbageCollectorThread getGCThread() throws Exception { |
| assertEquals(1, bookieCount()); |
| BookieServer server = serverByIndex(0); |
| return ((InterleavedLedgerStorage) server.getBookie().getLedgerStorage()).gcThread; |
| } |
| |
| LedgerHandle[] prepareData(int numEntryLogs, boolean changeNum) |
| throws Exception { |
        // since an entry log file can hold at most 100 entries
        // the first ledger writes 2 entries, which is less than the low water mark
        int num1 = 2;
        // the third ledger writes more entries than the high water mark
        int num3 = (int) (numEntries * 0.7f);
        // the second ledger writes the remaining entries, which is more than the low water mark
        // and less than the high water mark
        int num2 = numEntries - num3 - num1;
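        // with numEntries = 100 this yields num1 = 2 (2%), num2 = 28 (28%) and num3 = 70 (70%):
        // per-entry-log usage below the minor threshold (0.1), between the two thresholds,
        // and above the major threshold (0.5), respectively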
| |
| LedgerHandle[] lhs = new LedgerHandle[3]; |
| for (int i = 0; i < 3; ++i) { |
| lhs[i] = bkc.createLedger(NUM_BOOKIES, NUM_BOOKIES, digestType, passwdBytes); |
| } |
| |
| for (int n = 0; n < numEntryLogs; n++) { |
| for (int k = 0; k < num1; k++) { |
| lhs[0].addEntry(msg.getBytes()); |
| } |
| for (int k = 0; k < num2; k++) { |
| lhs[1].addEntry(msg.getBytes()); |
| } |
| for (int k = 0; k < num3; k++) { |
| lhs[2].addEntry(msg.getBytes()); |
| } |
| if (changeNum) { |
| --num2; |
| ++num3; |
| } |
| } |
| |
| return lhs; |
| } |
| |
| private void verifyLedger(long lid, long startEntryId, long endEntryId) throws Exception { |
| LedgerHandle lh = bkc.openLedger(lid, digestType, passwdBytes); |
| Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId); |
| while (entries.hasMoreElements()) { |
| LedgerEntry entry = entries.nextElement(); |
| assertEquals(msg, new String(entry.getEntry())); |
| } |
| } |
| |
| @Test |
| public void testDisableCompaction() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| // disable compaction |
| // restart bookies |
| restartBookies(c -> { |
| c.setMinorCompactionThreshold(0.0f); |
| c.setMajorCompactionThreshold(0.0f); |
| return c; |
| }); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| |
        // remove ledger2 and ledger3
        // so entry logs 1 and 2 would have only ledger1's entries left
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertFalse(getGCThread().enableMinorCompaction); |
| getGCThread().triggerGC().get(); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
| |
| // after garbage collection, compaction should not be executed |
| assertEquals(lastMinorCompactionTime, getGCThread().lastMinorCompactionTime); |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| |
        // entry logs ([0,1].log) should not be compacted.
        for (File ledgerDirectory : bookieLedgerDirs()) {
            assertTrue("Not found entry log files ([0,1].log) that should not have been compacted in ledgerDirectory: "
                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, false, 0, 1));
| } |
| } |
| |
| @Test |
| public void testForceGarbageCollectionWhenDisableCompactionConfigurationSettings() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| restartBookies(c -> { |
| c.setForceAllowCompaction(true); |
| c.setMajorCompactionThreshold(0.5f); |
| c.setMinorCompactionThreshold(0.2f); |
| c.setMajorCompactionInterval(0); |
| c.setMinorCompactionInterval(0); |
| return c; |
| }); |
| |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertFalse(getGCThread().enableMinorCompaction); |
| assertTrue(getGCThread().isForceMajorCompactionAllow); |
| assertTrue(getGCThread().isForceMinorCompactionAllow); |
| |
| assertEquals(0.5f, getGCThread().majorCompactionThreshold, 0f); |
| assertEquals(0.2f, getGCThread().minorCompactionThreshold, 0f); |
| } |
| |
| @Test |
| public void testForceGarbageCollection() throws Exception { |
| testForceGarbageCollection(true); |
| testForceGarbageCollection(false); |
| } |
| |
| public void testForceGarbageCollection(boolean isForceCompactionAllowWhenDisableCompaction) throws Exception { |
| ServerConfiguration conf = newServerConfiguration(); |
| conf.setGcWaitTime(60000); |
| if (isForceCompactionAllowWhenDisableCompaction) { |
| conf.setMinorCompactionInterval(0); |
| conf.setMajorCompactionInterval(0); |
| conf.setForceAllowCompaction(true); |
| conf.setMajorCompactionThreshold(0.5f); |
| conf.setMinorCompactionThreshold(0.2f); |
| } else { |
| conf.setMinorCompactionInterval(120000); |
| conf.setMajorCompactionInterval(240000); |
| } |
| LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| CheckpointSource cp = new CheckpointSource() { |
| @Override |
| public Checkpoint newCheckpoint() { |
| // Do nothing. |
| return null; |
| } |
| |
| @Override |
| public void checkpointComplete(Checkpoint checkPoint, boolean compact) |
| throws IOException { |
| // Do nothing. |
| } |
| }; |
| for (File journalDir : conf.getJournalDirs()) { |
| BookieImpl.checkDirectoryStructure(journalDir); |
| } |
| for (File dir : dirManager.getAllLedgerDirs()) { |
| BookieImpl.checkDirectoryStructure(dir); |
| } |
| runFunctionWithLedgerManagerFactory(conf, lmf -> { |
| try (LedgerManager lm = lmf.newLedgerManager()) { |
| InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); |
| storage.initialize( |
| conf, |
| lm, |
| dirManager, |
| dirManager, |
| NullStatsLogger.INSTANCE, |
| UnpooledByteBufAllocator.DEFAULT); |
| storage.setCheckpointSource(cp); |
| storage.setCheckpointer(Checkpointer.NULL); |
| |
| storage.start(); |
| long startTime = System.currentTimeMillis(); |
| storage.gcThread.enableForceGC(); |
| storage.gcThread.triggerGC().get(); //major |
| storage.gcThread.triggerGC().get(); //minor |
| // Minor and Major compaction times should be larger than when we started |
| // this test. |
| assertTrue("Minor or major compaction did not trigger even on forcing.", |
| storage.gcThread.lastMajorCompactionTime > startTime |
| && storage.gcThread.lastMinorCompactionTime > startTime); |
| storage.shutdown(); |
| } catch (Exception e) { |
| throw new UncheckedExecutionException(e.getMessage(), e); |
| } |
| return null; |
| }); |
| } |
| |
| @Test |
| public void testForceGarbageCollectionWhenDiskIsFull() throws Exception { |
| testForceGarbageCollectionWhenDiskIsFull(true); |
| testForceGarbageCollectionWhenDiskIsFull(false); |
| } |
| |
| public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAllowWhenDisableCompaction) |
| throws Exception { |
| |
| restartBookies(conf -> { |
| if (isForceCompactionAllowWhenDisableCompaction) { |
| conf.setMinorCompactionInterval(0); |
| conf.setMajorCompactionInterval(0); |
| conf.setForceAllowCompaction(true); |
| conf.setMajorCompactionThreshold(0.5f); |
| conf.setMinorCompactionThreshold(0.2f); |
| } else { |
| conf.setMinorCompactionInterval(120000); |
| conf.setMajorCompactionInterval(240000); |
| } |
| return conf; |
| }); |
| |
| getGCThread().suspendMajorGC(); |
| getGCThread().suspendMinorGC(); |
| long majorCompactionCntBeforeGC = 0; |
| long minorCompactionCntBeforeGC = 0; |
| long majorCompactionCntAfterGC = 0; |
| long minorCompactionCntAfterGC = 0; |
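        // triggerGC(forceGC, suspendMajor, suspendMinor): a suspended compaction type does not run,
        // and when neither type is suspended the forced major pass subsumes the minor one,
        // so only the major compaction counter advances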
| |
| // disable forceMajor and forceMinor |
| majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| getGCThread().triggerGC(true, true, true).get(); |
| majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC); |
| assertEquals(minorCompactionCntBeforeGC, minorCompactionCntAfterGC); |
| |
| // enable forceMajor and forceMinor |
| majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| getGCThread().triggerGC(true, false, false).get(); |
| majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC); |
| assertEquals(minorCompactionCntBeforeGC, minorCompactionCntAfterGC); |
| |
| // enable forceMajor and disable forceMinor |
| majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| getGCThread().triggerGC(true, false, true).get(); |
| majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC); |
| assertEquals(minorCompactionCntBeforeGC, minorCompactionCntAfterGC); |
| |
| // disable forceMajor and enable forceMinor |
| majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| getGCThread().triggerGC(true, true, false).get(); |
| majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter(); |
| minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter(); |
| assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC); |
| assertEquals(minorCompactionCntBeforeGC + 1, minorCompactionCntAfterGC); |
| |
| } |
| |
| @Test |
| public void testMinorCompaction() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
| restartBookies(c -> { |
| // disable major compaction |
| c.setMajorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| return c; |
| }); |
| |
| |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_COUNT should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_COUNT) |
| .getSample().intValue() > 0); |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_SPACE_BYTES) |
| .getSample().intValue() > 0); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertTrue(getGCThread().enableMinorCompaction); |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| |
| // after garbage collection, major compaction should not be executed |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| |
| // entry logs ([0,1,2].log) should be compacted. |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
| // even though entry log files are removed, we still can access entries for ledger1 |
| // since those entries have been compacted to a new entry log |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| |
| assertTrue( |
| "RECLAIMED_COMPACTION_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getCounter("bookie.gc." + RECLAIMED_COMPACTION_SPACE_BYTES) |
| .get().intValue() > 0); |
| assertTrue( |
| "RECLAIMED_DELETION_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getCounter("bookie.gc." + RECLAIMED_DELETION_SPACE_BYTES) |
| .get().intValue() > 0); |
| } |
| |
| @Test |
| public void testMinorCompactionWithMaxTimeMillisOk() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(6, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // disable major compaction |
| // restart bookies |
        restartBookies(c -> {
| c.setMajorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| |
| // Setup limit on compaction duration. |
| // The limit is enough to compact. |
| c.setMinorCompactionMaxTimeMillis(5000); |
| c.setMajorCompactionMaxTimeMillis(5000); |
| |
| return c; |
| }); |
| |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_COUNT should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_COUNT) |
| .getSample().intValue() > 0); |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_SPACE_BYTES) |
| .getSample().intValue() > 0); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertTrue(getGCThread().enableMinorCompaction); |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| |
| // after garbage collection, major compaction should not be executed |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| |
| // entry logs ([0,1,2].log) should be compacted. |
| for (File ledgerDirectory : tmpDirs.getDirs()) { |
| assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
| // even though entry log files are removed, we still can access entries for ledger1 |
| // since those entries have been compacted to a new entry log |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| |
| assertTrue( |
| "RECLAIMED_COMPACTION_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getCounter("bookie.gc." + RECLAIMED_COMPACTION_SPACE_BYTES) |
| .get().intValue() > 0); |
| assertTrue( |
| "RECLAIMED_DELETION_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getCounter("bookie.gc." + RECLAIMED_DELETION_SPACE_BYTES) |
| .get().intValue() > 0); |
| } |
| |
| |
| @Test |
| public void testMinorCompactionWithMaxTimeMillisTooShort() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(6, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // disable major compaction |
| // restart bookies |
        restartBookies(c -> {
| c.setMajorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| |
| // Setup limit on compaction duration. |
| // The limit is not enough to finish the compaction |
| c.setMinorCompactionMaxTimeMillis(1); |
| c.setMajorCompactionMaxTimeMillis(1); |
| |
| return c; |
| }); |
| |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_COUNT should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_COUNT) |
| .getSample().intValue() > 0); |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_SPACE_BYTES) |
| .getSample().intValue() > 0); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertTrue(getGCThread().enableMinorCompaction); |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| |
| // after garbage collection, major compaction should not be executed |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| |
        // entry logs ([0,1,2].log) should not be fully compacted.
        for (File ledgerDirectory : tmpDirs.getDirs()) {
            // compaction of at least one of the files should not have finished
            assertTrue("Not found entry log file ([0,1,2].log) that should not have been compacted in ledgerDirectory: "
                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
| } |
| |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| } |
| |
| @Test |
| public void testForceMinorCompaction() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
        restartBookies(c -> {
| c.setMajorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(-1); |
| c.setMajorCompactionInterval(-1); |
| c.setForceAllowCompaction(true); |
| return c; |
| }); |
| |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_COUNT should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_COUNT) |
| .getSample().intValue() > 0); |
| assertTrue( |
| "ACTIVE_ENTRY_LOG_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getGauge("bookie.gc." + ACTIVE_ENTRY_LOG_SPACE_BYTES) |
| .getSample().intValue() > 0); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertFalse(getGCThread().enableMinorCompaction); |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| |
| // after garbage collection, major compaction should not be executed |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| |
| // entry logs ([0,1,2].log) should be compacted. |
| for (File ledgerDirectory : tmpDirs.getDirs()) { |
| assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
| // even though entry log files are removed, we still can access entries for ledger1 |
| // since those entries have been compacted to a new entry log |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| |
| assertTrue( |
| "RECLAIMED_COMPACTION_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getCounter("bookie.gc." + RECLAIMED_COMPACTION_SPACE_BYTES) |
| .get().intValue() > 0); |
| assertTrue( |
| "RECLAIMED_DELETION_SPACE_BYTES should have been updated", |
| getStatsProvider(0) |
| .getCounter("bookie.gc." + RECLAIMED_DELETION_SPACE_BYTES) |
| .get().intValue() > 0); |
| } |
| |
| @Test |
| public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception { |
| // restart bookies |
        restartBookies(c -> {
| c.setMajorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| c.setForceAllowCompaction(true); |
| c.setEntryLogPerLedgerEnabled(true); |
| return c; |
| }); |
| |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertTrue(getGCThread().enableMinorCompaction); |
| |
        // remove ledger2 and ledger3
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| // Need to wait until entry log 3 gets flushed before initiating GC to satisfy assertions. |
| while (!getGCThread().entryLogger.getFlushedLogIds().contains(3L)) { |
| TimeUnit.MILLISECONDS.sleep(100); |
| } |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().triggerGC(true, false, false).get(); |
| |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| |
        // At this point, we have the following state of ledgers and entry logs:
| // L0 (not deleted) -> E0 (un-flushed): Entry log should exist. |
| // L1 (deleted) -> E1 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC. |
| // L2 (deleted) -> E2 (flushed): Entry log should have been garbage collected. |
| // E3 (flushed): Entry log should have been garbage collected. |
| // E4 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC. |
| assertTrue("Not found entry log files [0, 1, 4].log that should not have been compacted in: " |
| + tmpDirs.getDirs().get(0), TestUtils.hasAllLogFiles(tmpDirs.getDirs().get(0), 0, 1, 4)); |
| assertTrue("Found entry log files [2, 3].log that should have been compacted in ledgerDirectory: " |
| + tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 2, 3)); |
| |
| // Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm |
| // should consider it for deletion. |
| ((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L); |
| getGCThread().triggerGC(true, false, false).get(); |
| assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: " |
| + tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 1)); |
| |
| // Once removed the ledger L0, then deleting E0 is fine (only if it has been flushed). |
| bkc.deleteLedger(lhs[0].getId()); |
| getGCThread().triggerGC(true, false, false).get(); |
| assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: " |
| + tmpDirs.getDirs().get(0), TestUtils.hasAllLogFiles(tmpDirs.getDirs().get(0), 0)); |
| ((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L); |
| getGCThread().triggerGC(true, false, false).get(); |
| assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: " |
| + tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 0)); |
| } |
| |
| @Test |
| public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
| restartBookies(c -> { |
| // disable major compaction |
| c.setMajorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| return c; |
| }); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertTrue(getGCThread().enableMinorCompaction); |
| |
| for (int i = 0; i < bookieCount(); i++) { |
| BookieImpl bookie = ((BookieImpl) serverByIndex(i).getBookie()); |
| LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager(); |
| List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs(); |
            // if all the disks are full then major and minor compaction would be disabled since
            // 'isForceGCAllowWhenNoSpace' is not enabled. Check LedgerDirsListener of interleavedLedgerStorage.
| for (File ledgerDir : ledgerDirs) { |
| ledgerDirsManager.addToFilledDirs(ledgerDir); |
| } |
| } |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().triggerGC().get(); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
| // after garbage collection, major compaction should not be executed |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| assertEquals(lastMinorCompactionTime, getGCThread().lastMinorCompactionTime); |
| |
        // entry logs ([0,1,2].log) should still remain, because both major and minor compaction are disabled.
        for (File ledgerDirectory : bookieLedgerDirs()) {
            assertTrue(
                    "All the entry log files ([0,1,2].log) should still be available in ledgerDirectory: "
                            + ledgerDirectory,
                    TestUtils.hasLogFiles(ledgerDirectory, false, 0, 1, 2));
| } |
| } |
| |
| @Test |
| public void testMinorCompactionWithNoWritableLedgerDirsButIsForceGCAllowWhenNoSpaceIsSet() throws Exception { |
| stopAllBookies(); |
| ServerConfiguration conf = newServerConfiguration(); |
| // disable major compaction |
| conf.setMajorCompactionThreshold(0.0f); |
        // here we are setting isForceGCAllowWhenNoSpace to true, so major and minor compaction won't be disabled
        // when disks are full
| conf.setIsForceGCAllowWhenNoSpace(true); |
| conf.setGcWaitTime(600000); |
| conf.setMinorCompactionInterval(120000); |
| conf.setMajorCompactionInterval(240000); |
        // We need at least 2 ledger dirs because compaction will flush the ledger cache and will
        // trigger relocateIndexFileAndFlushHeader. If we only have one ledger dir, compaction will always fail
        // when there's no writable ledger dir.
| File ledgerDir1 = tmpDirs.createNew("ledger", "test1"); |
| File ledgerDir2 = tmpDirs.createNew("ledger", "test2"); |
| File journalDir = tmpDirs.createNew("journal", "test"); |
| String[] ledgerDirNames = new String[]{ |
| ledgerDir1.getPath(), |
| ledgerDir2.getPath() |
| }; |
| conf.setLedgerDirNames(ledgerDirNames); |
| conf.setJournalDirName(journalDir.getPath()); |
| BookieServer server = startAndAddBookie(conf).getServer(); |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertTrue(getGCThread().enableMinorCompaction); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
| for (int i = 0; i < bookieCount(); i++) { |
| BookieImpl bookie = ((BookieImpl) serverByIndex(i).getBookie()); |
| bookie.getLedgerStorage().flush(); |
| bookie.dirsMonitor.shutdown(); |
| LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager(); |
| List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs(); |
            // Major and minor compaction are not disabled even though disks are full. Check LedgerDirsListener of
            // interleavedLedgerStorage.
| for (File ledgerDir : ledgerDirs) { |
| ledgerDirsManager.addToFilledDirs(ledgerDir); |
| } |
| } |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().triggerGC(true, false, false).get(); |
| |
| // after garbage collection, major compaction should not be executed |
| assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); |
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| |
        // though all disks are added to the filled dirs list, compaction should succeed, because when
        // allocating a new log the EntryLogger uses getWritableLedgerDirsForNewLog() of ledgerDirsManager
        // instead of getWritableLedgerDirs()
        // entry logs ([0,1,2].log) should be compacted.
        for (File ledgerDirectory : ((BookieImpl) server.getBookie()).getLedgerDirsManager().getAllLedgerDirs()) {
            assertFalse("Found entry log file ([0,1,2].log) that should have been compacted in ledgerDirectory: "
                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory.getParentFile(), true, 0, 1, 2));
| } |
| |
        // even though the entry log files are removed, we can still access entries for ledger1
        // since those entries have been compacted to a new entry log
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| |
        // for the sake of the test's validity, let's make sure that there is no writable ledger dir in the bookies
| for (int i = 0; i < bookieCount(); i++) { |
| BookieImpl bookie = (BookieImpl) serverByIndex(i).getBookie(); |
| LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager(); |
| try { |
| List<File> ledgerDirs = ledgerDirsManager.getWritableLedgerDirs(); |
                // it is expected not to have any writable ledger dirs since we added all of them to filledDirs
                fail("It is expected not to have any writableLedgerDirs");
            } catch (NoWritableLedgerDirException nwe) {
                // expected
            }
| } |
| } |
| |
| @Test |
| public void testMajorCompaction() throws Exception { |
| |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, true); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
| restartBookies(c -> { |
| // disable minor compaction |
| c.setMinorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| return c; |
| }); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertTrue(getGCThread().enableMajorCompaction); |
| assertFalse(getGCThread().enableMinorCompaction); |
| |
| // remove ledger1 and ledger3 |
| bkc.deleteLedger(lhs[0].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
        // after garbage collection, a major compaction should have been executed;
        // a major compaction run also advances the minor compaction time
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime); |
| |
| // entry logs ([0,1,2].log) should be compacted |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
        // even though the entry log files are removed, we can still access entries for ledger2
        // since those entries have been compacted to a new entry log
| verifyLedger(lhs[1].getId(), 0, lhs[1].getLastAddConfirmed()); |
| } |
| |
| @Test |
| public void testForceMajorCompaction() throws Exception { |
| |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, true); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
        restartBookies(c -> {
| // disable minor compaction |
| c.setMinorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(-1); |
| c.setMajorCompactionInterval(-1); |
| c.setForceAllowCompaction(true); |
| return c; |
| }); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertFalse(getGCThread().enableMajorCompaction); |
| assertFalse(getGCThread().enableMinorCompaction); |
| assertTrue(getGCThread().isForceMajorCompactionAllow); |
| assertFalse(getGCThread().isForceMinorCompactionAllow); |
| |
| // remove ledger1 and ledger3 |
| bkc.deleteLedger(lhs[0].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
        // after garbage collection, a (forced) major compaction should have been executed;
        // a major compaction run also advances the minor compaction time
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime); |
| |
| // entry logs ([0,1,2].log) should be compacted |
| for (File ledgerDirectory : tmpDirs.getDirs()) { |
| assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
        // even though the entry log files are removed, we can still access entries for ledger2
        // since those entries have been compacted to a new entry log
| verifyLedger(lhs[1].getId(), 0, lhs[1].getLastAddConfirmed()); |
| } |
| |
| @Test |
| public void testCompactionPersistence() throws Exception { |
| /* |
| * for this test scenario we are assuming that there will be only one |
| * bookie in the cluster |
| */ |
| assertEquals("Numbers of Bookies in this cluster", 1, numBookies); |
| /* |
| * this test is for validating EntryLogCompactor, so make sure |
| * TransactionalCompaction is not enabled. |
| */ |
| assertFalse("Bookies must be using EntryLogCompactor", baseConf.getUseTransactionalCompaction()); |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, true); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
| restartBookies(c -> { |
| // disable minor compaction |
| c.setMinorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| return c; |
| }); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertTrue(getGCThread().enableMajorCompaction); |
| assertFalse(getGCThread().enableMinorCompaction); |
| |
| // remove ledger1 and ledger3 |
| bkc.deleteLedger(lhs[0].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| |
        // after garbage collection, a major compaction should have been executed;
        // a major compaction run also advances the minor compaction time
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime); |
| |
| // entry logs ([0,1,2].log) should be compacted |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
        // even though the entry log files are removed, we can still access entries for ledger2
        // since those entries have been compacted to a new entry log
| long ledgerId = lhs[1].getId(); |
| long lastAddConfirmed = lhs[1].getLastAddConfirmed(); |
| verifyLedger(ledgerId, 0, lastAddConfirmed); |
| |
| /* |
| * there is only one bookie in the cluster so we should be able to read |
| * entries from this bookie. |
| */ |
| ServerConfiguration bookieServerConfig = ((BookieImpl) serverByIndex(0).getBookie()).conf; |
| ServerConfiguration newBookieConf = new ServerConfiguration(bookieServerConfig); |
| /* |
| * by reusing bookieServerConfig and setting metadataServiceUri to null |
| * we can create/start new Bookie instance using the same data |
| * (journal/ledger/index) of the existing BookeieServer for our testing |
| * purpose. |
| */ |
| newBookieConf.setMetadataServiceUri(null); |
| String entryLogCachePath = tmpDirs.createNew("entry", "bk2").getAbsolutePath(); |
| newBookieConf.setGcEntryLogMetadataCachePath(entryLogCachePath); |
| Bookie newbookie = new TestBookieImpl(newBookieConf); |
| |
| DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes, |
| BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT, |
| baseClientConf.getUseV2WireProtocol()); |
| |
| for (long entryId = 0; entryId <= lastAddConfirmed; entryId++) { |
| ByteBuf readEntryBufWithChecksum = newbookie.readEntry(ledgerId, entryId); |
| ByteBuf readEntryBuf = digestManager.verifyDigestAndReturnData(entryId, readEntryBufWithChecksum); |
| byte[] readEntryBytes = new byte[readEntryBuf.readableBytes()]; |
| readEntryBuf.readBytes(readEntryBytes); |
| assertEquals(msg, new String(readEntryBytes)); |
| } |
| } |
| |
| @Test |
| public void testCompactionWhenLedgerDirsAreFull() throws Exception { |
| /* |
| * for this test scenario we are assuming that there will be only one |
| * bookie in the cluster |
| */ |
| assertEquals("Numbers of Bookies in this cluster", 1, bookieCount()); |
| ServerConfiguration serverConfig = confByIndex(0); |
| File ledgerDir = serverConfig.getLedgerDirs()[0]; |
| assertEquals("Number of Ledgerdirs for this bookie", 1, serverConfig.getLedgerDirs().length); |
| assertTrue("indexdirs should be configured to null", null == serverConfig.getIndexDirs()); |
| /* |
| * this test is for validating EntryLogCompactor, so make sure |
| * TransactionalCompaction is not enabled. |
| */ |
| assertFalse("Bookies must be using EntryLogCompactor", baseConf.getUseTransactionalCompaction()); |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, true); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| serverByIndex(0).getBookie().getLedgerStorage().flush(); |
        assertTrue(
                "entry log files ([0,1,2].log) should be available in ledgerDirectory: "
                        + serverConfig.getLedgerDirs()[0],
                TestUtils.hasLogFiles(serverConfig.getLedgerDirs()[0], false, 0, 1, 2));
| |
| long usableSpace = ledgerDir.getUsableSpace(); |
| long totalSpace = ledgerDir.getTotalSpace(); |
| |
| /* |
| * because of the value set for diskUsageThreshold, when bookie is |
| * restarted it wouldn't find any writableledgerdir. But we have set |
| * very low values for minUsableSizeForEntryLogCreation and |
| * minUsableSizeForIndexFileCreation, so it should be able to create |
| * EntryLog file and Index file for doing compaction. |
| */ |
| |
| // restart bookies |
| restartBookies(c -> { |
| c.setForceReadOnlyBookie(true); |
| c.setIsForceGCAllowWhenNoSpace(true); |
| // disable minor compaction |
| c.setMinorCompactionThreshold(0.0f); |
| c.setGcWaitTime(60000); |
| c.setMinorCompactionInterval(120000); |
| c.setMajorCompactionInterval(240000); |
| c.setMinUsableSizeForEntryLogCreation(1); |
| c.setMinUsableSizeForIndexFileCreation(1); |
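            // the current disk usage fraction is (1 - usableSpace / totalSpace); scaling it by 0.9 puts the
            // threshold just below the actual usage, so every ledger dir is treated as full after restart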
| c.setDiskUsageThreshold((1.0f - ((float) usableSpace / (float) totalSpace)) * 0.9f); |
| c.setDiskUsageWarnThreshold(0.0f); |
| return c; |
| }); |
| |
| assertFalse("There shouldn't be any writable ledgerDir", |
| ((BookieImpl) serverByIndex(0).getBookie()).getLedgerDirsManager().hasWritableLedgerDirs()); |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertTrue(getGCThread().enableMajorCompaction); |
| assertFalse(getGCThread().enableMinorCompaction); |
| |
| // remove ledger1 and ledger3 |
| bkc.deleteLedger(lhs[0].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
        // after garbage collection, a major compaction should have been executed;
        // a major compaction run also advances the minor compaction time
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime); |
| |
| /* |
| * GarbageCollection should have succeeded, so no previous entrylog |
| * should be available. |
| */ |
| |
| // entry logs ([0,1,2].log) should be compacted |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
        // even though the entry log files are removed, we can still access entries for ledger2
        // since those entries have been compacted to a new entry log
| long ledgerId = lhs[1].getId(); |
| long lastAddConfirmed = lhs[1].getLastAddConfirmed(); |
| verifyLedger(ledgerId, 0, lastAddConfirmed); |
| } |
| |
| @Test |
| public void testMajorCompactionAboveThreshold() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; |
| long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; |
| assertTrue(getGCThread().enableMajorCompaction); |
| assertTrue(getGCThread().enableMinorCompaction); |
| |
| // remove ledger1 and ledger2 |
| bkc.deleteLedger(lhs[0].getId()); |
| bkc.deleteLedger(lhs[1].getId()); |
| LOG.info("Finished deleting the ledgers contains less entries."); |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| |
        // after garbage collection, both compaction times should advance,
        // even though no entry log qualifies for actual compaction
| assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); |
| assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime); |
| |
        // entry logs ([0,1,2].log) should not be compacted
        for (File ledgerDirectory : bookieLedgerDirs()) {
            assertTrue("Not found entry log files ([0,1,2].log) that should not have been compacted in ledgerDirectory: "
                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, false, 0, 1, 2));
| } |
| } |
| |
| @Test |
| public void testCompactionSmallEntryLogs() throws Exception { |
| |
| // create a ledger to write a few entries |
| LedgerHandle alh = bkc.createLedger(NUM_BOOKIES, NUM_BOOKIES, digestType, "".getBytes()); |
| for (int i = 0; i < 3; i++) { |
| alh.addEntry(msg.getBytes()); |
| } |
| alh.close(); |
| |
| // restart bookie to roll entry log files |
| restartBookies(); |
| |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| // restart bookies again to roll entry log files. |
| restartBookies(); |
| |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| if (useMetadataCache) { |
| assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap); |
| } |
        // entry log 0.log should not be compacted
        // entry logs ([1,2,3].log) should be compacted.
        for (File ledgerDirectory : bookieLedgerDirs()) {
            assertTrue("Not found entry log file ([0].log) that should not have been compacted in ledgerDirectory: "
                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0));
            assertFalse("Found entry log files ([1,2,3].log) that should have been compacted in ledgerDirectory: "
                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 1, 2, 3));
| } |
| |
        // even though the entry log files are removed, we can still access entries for ledger1
        // since those entries have been compacted to a new entry log
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| } |
| |
| /** |
| * Test that compaction doesnt add to index without having persisted |
| * entrylog first. This is needed because compaction doesn't go through the journal. |
| * {@see https://issues.apache.org/jira/browse/BOOKKEEPER-530} |
| * {@see https://issues.apache.org/jira/browse/BOOKKEEPER-664} |
| */ |
| @Test |
| public void testCompactionSafety() throws Exception { |
        tearDown(); // I don't want the test infrastructure
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| final Set<Long> ledgers = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>()); |
| LedgerManager manager = getLedgerManager(ledgers); |
| |
| File tmpDir = tmpDirs.createNew("bkTest", ".dir"); |
| File curDir = BookieImpl.getCurrentDirectory(tmpDir); |
| BookieImpl.checkDirectoryStructure(curDir); |
| conf.setLedgerDirNames(new String[] {tmpDir.toString()}); |
| |
| conf.setEntryLogSizeLimit(DefaultEntryLogger.LOGFILE_HEADER_SIZE + 3 * (4 + ENTRY_SIZE)); |
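        // the limit above fits the log file header plus exactly three entries (each framed with a
        // 4-byte length prefix), so a new entry log is rolled after every third entry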
| conf.setGcWaitTime(100); |
| conf.setMinorCompactionThreshold(0.7f); |
| conf.setMajorCompactionThreshold(0.0f); |
| conf.setMinorCompactionInterval(1); |
| conf.setMajorCompactionInterval(10); |
| conf.setPageLimit(1); |
| |
| CheckpointSource checkpointSource = new CheckpointSource() { |
| AtomicInteger idGen = new AtomicInteger(0); |
| class MyCheckpoint implements CheckpointSource.Checkpoint { |
| int id = idGen.incrementAndGet(); |
| @Override |
| public int compareTo(CheckpointSource.Checkpoint o) { |
| if (o == CheckpointSource.Checkpoint.MAX) { |
| return -1; |
| } else if (o == CheckpointSource.Checkpoint.MIN) { |
| return 1; |
| } |
| return id - ((MyCheckpoint) o).id; |
| } |
| } |
| |
| @Override |
| public CheckpointSource.Checkpoint newCheckpoint() { |
| return new MyCheckpoint(); |
| } |
| |
            @Override
            public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean compact)
                    throws IOException {
            }
| }; |
| final byte[] key = "foobar".getBytes(); |
| File log0 = new File(curDir, "0.log"); |
| LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| assertFalse("Log shouldnt exist", log0.exists()); |
| InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); |
| storage.initialize( |
| conf, |
| manager, |
| dirs, |
| dirs, |
| NullStatsLogger.INSTANCE, |
| UnpooledByteBufAllocator.DEFAULT); |
| storage.setCheckpointSource(checkpointSource); |
| storage.setCheckpointer(Checkpointer.NULL); |
| |
| ledgers.add(1L); |
| ledgers.add(2L); |
| ledgers.add(3L); |
| storage.setMasterKey(1, key); |
| storage.setMasterKey(2, key); |
| storage.setMasterKey(3, key); |
| storage.addEntry(genEntry(1, 1, ENTRY_SIZE)); |
| storage.addEntry(genEntry(2, 1, ENTRY_SIZE)); |
| storage.addEntry(genEntry(2, 2, ENTRY_SIZE)); |
| storage.addEntry(genEntry(3, 2, ENTRY_SIZE)); |
| storage.flush(); |
| storage.shutdown(); |
| |
| assertTrue("Log should exist", log0.exists()); |
| ledgers.remove(2L); |
| ledgers.remove(3L); |
| |
| storage = new InterleavedLedgerStorage(); |
| storage.initialize( |
| conf, |
| manager, |
| dirs, dirs, |
| NullStatsLogger.INSTANCE, |
| UnpooledByteBufAllocator.DEFAULT); |
| storage.setCheckpointSource(checkpointSource); |
| storage.setCheckpointer(Checkpointer.NULL); |
| |
| storage.start(); |
| for (int i = 0; i < 10; i++) { |
| if (!log0.exists()) { |
| break; |
| } |
| Thread.sleep(1000); |
| storage.entryLogger.flush(); // simulate sync thread |
| } |
| assertFalse("Log shouldnt exist", log0.exists()); |
| |
| ledgers.add(4L); |
| storage.setMasterKey(4, key); |
| storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page to flush |
| storage.shutdown(); |
| |
| storage = new InterleavedLedgerStorage(); |
| storage.initialize( |
| conf, |
| manager, |
| dirs, |
| dirs, |
| NullStatsLogger.INSTANCE, |
| UnpooledByteBufAllocator.DEFAULT); |
| storage.setCheckpointSource(checkpointSource); |
| storage.setCheckpointer(Checkpointer.NULL); |
| |
| storage.getEntry(1, 1); // entry should exist |
| } |
| |
| @Test |
| public void testCancelledCompactionWhenShuttingDown() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
        // throttle compaction to a very low throughput
| // restart bookies |
| restartBookies(c -> { |
| c.setIsThrottleByBytes(true); |
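            // ENTRY_SIZE / 1000 evaluates to 1 via integer division, i.e. an extremely low byte
            // rate, so the compaction is still in flight when the bookie shuts down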
| c.setCompactionRateByBytes(ENTRY_SIZE / 1000); |
| c.setMinorCompactionThreshold(0.2f); |
| c.setMajorCompactionThreshold(0.5f); |
| return c; |
| }); |
| |
        // remove ledger2 and ledger3
        // so entry logs 1 and 2 would have only ledger1's entries left
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| |
| getGCThread().triggerGC(true, false, false); |
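        // cancel any pending throttler permit acquisition so the slow, in-flight compaction can abort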
| getGCThread().throttler.cancelledAcquire(); |
| waitUntilTrue(() -> { |
| try { |
| return getGCThread().compacting.get(); |
| } catch (Exception e) { |
| fail("Get GC thread failed"); |
| } |
| return false; |
| }, () -> "Not attempting to compact", 10000, 200); |
| |
| getGCThread().shutdown(); |
| // after the garbage collector shuts down, compaction should be cancelled while |
| // acquiring permits, and the GC running flag should be false. |
| assertFalse(getGCThread().running); |
| } |
| |
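| /** |
| * Polls {@code condition} every {@code pause} milliseconds, returning once it |
| * holds and failing the test with {@code msg} if it does not hold within |
| * {@code waitTime} milliseconds. |
| */ |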
| private void waitUntilTrue(Supplier<Boolean> condition, |
| Supplier<String> msg, |
| long waitTime, |
| long pause) throws InterruptedException { |
| long startTime = System.currentTimeMillis(); |
| while (true) { |
| if (condition.get()) { |
| return; |
| } |
| if (System.currentTimeMillis() > startTime + waitTime) { |
| fail(msg.get()); |
| } |
| Thread.sleep(Math.min(waitTime, pause)); |
| } |
| } |
| |
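| /** |
| * Returns a stub LedgerManager that only supports iterating over the supplied |
| * ledger id set; every metadata operation fails loudly so any unexpected use |
| * shows up in the test logs. |
| */ |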
| private LedgerManager getLedgerManager(final Set<Long> ledgers) { |
| LedgerManager manager = new LedgerManager() { |
| @Override |
| public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long lid, |
| LedgerMetadata metadata) { |
| unsupported(); |
| return null; |
| } |
| @Override |
| public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version version) { |
| unsupported(); |
| return null; |
| } |
| @Override |
| public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) { |
| unsupported(); |
| return null; |
| } |
| @Override |
| public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, |
| LedgerMetadata metadata, |
| Version currentVersion) { |
| unsupported(); |
| return null; |
| } |
| @Override |
| public void asyncProcessLedgers(Processor<Long> processor, |
| AsyncCallback.VoidCallback finalCb, |
| Object context, int successRc, int failureRc) { |
| unsupported(); |
| } |
| @Override |
| public void registerLedgerMetadataListener(long ledgerId, |
| LedgerMetadataListener listener) { |
| unsupported(); |
| } |
| @Override |
| public void unregisterLedgerMetadataListener(long ledgerId, |
| LedgerMetadataListener listener) { |
| unsupported(); |
| } |
| @Override |
| public void close() throws IOException {} |
| |
| void unsupported() { |
| LOG.error("Unsupported operation called", new Exception()); |
| throw new RuntimeException("Unsupported op"); |
| } |
| |
| @Override |
| public LedgerRangeIterator getLedgerRanges(long zkOpTimeoutMs) { |
| final AtomicBoolean hasnext = new AtomicBoolean(true); |
| return new LedgerManager.LedgerRangeIterator() { |
| @Override |
| public boolean hasNext() throws IOException { |
| return hasnext.get(); |
| } |
| @Override |
| public LedgerManager.LedgerRange next() throws IOException { |
| hasnext.set(false); |
| return new LedgerManager.LedgerRange(ledgers); |
| } |
| }; |
| } |
| }; |
| return manager; |
| } |
| |
| /** |
| * Test that compaction executes silently when there are no entry logs |
| * to compact. |
| * |
| * @see <a href="https://issues.apache.org/jira/browse/BOOKKEEPER-700">BOOKKEEPER-700</a> |
| */ |
| @Test |
| public void testWhenNoLogsToCompact() throws Exception { |
| tearDown(); // we don't want the test infrastructure here |
| ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| File tmpDir = tmpDirs.createNew("bkTest", ".dir"); |
| File curDir = BookieImpl.getCurrentDirectory(tmpDir); |
| BookieImpl.checkDirectoryStructure(curDir); |
| conf.setLedgerDirNames(new String[] { tmpDir.toString() }); |
| |
| LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| final Set<Long> ledgers = Collections |
| .newSetFromMap(new ConcurrentHashMap<Long, Boolean>()); |
| LedgerManager manager = getLedgerManager(ledgers); |
| CheckpointSource checkpointSource = new CheckpointSource() { |
| |
| @Override |
| public Checkpoint newCheckpoint() { |
| return null; |
| } |
| |
| @Override |
| public void checkpointComplete(Checkpoint checkpoint, |
| boolean compact) throws IOException { |
| } |
| }; |
| InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); |
| storage.initialize( |
| conf, |
| manager, |
| dirs, |
| dirs, |
| NullStatsLogger.INSTANCE, |
| UnpooledByteBufAllocator.DEFAULT); |
| storage.setCheckpointSource(checkpointSource); |
| storage.setCheckpointer(Checkpointer.NULL); |
| |
| double threshold = 0.1; |
| long limit = 0; |
| |
| // shouldn't throw exception |
| storage.gcThread.doCompactEntryLogs(threshold, limit); |
| } |
| |
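| /** |
| * Generates an entry buffer of {@code size} bytes: the ledger id and entry id |
| * as the first two longs, padded with 0xFF bytes. |
| */ |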
| private ByteBuf genEntry(long ledger, long entry, int size) { |
| ByteBuf bb = Unpooled.buffer(size); |
| bb.writeLong(ledger); |
| bb.writeLong(entry); |
| while (bb.isWritable()) { |
| bb.writeByte((byte) 0xFF); |
| } |
| return bb; |
| } |
| |
| /** |
| * Test that garbage collection is suspended while suspendMajor/suspendMinor is set. |
| */ |
| @Test |
| public void testSuspendGarbageCollection() throws Exception { |
| ServerConfiguration conf = newServerConfiguration(); |
| conf.setGcWaitTime(500); |
| conf.setMinorCompactionInterval(1); |
| conf.setMajorCompactionInterval(2); |
| conf.setMajorCompactionMaxTimeMillis(5000); |
| runFunctionWithLedgerManagerFactory(conf, lmf -> { |
| try (LedgerManager lm = lmf.newLedgerManager()) { |
| testSuspendGarbageCollection(conf, lm); |
| } catch (Exception e) { |
| throw new UncheckedExecutionException(e.getMessage(), e); |
| } |
| return null; |
| }); |
| } |
| |
| private void testSuspendGarbageCollection(ServerConfiguration conf, |
| LedgerManager lm) throws Exception { |
| LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), |
| new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); |
| CheckpointSource cp = new CheckpointSource() { |
| @Override |
| public Checkpoint newCheckpoint() { |
| // Do nothing. |
| return null; |
| } |
| |
| @Override |
| public void checkpointComplete(Checkpoint checkPoint, boolean compact) |
| throws IOException { |
| // Do nothing. |
| } |
| }; |
| for (File journalDir : conf.getJournalDirs()) { |
| BookieImpl.checkDirectoryStructure(journalDir); |
| } |
| for (File dir : dirManager.getAllLedgerDirs()) { |
| BookieImpl.checkDirectoryStructure(dir); |
| } |
| InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); |
| TestStatsProvider stats = new TestStatsProvider(); |
| storage.initialize( |
| conf, |
| lm, |
| dirManager, |
| dirManager, |
| stats.getStatsLogger("storage"), |
| UnpooledByteBufAllocator.DEFAULT); |
| storage.setCheckpointSource(cp); |
| storage.setCheckpointer(Checkpointer.NULL); |
| |
| storage.start(); |
| |
| int majorCompactions = stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get().intValue(); |
| int minorCompactions = stats.getCounter("storage.gc." + MINOR_COMPACTION_COUNT).get().intValue(); |
| Thread.sleep(3 * (conf.getMajorCompactionInterval() * 1000 |
| + conf.getGcWaitTime() |
| + conf.getMajorCompactionMaxTimeMillis())); |
| assertTrue( |
| "Major compaction should have happened", |
| stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get() > majorCompactions); |
| |
| // test suspend Major GC. |
| storage.gcThread.suspendMajorGC(); |
| |
| Thread.sleep(1000); |
| long startTime = System.currentTimeMillis(); |
| majorCompactions = stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get().intValue(); |
| Thread.sleep(conf.getMajorCompactionInterval() * 1000 |
| + conf.getGcWaitTime()); |
| assertTrue("major compaction triggered while suspended", |
| storage.gcThread.lastMajorCompactionTime < startTime); |
| assertTrue("major compaction triggered while suspended", |
| stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get() == majorCompactions); |
| |
| // minor compaction should still run while major GC is suspended. |
| Thread.sleep(conf.getMinorCompactionInterval() * 1000 |
| + conf.getGcWaitTime()); |
| assertTrue( |
| "Minor compaction should have happened", |
| stats.getCounter("storage.gc." + MINOR_COMPACTION_COUNT).get() > minorCompactions); |
| |
| // test suspend Minor GC. |
| storage.gcThread.suspendMinorGC(); |
| |
| Thread.sleep(1000); |
| startTime = System.currentTimeMillis(); |
| minorCompactions = stats.getCounter("storage.gc." + MINOR_COMPACTION_COUNT).get().intValue(); |
| Thread.sleep(conf.getMajorCompactionInterval() * 1000 |
| + conf.getGcWaitTime()); |
| assertTrue("minor compaction triggered while suspended", |
| storage.gcThread.lastMinorCompactionTime < startTime); |
| assertTrue("minor compaction triggered while suspended", |
| stats.getCounter("storage.gc." + MINOR_COMPACTION_COUNT).get() == minorCompactions); |
| |
| // test resume |
| storage.gcThread.resumeMinorGC(); |
| storage.gcThread.resumeMajorGC(); |
| |
| Thread.sleep((conf.getMajorCompactionInterval() + conf.getMinorCompactionInterval()) * 1000 |
| + (conf.getGcWaitTime() * 2)); |
| assertTrue( |
| "Major compaction should have happened", |
| stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get() > majorCompactions); |
| assertTrue( |
| "Minor compaction should have happened", |
| stats.getCounter("storage.gc." + MINOR_COMPACTION_COUNT).get() > minorCompactions); |
| assertTrue( |
| "GC thread runtime should be non-zero", |
| stats.getOpStatsLogger("storage.gc." + THREAD_RUNTIME).getSuccessCount() > 0); |
| } |
| |
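| /** |
| * Run transactional compaction with a rigged index flush failure, verify the |
| * original entry logs and their .compacted files survive, then check that |
| * cleanUpAndRecover() replays the .compacted files and removes the stale logs. |
| */ |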
| @Test |
| public void testRecoverIndexWhenIndexIsPartiallyFlush() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(3, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
| restartBookies(c -> { |
| // disable compaction |
| c.setMinorCompactionThreshold(0.0f); |
| c.setMajorCompactionThreshold(0.0f); |
| c.setGcWaitTime(600000); |
| return c; |
| }); |
| |
| BookieImpl bookie = (BookieImpl) serverByIndex(0).getBookie(); |
| InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage; |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| |
| MockTransactionalEntryLogCompactor partialCompactionWorker = new MockTransactionalEntryLogCompactor( |
| ((InterleavedLedgerStorage) bookie.getLedgerStorage()).gcThread); |
| |
| for (long logId = 0; logId < 3; logId++) { |
| EntryLogMetadata meta = storage.entryLogger.getEntryLogMetadata(logId); |
| partialCompactionWorker.compactWithIndexFlushFailure(meta); |
| } |
| |
| // entry logs ([0,1,2].log) should not be compacted because the partial index flush throws an IOException |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertTrue("Entry log file ([0,1,2].log should not be compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
| // entries should be available |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| |
| // but we should see .compacted files left behind by the index flush failure |
| assertEquals(3, findCompactedEntryLogFiles().size()); |
| |
| // Now try to recover those flush failed index files |
| partialCompactionWorker.cleanUpAndRecover(); |
| |
| // There should be no .compacted files after recovery |
| assertEquals(0, findCompactedEntryLogFiles().size()); |
| |
| // compaction worker should recover partial flushed index and delete [0,1,2].log |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertFalse("Entry log file ([0,1,2].log should have been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); |
| } |
| |
| // even though the entry log files are removed, we can still access the entries of |
| // ledger1, since they have been compacted into a new entry log |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| } |
| |
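| /** |
| * Fail transactional compaction at the compaction-log flush phase, verify that |
| * no extra data is left behind, then re-enable compaction and verify the logs |
| * are eventually compacted normally. |
| */ |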
| @Test |
| public void testCompactionFailureShouldNotResultInDuplicatedData() throws Exception { |
| // prepare data |
| LedgerHandle[] lhs = prepareData(5, false); |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| // restart bookies |
| restartBookies(c -> { |
| // disable compaction |
| c.setMinorCompactionThreshold(0.0f); |
| c.setMajorCompactionThreshold(0.0f); |
| c.setUseTransactionalCompaction(true); |
| return c; |
| }); |
| |
| // remove ledger2 and ledger3 |
| bkc.deleteLedger(lhs[1].getId()); |
| bkc.deleteLedger(lhs[2].getId()); |
| |
| LOG.info("Finished deleting the ledgers contains most entries."); |
| Thread.sleep(baseConf.getMajorCompactionInterval() * 1000 |
| + baseConf.getGcWaitTime()); |
| BookieImpl bookie = (BookieImpl) serverByIndex(0).getBookie(); |
| InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage; |
| |
| List<File> ledgerDirs = bookie.getLedgerDirsManager().getAllLedgerDirs(); |
| List<Long> usageBeforeCompaction = new ArrayList<>(); |
| ledgerDirs.forEach(file -> usageBeforeCompaction.add(getDirectorySpaceUsage(file))); |
| |
| MockTransactionalEntryLogCompactor partialCompactionWorker = new MockTransactionalEntryLogCompactor( |
| ((InterleavedLedgerStorage) bookie.ledgerStorage).gcThread); |
| |
| for (long logId = 0; logId < 5; logId++) { |
| EntryLogMetadata meta = storage.entryLogger.getEntryLogMetadata(logId); |
| partialCompactionWorker.compactWithLogFlushFailure(meta); |
| } |
| |
| // entry logs ([0-4].log) should not be compacted because flushing the compaction log failed |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertTrue("Entry log file ([0,1,2].log should not be compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 3, 4)); |
| } |
| // entries of ledger1 should still be available, since the failed compaction |
| // left the original entry logs intact |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| |
| List<Long> usageAfterCompactionFailed = new ArrayList<>(); |
| ledgerDirs.forEach(file -> usageAfterCompactionFailed.add(getDirectorySpaceUsage(file))); |
| |
| // no extra data should have been generated by the failed compaction |
| for (int i = 0; i < usageBeforeCompaction.size(); i++) { |
| assertEquals(usageBeforeCompaction.get(i), usageAfterCompactionFailed.get(i)); |
| } |
| |
| // restart bookies |
| restartBookies(c -> { |
| // now enable normal compaction |
| c.setMajorCompactionThreshold(0.5f); |
| c.setMajorCompactionMaxTimeMillis(5000); |
| return c; |
| }); |
| |
| getGCThread().enableForceGC(); |
| getGCThread().triggerGC().get(); |
| |
| Thread.sleep(confByIndex(0).getMajorCompactionInterval() * 1000 |
| + confByIndex(0).getGcWaitTime() |
| + confByIndex(0).getMajorCompactionMaxTimeMillis()); |
| // compaction worker should compact [0-4].log |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| assertFalse("Entry log file ([0,1,2].log should have been compacted in ledgerDirectory: " |
| + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 3, 4)); |
| } |
| |
| // even though the entry log files are removed, we can still access the entries of |
| // ledger1, since they have been compacted into a new entry log |
| verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); |
| } |
| |
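| /** Sums the sizes of the files directly under {@code dir} (non-recursive). */ |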
| private long getDirectorySpaceUsage(File dir) { |
| long size = 0; |
| File[] files = dir.listFiles(); |
| if (files != null) { |
| for (File file : files) { |
| size += file.length(); |
| } |
| } |
| return size; |
| } |
| |
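| /** Collects the *.compacted files under the current directory of each ledger dir. */ |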
| private Set<File> findCompactedEntryLogFiles() throws Exception { |
| Set<File> compactedLogFiles = new HashSet<>(); |
| for (File ledgerDirectory : bookieLedgerDirs()) { |
| File[] files = BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles( |
| file -> file.getName().endsWith(COMPACTED_SUFFIX)); |
| if (files != null) { |
| Collections.addAll(compactedLogFiles, files); |
| } |
| } |
| return compactedLogFiles; |
| } |
| |
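| /** |
| * A TransactionalEntryLogCompactor whose compaction runs can be rigged to fail |
| * at a chosen phase (index flush or compaction-log flush), so that tests can |
| * drive the recovery paths deterministically. |
| */ |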
| private static class MockTransactionalEntryLogCompactor extends TransactionalEntryLogCompactor { |
| |
| public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) { |
| super(gcThread.conf, |
| gcThread.entryLogger, |
| gcThread.ledgerStorage, |
| (long entry) -> { |
| try { |
| gcThread.removeEntryLog(entry); |
| } catch (EntryLogMetadataMapException e) { |
| LOG.warn("Failed to remove entry-log metadata {}", entry, e); |
| } |
| }); |
| } |
| |
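| /** |
| * Runs the scan and flush phases normally, then fails part-way through the |
| * index update phase, leaving a .compacted file behind for recovery. |
| */ |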
| synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) throws IOException { |
| LOG.info("Compacting entry log {}.", metadata.getEntryLogId()); |
| CompactionEntryLog compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId()); |
| |
| CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog); |
| if (!scanEntryLog.run()) { |
| LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); |
| return; |
| } |
| CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(compactionLog); |
| if (!flushCompactionLog.run()) { |
| LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); |
| return; |
| } |
| CompactionPhase partialFlushIndexPhase = new PartialFlushIndexPhase(compactionLog); |
| if (!partialFlushIndexPhase.run()) { |
| LOG.info("Compaction for {} end in PartialFlushIndexPhase.", metadata.getEntryLogId()); |
| return; |
| } |
| logRemovalListener.removeEntryLog(metadata.getEntryLogId()); |
| LOG.info("Compacted entry log : {}.", metadata.getEntryLogId()); |
| } |
| |
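| /** |
| * Runs the scan phase normally, then fails while flushing the compaction log, |
| * so the index is never updated and the original entry log must stay intact. |
| */ |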
| synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) throws IOException { |
| LOG.info("Compacting entry log {}", metadata.getEntryLogId()); |
| CompactionEntryLog compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId()); |
| |
| CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog); |
| if (!scanEntryLog.run()) { |
| LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); |
| return; |
| } |
| CompactionPhase logFlushFailurePhase = new LogFlushFailurePhase(compactionLog); |
| if (!logFlushFailurePhase.run()) { |
| LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); |
| return; |
| } |
| CompactionPhase updateIndex = new UpdateIndexPhase(compactionLog); |
| if (!updateIndex.run()) { |
| LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId()); |
| return; |
| } |
| logRemovalListener.removeEntryLog(metadata.getEntryLogId()); |
| LOG.info("Compacted entry log : {}.", metadata.getEntryLogId()); |
| } |
| |
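| /** |
| * An UpdateIndexPhase that flushes the location of only the first entry before |
| * throwing, simulating a partially flushed index. |
| */ |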
| private class PartialFlushIndexPhase extends UpdateIndexPhase { |
| |
| public PartialFlushIndexPhase(CompactionEntryLog compactionLog) { |
| super(compactionLog); |
| } |
| |
| @Override |
| void start() throws IOException { |
| compactionLog.makeAvailable(); |
| assertTrue(offsets.size() > 1); |
| // flush the index for only the first entry location, then fail so the |
| // remaining locations never reach the index |
| EntryLocation el = offsets.get(0); |
| ledgerStorage.updateEntriesLocations(Collections.singletonList(el)); |
| ledgerStorage.flushEntriesLocationsIndex(); |
| throw new IOException("Flush ledger index encountered exception"); |
| } |
| } |
| |
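| /** |
| * A FlushCompactionLogPhase that throws right after flushing the compaction |
| * log, simulating a failure part-way through the flush phase. |
| */ |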
| private class LogFlushFailurePhase extends FlushCompactionLogPhase { |
| |
| LogFlushFailurePhase(CompactionEntryLog compactionEntryLog) { |
| super(compactionEntryLog); |
| } |
| |
| @Override |
| void start() throws IOException { |
| // flush the current compaction log |
| compactionLog.flush(); |
| throw new IOException("Encounter IOException when trying to flush compaction log"); |
| } |
| } |
| } |
| |
| } |