/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.TestUtils;
import org.junit.Before;

/**
 * Tests bookie storage behaviour once the disk usage threshold is crossed.
 */
public class BookieStorageThresholdTest extends BookKeeperClusterTestCase {
private static final int NUM_BOOKIES = 1;
private static final int NUM_ENTRIES = 100;
private static final int ENTRY_SIZE = 1024;
final String msg;
DigestType digestType = DigestType.CRC32;

public BookieStorageThresholdTest() {
super(NUM_BOOKIES);
// a dummy message
StringBuilder msgSB = new StringBuilder();
for (int i = 0; i < ENTRY_SIZE; i++) {
msgSB.append("a");
}
msg = msgSB.toString();
}

@Before
@Override
public void setUp() throws Exception {
// Set up the configuration properties needed.
baseConf.setEntryLogSizeLimit(NUM_ENTRIES * ENTRY_SIZE);
baseConf.setFlushInterval(500);
// set very high GC/compaction intervals, so that GC/compaction is not invoked by the regular scheduler
baseConf.setGcWaitTime(60000);
baseConf.setMinorCompactionInterval(600000);
baseConf.setMajorCompactionInterval(700000);
baseConf.setEntryLogFilePreAllocationEnabled(false);
baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
// set isForceGCAllowWhenNoSpace to true, so that GC is forced when a disk is full (or when all disks are full)
baseConf.setIsForceGCAllowWhenNoSpace(true);
// use a low DiskCheckInterval, so that the DiskChecker runs frequently
baseConf.setDiskCheckInterval(3000);
super.setUp();
}
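
/**
 * Creates three ledgers and writes {@code numEntryLogs} batches of NUM_ENTRIES entries across
 * them: the first ledger gets a couple of entries, the third gets the bulk, and the second gets
 * the remainder, so each batch fills roughly one entry log file.
 */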
LedgerHandle[] prepareData(int numEntryLogs) throws Exception {
// since an entry log file can hold at most NUM_ENTRIES (100) entries
// the first ledger writes 2 entries, which is below the low water mark
int num1 = 2;
// the third ledger writes more entries than the high water mark
int num3 = (int) (NUM_ENTRIES * 0.7f);
// the second ledger writes the remaining entries, which is above the low water
// mark and below the high water mark
int num2 = NUM_ENTRIES - num3 - num1;
LedgerHandle[] lhs = new LedgerHandle[3];
for (int i = 0; i < 3; ++i) {
lhs[i] = bkc.createLedger(NUM_BOOKIES, NUM_BOOKIES, digestType, "".getBytes());
}
for (int n = 0; n < numEntryLogs; n++) {
for (int k = 0; k < num1; k++) {
lhs[0].addEntry(msg.getBytes());
}
for (int k = 0; k < num2; k++) {
lhs[1].addEntry(msg.getBytes());
}
for (int k = 0; k < num3; k++) {
lhs[2].addEntry(msg.getBytes());
}
}
return lhs;
}

/**
 * A DiskChecker that can be instructed to throw DiskOutOfSpaceException, used to simulate a disk
 * crossing the usage threshold.
 */
public class ThresholdTestDiskChecker extends DiskChecker {
final AtomicBoolean injectDiskOutOfSpaceException;
public ThresholdTestDiskChecker(float threshold, float warnThreshold) {
super(threshold, warnThreshold);
injectDiskOutOfSpaceException = new AtomicBoolean();
}
public void setInjectDiskOutOfSpaceException(boolean setValue) {
injectDiskOutOfSpaceException.set(setValue);
}
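// Reports the directory as over the usage threshold while injection is enabled; otherwise
// delegates to the real DiskChecker.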
@Override
public float checkDir(File dir) throws DiskErrorException, DiskOutOfSpaceException, DiskWarnThresholdException {
if (injectDiskOutOfSpaceException.get()) {
throw new DiskOutOfSpaceException("Injected DiskOutOfSpaceException",
baseConf.getDiskUsageThreshold() + 2);
}
return super.checkDir(dir);
}
}
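
/**
 * Brings up a bookie with an injectable disk checker, fills a few entry logs, deletes two of the
 * three ledgers, and then simulates a full disk. With isForceGCAllowWhenNoSpace enabled, the
 * bookie should force GC/compaction while read-only and return to read-write mode once the disk
 * becomes writable again.
 */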
@FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1562")
public void testStorageThresholdCompaction() throws Exception {
stopAllBookies();
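// bring up a fresh bookie with two ledger directories and a dedicated journal directory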
ServerConfiguration conf = newServerConfiguration();
File ledgerDir1 = createTempDir("ledger", "test1");
File ledgerDir2 = createTempDir("ledger", "test2");
File journalDir = createTempDir("journal", "test");
String[] ledgerDirNames = new String[]{
ledgerDir1.getPath(),
ledgerDir2.getPath()
};
conf.setLedgerDirNames(ledgerDirNames);
conf.setJournalDirName(journalDir.getPath());
BookieServer server = startBookie(conf);
bs.add(server);
bsConfs.add(conf);
BookieImpl bookie = (BookieImpl) server.getBookie();
// shut down the dirsMonitor that was created as part of Bookie initialization, since we are
// about to replace it with one that uses the dependency-injected disk checker
bookie.dirsMonitor.shutdown();
LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
// latches flipped by the LedgerDirsListener callbacks registered below
final CountDownLatch diskWritable = new CountDownLatch(1);
final CountDownLatch diskFull = new CountDownLatch(1);
ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() {
@Override
public void diskWritable(File disk) {
diskWritable.countDown();
}
@Override
public void diskFull(File disk) {
diskFull.countDown();
}
});
// dependency-injected disk checker that lets the test simulate a full disk
ThresholdTestDiskChecker thresholdTestDiskChecker = new ThresholdTestDiskChecker(
baseConf.getDiskUsageThreshold(), baseConf.getDiskUsageWarnThreshold());
bookie.dirsMonitor = new LedgerDirsMonitor(baseConf, thresholdTestDiskChecker,
Collections.singletonList(ledgerDirsManager));
// install the new dirsMonitor and initialize/start it
bookie.dirsMonitor.init();
bookie.dirsMonitor.start();
// create ledgers and add fragments
LedgerHandle[] lhs = prepareData(3);
for (LedgerHandle lh : lhs) {
lh.close();
}
// delete ledger2 and ledger3
bkc.deleteLedger(lhs[1].getId());
bkc.deleteLedger(lhs[2].getId());
// validate that the LedgerDirsListener callbacks have not been triggered yet
assertTrue("Disk Full shouldn't have been triggered yet", diskFull.getCount() == 1);
assertTrue("Disk writable shouldn't have been triggered yet", diskWritable.getCount() == 1);
// enable exception injection, so that the next time checkDir of the ThresholdTestDiskChecker is
// called it throws DiskOutOfSpaceException
thresholdTestDiskChecker.setInjectDiskOutOfSpaceException(true);
// now wait for the diskFull latch count to reach 0.
// we wait up to the diskCheckInterval period, so that the next time LedgerDirsMonitor checks the
// disk usage of its directories it gets DiskOutOfSpaceException and diskFull is invoked on all
// registered LedgerDirsListeners.
diskFull.await(baseConf.getDiskCheckInterval() + 500, TimeUnit.MILLISECONDS);
// verify that diskFull was invoked on all LedgerDirsListeners, so the diskFull latch count
// should come down to 0
assertTrue("Disk Full should have been triggered", diskFull.getCount() == 0);
// make sure diskWritable of the LedgerDirsListeners has not been invoked yet
assertTrue("Disk writable shouldn't have been triggered yet", diskWritable.getCount() == 1);
// wait briefly, because the transition to read-only mode happens asynchronously once there are no
// more writable ledger dirs
Thread.sleep(500);
assertTrue("Bookie should be transitioned to ReadOnly", bookie.isReadOnly());
// since isForceGCAllowWhenNoSpace is set to true, GC is forced when the disk is full (or when all
// disks are full).
// because of getWritableLedgerDirsForNewLog, compaction is able to create a new entry log and
// compact even though there are no writable ledger dirs
for (File ledgerDir : bookie.getLedgerDirsManager().getAllLedgerDirs()) {
assertFalse("Found entry log file ([0,1,2].log. They should have been compacted" + ledgerDir,
TestUtils.hasLogFiles(ledgerDir.getParentFile(), true, 0, 1, 2));
}
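// with every ledger dir full, there should be no writable ledger dirs left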
try {
ledgerDirsManager.getWritableLedgerDirs();
fail("It is expected that there wont be any Writable LedgerDirs and getWritableLedgerDirs "
+ "is supposed to throw NoWritableLedgerDirException");
} catch (NoWritableLedgerDirException nowritableDirsException) {
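// expected: all ledger dirs are full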
}
// disable exception injection
thresholdTestDiskChecker.setInjectDiskOutOfSpaceException(false);
// now wait for the diskWritable latch count to reach 0.
// we wait up to the diskCheckInterval period, so that the next time LedgerDirsMonitor checks the
// disk usage of its directories it finds a writable ledger directory and diskWritable is invoked
// on all registered LedgerDirsListeners.
diskWritable.await(baseConf.getDiskCheckInterval() + 500, TimeUnit.MILLISECONDS);
// verify that diskWritable was invoked on all LedgerDirsListeners, so the diskWritable latch count
// should come down to 0
assertTrue("Disk writable should have been triggered", diskWritable.getCount() == 0);
// wait briefly, because the transition back to read-write mode happens asynchronously once a
// writable ledger directory is available again
Thread.sleep(500);
assertFalse("Bookie should be transitioned to ReadWrite", bookie.isReadOnly());
}
}