blob: eaa7505d4b7ba85ea6af493c91df0a5fcb0f9473 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* LedgerStorageCheckpointTest.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(SyncThread.class)
@PowerMockIgnore("javax.*")
public class LedgerStorageCheckpointTest {
private static final Logger LOG = LoggerFactory
.getLogger(LedgerStorageCheckpointTest.class);
@Rule
public final TestName runtime = new TestName();
// ZooKeeper related variables
protected final ZooKeeperUtil zkUtil = new ZooKeeperUtil();
// BookKeeper related variables
protected final List<File> tmpDirs = new LinkedList<File>();
// ScheduledExecutorService used by SyncThread
MockExecutorController executorController;
/**
 * Per-test fixture: mocks the static {@link Executors} factory, starts the
 * ZooKeeper cluster and installs a mocked {@link ScheduledExecutorService}
 * that is driven by {@link #executorController}.
 *
 * @throws Exception if the ZooKeeper cluster cannot be started
 */
@Before
public void setUp() throws Exception {
    LOG.info("Setting up test {}", getClass());
    PowerMockito.mockStatic(Executors.class);

    // bring up the zookeeper service first; log and propagate any failure
    try {
        startZKCluster();
    } catch (Exception startupFailure) {
        LOG.error("Error setting up", startupFailure);
        throw startupFailure;
    }

    // the mocked scheduler is controlled through the MockExecutorController
    final ScheduledExecutorService mockedScheduler = PowerMockito.mock(ScheduledExecutorService.class);
    executorController = new MockExecutorController()
            .controlSubmit(mockedScheduler)
            .controlScheduleAtFixedRate(mockedScheduler, 10);

    PowerMockito.when(mockedScheduler.awaitTermination(anyLong(), any(TimeUnit.class)))
            .thenReturn(true);
    // hand the mocked scheduler to whoever asks the (mocked) Executors factory for one
    PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any()))
            .thenReturn(mockedScheduler);
}
/**
 * Stops ZooKeeper and removes the temporary directories. Both steps are always
 * attempted; if either fails, the most recent failure is rethrown at the end.
 *
 * @throws Exception the last exception raised by one of the cleanup steps
 */
@After
public void tearDown() throws Exception {
    LOG.info("TearDown");
    Exception lastFailure = null;

    // step 1: stop zookeeper service
    try {
        stopZKCluster();
    } catch (Exception zkFailure) {
        LOG.error("Got Exception while trying to stop ZKCluster", zkFailure);
        lastFailure = zkFailure;
    }

    // step 2: cleanup temp dirs
    try {
        cleanupTempDirs();
    } catch (Exception cleanupFailure) {
        LOG.error("Got Exception while trying to cleanupTempDirs", cleanupFailure);
        lastFailure = cleanupFailure;
    }

    if (lastFailure == null) {
        return;
    }
    throw lastFailure;
}
/**
 * Starts the ZooKeeper cluster managed by {@code zkUtil}.
 *
 * @throws Exception if the cluster fails to start
 */
protected void startZKCluster() throws Exception {
    this.zkUtil.startCluster();
}
/**
 * Kills the ZooKeeper cluster managed by {@code zkUtil}.
 *
 * @throws Exception if the cluster cannot be killed
 */
protected void stopZKCluster() throws Exception {
    this.zkUtil.killCluster();
}
/**
 * Deletes every directory that was registered through
 * {@link #createTempDir(String, String)}.
 *
 * @throws Exception if a directory cannot be deleted
 */
protected void cleanupTempDirs() throws Exception {
    for (final File registeredDir : tmpDirs) {
        FileUtils.deleteDirectory(registeredDir);
    }
}
/**
 * Creates a temporary directory and remembers it in {@code tmpDirs} so that
 * {@link #cleanupTempDirs()} can remove it later.
 *
 * @param prefix prefix passed to {@code IOUtils.createTempDir}
 * @param suffix suffix passed to {@code IOUtils.createTempDir}
 * @return the newly created directory
 * @throws IOException if the directory cannot be created
 */
protected File createTempDir(String prefix, String suffix) throws IOException {
    final File created = IOUtils.createTempDir(prefix, suffix);
    tmpDirs.add(created);
    return created;
}
/**
 * Reads the 16-byte log mark stored in the given lastMark file.
 *
 * <p>The stream is opened with try-with-resources so it is closed even when
 * reading fails, and the read is repeated until all 16 bytes have arrived or
 * the end of the file is reached, because a single {@code read} call is not
 * guaranteed to fill the buffer.
 *
 * @param lastMarkFile the lastMark file to read
 * @return the {@link LogMark} decoded from the first 16 bytes of the file
 * @throws IOException if the file cannot be read or holds fewer than 16 bytes
 */
private LogMark readLastMarkFile(File lastMarkFile) throws IOException {
    final int logMarkSize = 16;
    byte[] buff = new byte[logMarkSize];
    int bytesRead = 0;
    try (FileInputStream fis = new FileInputStream(lastMarkFile)) {
        while (bytesRead < logMarkSize) {
            int n = fis.read(buff, bytesRead, logMarkSize - bytesRead);
            if (n < 0) {
                // end of file reached before the whole mark was read
                break;
            }
            bytesRead += n;
        }
    }
    if (bytesRead != logMarkSize) {
        throw new IOException("Couldn't read enough bytes from lastMark." + " Wanted " + logMarkSize + ", got "
                + bytesRead);
    }
    // a freshly wrapped buffer already starts at position 0 with limit == capacity
    LogMark rolledLogMark = new LogMark();
    rolledLogMark.readLogMark(ByteBuffer.wrap(buff));
    return rolledLogMark;
}
/*
 * Runs the periodic-checkpoint scenario against InterleavedLedgerStorage:
 * a checkpoint is expected for every flushInterval period.
 */
@Test
public void testPeriodicCheckpointForInterleavedLedgerStorage() throws Exception {
    final String storageClassName = InterleavedLedgerStorage.class.getName();
    testPeriodicCheckpointForLedgerStorage(storageClassName);
}
/*
 * Runs the periodic-checkpoint scenario against SortedLedgerStorage:
 * a checkpoint is expected for every flushInterval period.
 */
@Test
public void testPeriodicCheckpointForSortedLedgerStorage() throws Exception {
    final String storageClassName = SortedLedgerStorage.class.getName();
    testPeriodicCheckpointForLedgerStorage(storageClassName);
}
/**
 * Shared body of the periodic-checkpoint tests.
 *
 * <p>Starts a bookie with entryLogPerLedger enabled and a flushInterval of
 * 2000 ms, writes a first set of ledgers, checks that the lastMark file is
 * still zero, advances the mocked executor by one flushInterval and checks
 * that the lastMark file now matches the journal's current mark. The same
 * check is repeated after a second set of ledgers has been written.
 *
 * <p>The client and the server are released in {@code finally} blocks so a
 * failing assertion does not leave a running bookie behind.
 *
 * @param ledgerStorageClassName fully qualified name of the ledger storage
 *        implementation the bookie is configured with
 * @throws Exception on any setup, write or I/O failure
 */
public void testPeriodicCheckpointForLedgerStorage(String ledgerStorageClassName) throws Exception {
    File tmpDir = createTempDir("DiskCheck", "test");
    final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
            .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
            .setZkTimeout(5000)
            .setJournalDirName(tmpDir.getPath())
            .setLedgerDirNames(new String[] { tmpDir.getPath() })
            .setAutoRecoveryDaemonEnabled(false)
            .setFlushInterval(2000)
            .setBookiePort(PortManager.nextFreePort())
            // entrylog per ledger is enabled
            .setEntryLogPerLedgerEnabled(true)
            .setLedgerStorageClass(ledgerStorageClassName);
    Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
    // we know there is only one ledgerDir
    File ledgerDir = BookieImpl.getCurrentDirectories(conf.getLedgerDirs())[0];
    BookieServer server = new BookieServer(conf);
    server.start();
    try {
        ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        BookKeeper bkClient = new BookKeeper(clientConf);
        try {
            int numOfLedgers = 2;
            int numOfEntries = 5;
            byte[] dataBytes = "data".getBytes();

            // first set of ledgers: ids [0, numOfLedgers)
            for (int i = 0; i < numOfLedgers; i++) {
                LedgerHandle handle = bkClient.createLedgerAdv((long) i, 1, 1, 1, DigestType.CRC32,
                        "passwd".getBytes(), null);
                for (int j = 0; j < numOfEntries; j++) {
                    handle.addEntry(j, dataBytes);
                }
                handle.close();
            }

            LastLogMark lastLogMarkAfterFirstSetOfAdds = ((BookieImpl) server.getBookie()).journals.get(0)
                    .getLastLogMark();
            LogMark curMarkAfterFirstSetOfAdds = lastLogMarkAfterFirstSetOfAdds.getCurMark();

            File lastMarkFile = new File(ledgerDir, "lastMark");
            // lastMark file should be zero, because checkpoint hasn't happened
            LogMark logMarkFileBeforeCheckpoint = readLastMarkFile(lastMarkFile);
            Assert.assertEquals("lastMarkFile before checkpoint should be zero", 0,
                    logMarkFileBeforeCheckpoint.compare(new LogMark()));

            // advance the mocked executor by flushInterval so that SyncThread
            // does its next iteration of checkpoint
            executorController.advance(Duration.ofMillis(conf.getFlushInterval()));

            /*
             * since a full flushInterval has elapsed SyncThread should have
             * checkpointed. if entrylogperledger is not enabled, then we
             * checkpoint only when currentLog in EntryLogger is rotated. but if
             * entrylogperledger is enabled, then we checkpoint for every
             * flushInterval period
             */
            Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
                    lastMarkFile.exists());

            LastLogMark lastLogMarkAfterCheckpoint = ((BookieImpl) server.getBookie()).journals.get(0)
                    .getLastLogMark();
            LogMark curMarkAfterCheckpoint = lastLogMarkAfterCheckpoint.getCurMark();

            LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
            Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0,
                    rolledLogMark.compare(new LogMark()));
            /*
             * Curmark should be equal before and after checkpoint, because we
             * didn't add new entries during this period
             */
            Assert.assertTrue("Curmark should be equal before and after checkpoint",
                    curMarkAfterCheckpoint.compare(curMarkAfterFirstSetOfAdds) == 0);
            /*
             * Curmark after checkpoint should be equal to rolled logmark,
             * because we checkpointed
             */
            Assert.assertTrue("Curmark after first set of adds should be equal to rolled logmark",
                    curMarkAfterCheckpoint.compare(rolledLogMark) == 0);

            // second set of ledgers: ids [numOfLedgers, 2 * numOfLedgers)
            for (int i = numOfLedgers; i < 2 * numOfLedgers; i++) {
                LedgerHandle handle = bkClient.createLedgerAdv((long) i, 1, 1, 1, DigestType.CRC32,
                        "passwd".getBytes(), null);
                for (int j = 0; j < numOfEntries; j++) {
                    handle.addEntry(j, dataBytes);
                }
                handle.close();
            }

            // advance the mocked executor by flushInterval so that SyncThread
            // does its next iteration of checkpoint
            executorController.advance(Duration.ofMillis(conf.getFlushInterval()));

            LastLogMark lastLogMarkAfterSecondSetOfAdds = ((BookieImpl) server.getBookie()).journals.get(0)
                    .getLastLogMark();
            LogMark curMarkAfterSecondSetOfAdds = lastLogMarkAfterSecondSetOfAdds.getCurMark();

            rolledLogMark = readLastMarkFile(lastMarkFile);
            /*
             * Curmark after checkpoint should be equal to rolled logmark,
             * because we checkpointed
             */
            Assert.assertTrue("Curmark after second set of adds should be equal to rolled logmark",
                    curMarkAfterSecondSetOfAdds.compare(rolledLogMark) == 0);
        } finally {
            bkClient.close();
        }
    } finally {
        server.shutdown();
    }
}
/*
 * InterleavedLedgerStorage with entrylogperledger enabled: rotating the
 * entrylog must not trigger a checkpoint.
 */
@Test
public void testCheckpointOfILSEntryLogIsRotatedWithELPLEnabled() throws Exception {
    final boolean entryLogPerLedgerEnabled = true;
    testCheckpointofILSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
}
/*
 * InterleavedLedgerStorage with entrylogperledger disabled: rotating the
 * entrylog must trigger a checkpoint.
 */
@Test
public void testCheckpointOfILSEntryLogIsRotatedWithELPLDisabled() throws Exception {
    final boolean entryLogPerLedgerEnabled = false;
    testCheckpointofILSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
}
/**
 * Shared body of the InterleavedLedgerStorage entry-log-rotation tests.
 *
 * <p>Starts a bookie with a 30000 ms flushInterval, writes a few entries to
 * one ledger, forces a new entry log for that ledger and advances the mocked
 * executor by 500 ms. The lastMark file is then expected to be zero when
 * entryLogPerLedger is enabled and non-zero when it is disabled.
 *
 * <p>The client and the server are released in {@code finally} blocks so a
 * failing assertion does not leave a running bookie behind.
 *
 * @param entryLogPerLedgerEnabled value used for the bookie's
 *        entryLogPerLedgerEnabled setting
 * @throws Exception on any setup, write or I/O failure
 */
public void testCheckpointofILSWhenEntryLogIsRotated(boolean entryLogPerLedgerEnabled) throws Exception {
    File tmpDir = createTempDir("DiskCheck", "test");
    final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
            .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
            .setZkTimeout(5000)
            .setJournalDirName(tmpDir.getPath())
            .setLedgerDirNames(new String[] { tmpDir.getPath() })
            .setAutoRecoveryDaemonEnabled(false)
            //set very high period for flushInterval
            .setFlushInterval(30000)
            .setBookiePort(PortManager.nextFreePort())
            // entrylog per ledger is toggled by the test parameter
            .setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled)
            .setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
    Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
    // we know there is only one ledgerDir
    File ledgerDir = BookieImpl.getCurrentDirectories(conf.getLedgerDirs())[0];
    BookieServer server = new BookieServer(conf);
    server.start();
    try {
        ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        BookKeeper bkClient = new BookKeeper(clientConf);
        try {
            InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie()
                    .getLedgerStorage();

            int numOfEntries = 5;
            byte[] dataBytes = "data".getBytes();
            long ledgerId = 10;
            LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32,
                    "passwd".getBytes(), null);
            for (int j = 0; j < numOfEntries; j++) {
                handle.addEntry(j, dataBytes);
            }
            handle.close();

            // simulate rolling entrylog
            ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager()).createNewLog(ledgerId);
            // advance the mocked executor by 500 ms (far less than flushInterval)
            // to give a rotation-triggered checkpoint the chance to run
            executorController.advance(Duration.ofMillis(500));

            File lastMarkFile = new File(ledgerDir, "lastMark");
            LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
            if (entryLogPerLedgerEnabled) {
                Assert.assertEquals(
                        "rolledLogMark should be zero, since checkpoint "
                                + "shouldn't have happened when entryLog is rotated",
                        0, rolledLogMark.compare(new LogMark()));
            } else {
                Assert.assertNotEquals(
                        "rolledLogMark shouldn't be zero, since checkpoint "
                                + "should have happened when entryLog is rotated",
                        0, rolledLogMark.compare(new LogMark()));
            }
        } finally {
            bkClient.close();
        }
    } finally {
        server.shutdown();
    }
}
/*
 * SortedLedgerStorage with entrylogperledger enabled: rotating the
 * entrylog must not trigger a checkpoint.
 */
@Test
public void testCheckpointOfSLSEntryLogIsRotatedWithELPLEnabled() throws Exception {
    final boolean entryLogPerLedgerEnabled = true;
    testCheckpointOfSLSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
}
/*
 * SortedLedgerStorage with entrylogperledger disabled: rotating the
 * entrylog must trigger a checkpoint.
 */
@Test
public void testCheckpointOfSLSEntryLogIsRotatedWithELPLDisabled() throws Exception {
    final boolean entryLogPerLedgerEnabled = false;
    testCheckpointOfSLSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
}
/**
 * Shared body of the SortedLedgerStorage entry-log-rotation tests.
 *
 * <p>Starts a bookie with a 30000 ms flushInterval and small skipList and
 * entryLog size limits, writes slightly more data than the entryLog size
 * limit to one ledger and advances the mocked executor by 500 ms. The
 * lastMark file is then expected to be zero when entryLogPerLedger is enabled
 * and non-zero when it is disabled.
 *
 * <p>The client and the server are released in {@code finally} blocks so a
 * failing assertion does not leave a running bookie behind.
 *
 * @param entryLogPerLedgerEnabled value used for the bookie's
 *        entryLogPerLedgerEnabled setting
 * @throws Exception on any setup, write or I/O failure
 */
public void testCheckpointOfSLSWhenEntryLogIsRotated(boolean entryLogPerLedgerEnabled) throws Exception {
    File tmpDir = createTempDir("DiskCheck", "test");
    final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
            .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
            .setZkTimeout(5000)
            .setJournalDirName(tmpDir.getPath())
            .setLedgerDirNames(new String[] { tmpDir.getPath() })
            .setAutoRecoveryDaemonEnabled(false)
            //set very high period for flushInterval
            .setFlushInterval(30000)
            .setBookiePort(PortManager.nextFreePort())
            // entrylog per ledger is toggled by the test parameter
            .setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled)
            .setLedgerStorageClass(SortedLedgerStorage.class.getName())
            // set very low skipListSizeLimit and entryLogSizeLimit to simulate log file rotation
            .setSkipListSizeLimit(1 * 1000 * 1000)
            .setEntryLogSizeLimit(2 * 1000 * 1000);
    Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
    // we know there is only one ledgerDir
    File ledgerDir = BookieImpl.getCurrentDirectories(conf.getLedgerDirs())[0];
    BookieServer server = new BookieServer(conf);
    server.start();
    try {
        ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        BookKeeper bkClient = new BookKeeper(clientConf);
        try {
            Random rand = new Random();
            byte[] dataBytes = new byte[10 * 1000];
            rand.nextBytes(dataBytes);
            // enough entries to write 100 KB more than the entryLog size limit
            int numOfEntries = ((int) conf.getEntryLogSizeLimit() + (100 * 1000)) / dataBytes.length;

            LedgerHandle handle = bkClient.createLedgerAdv(10, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(),
                    null);
            for (int j = 0; j < numOfEntries; j++) {
                handle.addEntry(j, dataBytes);
            }
            handle.close();

            // advance the mocked executor by 500 ms (far less than flushInterval)
            // to give a rotation-triggered checkpoint the chance to run
            executorController.advance(Duration.ofMillis(500));

            File lastMarkFile = new File(ledgerDir, "lastMark");
            LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
            if (entryLogPerLedgerEnabled) {
                Assert.assertEquals(
                        "rolledLogMark should be zero, since checkpoint "
                                + "shouldn't have happened when entryLog is rotated",
                        0, rolledLogMark.compare(new LogMark()));
            } else {
                Assert.assertNotEquals(
                        "rolledLogMark shouldn't be zero, since checkpoint "
                                + "should have happened when entryLog is rotated",
                        0, rolledLogMark.compare(new LogMark()));
            }
        } finally {
            bkClient.close();
        }
    } finally {
        server.shutdown();
    }
}
/*
 * Checks that, when entryLogPerLedger is enabled, the periodic checkpoint
 * flushes the current active logs and flushes and removes all rotated logs:
 * after one flushInterval there must be no rotated log channels left and no
 * unpersisted bytes in the current logs.
 *
 * The client and the server are released in finally blocks; previously they
 * were never closed, leaving a running bookie behind after the test.
 */
@Test
public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exception {
    File tmpDir = createTempDir("DiskCheck", "test");
    final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
            .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
            .setZkTimeout(5000)
            .setJournalDirName(tmpDir.getPath())
            .setLedgerDirNames(new String[] { tmpDir.getPath() })
            .setAutoRecoveryDaemonEnabled(false)
            //set flushInterval
            .setFlushInterval(3000)
            .setBookiePort(PortManager.nextFreePort())
            // entrylog per ledger is enabled
            .setEntryLogPerLedgerEnabled(true)
            .setLedgerStorageClass(InterleavedLedgerStorage.class.getName())
            // set setFlushIntervalInBytes to some very high number
            .setFlushIntervalInBytes(10000000);
    Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
    BookieServer server = new BookieServer(conf);
    server.start();
    try {
        ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        BookKeeper bkClient = new BookKeeper(clientConf);
        try {
            InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie()
                    .getLedgerStorage();
            EntryLogger entryLogger = ledgerStorage.entryLogger;
            EntryLogManagerForEntryLogPerLedger entryLogManager =
                    (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();

            Random rand = new Random();
            int numOfEntries = 5;
            byte[] dataBytes = "data".getBytes();

            int numOfLedgers = 3;
            long[] ledgerIds = new long[numOfLedgers];
            for (int i = 0; i < numOfLedgers; i++) {
                ledgerIds[i] = rand.nextInt(100000) + 1;
                LedgerHandle handle = bkClient.createLedgerAdv(ledgerIds[i], 1, 1, 1, DigestType.CRC32,
                        "passwd".getBytes(), null);
                for (int j = 0; j < numOfEntries; j++) {
                    handle.addEntry(j, dataBytes);
                }
                // simulate rolling entrylog
                entryLogManager.createNewLog(ledgerIds[i]);
            }

            Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo =
                    entryLogManager.getCopyOfCurrentLogs();
            for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
                Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
                        currentLogWithDirInfo.getLogChannel().getUnpersistedBytes());
            }
            Assert.assertNotEquals("There should be logChannelsToFlush", 0,
                    entryLogManager.getRotatedLogChannels().size());

            /*
             * advance the mocked executor by flushInterval, so that checkpoint
             * can happen.
             */
            executorController.advance(Duration.ofMillis(conf.getFlushInterval()));

            /*
             * since checkpoint happened, there shouldn't be any
             * logChannelsToFlush and bytesWrittenSinceLastFlush should be zero.
             */
            List<BufferedLogChannel> copyOfRotatedLogChannels = entryLogManager.getRotatedLogChannels();
            Assert.assertTrue("There shouldn't be logChannelsToFlush",
                    ((copyOfRotatedLogChannels == null) || (copyOfRotatedLogChannels.size() == 0)));

            copyOfCurrentLogsWithDirInfo = entryLogManager.getCopyOfCurrentLogs();
            for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
                Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
                        currentLogWithDirInfo.getLogChannel().getUnpersistedBytes());
            }
        } finally {
            bkClient.close();
        }
    } finally {
        server.shutdown();
    }
}
/**
 * {@link InterleavedLedgerStorage} variant whose {@code shutdown()} and
 * {@code flush()} do nothing, so that shutting the bookie down does not
 * flush the ledger storage.
 */
static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {

    @Override
    public void shutdown() {
        // Deliberately a no-op: this is invoked during BookieServer
        // shutdown and must not do any work here.
    }

    @Override
    public synchronized void flush() throws IOException {
        // Deliberately a no-op: this is invoked by SyncThread.shutdown
        // while the BookieServer is shutting down.
    }
}
/*
 * This is complete end-to-end scenario.
 *
 * 1) This testcase uses MockInterleavedLedgerStorage, which extends
 * InterleavedLedgerStorage but doesn't do anything when Bookie is shutdown.
 * This is needed to simulate Bookie crash.
 * 2) entryLogPerLedger is enabled
 * 3) ledgers are created and entries are added.
 * 4) wait for flushInterval period for checkpoint to complete
 * 5) simulate bookie crash
 * 6) delete the journal files and lastmark file
 * 7) Now restart the Bookie
 * 8) validate that the entries which were written can be read successfully.
 *
 * Clients and servers are released in finally blocks, a ledger whose creation
 * failed is skipped instead of causing a NullPointerException in the write
 * stage, and the journal/lastMark deletions are asserted so the read phase
 * cannot silently be served from a journal that was not removed.
 */
@Test
public void testCheckPointForEntryLoggerWithMultipleActiveEntryLogs() throws Exception {
    File tmpDir = createTempDir("DiskCheck", "test");
    final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
            .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
            .setZkTimeout(5000)
            .setJournalDirName(tmpDir.getPath())
            .setLedgerDirNames(new String[] { tmpDir.getPath() })
            .setAutoRecoveryDaemonEnabled(false)
            .setFlushInterval(3000)
            .setBookiePort(PortManager.nextFreePort())
            // entrylog per ledger is enabled
            .setEntryLogPerLedgerEnabled(true)
            .setLedgerStorageClass(MockInterleavedLedgerStorage.class.getName());
    Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
    // we know there is only one ledgerDir
    File ledgerDir = BookieImpl.getCurrentDirectories(conf.getLedgerDirs())[0];
    ClientConfiguration clientConf = new ClientConfiguration();
    clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

    int numOfLedgers = 12;
    int numOfEntries = 100;
    byte[] dataBytes = "data".getBytes();

    // ---- phase 1: write entries and let the checkpoint happen ----
    BookieServer server = new BookieServer(conf);
    server.start();
    try {
        final BookKeeper bkClient = new BookKeeper(clientConf);
        try {
            AtomicBoolean receivedExceptionForAdd = new AtomicBoolean(false);
            LongStream.range(0, numOfLedgers).parallel().mapToObj((ledgerId) -> {
                LedgerHandle handle = null;
                try {
                    handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(),
                            null);
                } catch (BKException | InterruptedException exc) {
                    receivedExceptionForAdd.compareAndSet(false, true);
                    LOG.error("Got Exception while trying to create LedgerHandle for ledgerId: " + ledgerId,
                            exc);
                }
                return handle;
            // a failed creation is already recorded in receivedExceptionForAdd;
            // drop the null handle so the next stage doesn't hit an NPE
            }).filter((writeHandle) -> writeHandle != null).forEach((writeHandle) -> {
                IntStream.range(0, numOfEntries).forEach((entryId) -> {
                    try {
                        writeHandle.addEntry(entryId, dataBytes);
                    } catch (BKException | InterruptedException exc) {
                        receivedExceptionForAdd.compareAndSet(false, true);
                        LOG.error("Got Exception while trying to AddEntry of ledgerId: " + writeHandle.getId()
                                + " entryId: " + entryId, exc);
                    }
                });
                try {
                    writeHandle.close();
                } catch (BKException | InterruptedException e) {
                    receivedExceptionForAdd.compareAndSet(false, true);
                    LOG.error("Got Exception while trying to close writeHandle of ledgerId: "
                            + writeHandle.getId(), e);
                }
            });

            Assert.assertFalse(
                    "There shouldn't be any exceptions while creating writeHandle and adding entries to writeHandle",
                    receivedExceptionForAdd.get());

            executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
            // since a full flushInterval has elapsed SyncThread should have checkpointed.
            // if entrylogperledger is not enabled, then we checkpoint only when currentLog in EntryLogger
            // is rotated. but if entrylogperledger is enabled, then we checkpoint for every flushInterval period
            File lastMarkFile = new File(ledgerDir, "lastMark");
            Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
                    lastMarkFile.exists());
            LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
            Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0,
                    rolledLogMark.compare(new LogMark()));
        } finally {
            bkClient.close();
        }
    } finally {
        // here we are calling shutdown, but MockInterleavedLedgerStorage shutdown/flush
        // methods are noop, so entrylogger is not flushed as part of this shutdown
        // here we are trying to simulate Bookie crash, but there is no way to
        // simulate bookie abrupt crash
        server.shutdown();
    }

    // ---- phase 2: remove journal files and lastMark, to make sure that we
    // are not reading from Journal file ----
    File[] journalDirs = conf.getJournalDirs();
    for (File journalDir : journalDirs) {
        File journalDirectory = BookieImpl.getCurrentDirectory(journalDir);
        List<Long> journalLogsId = Journal.listJournalIds(journalDirectory, null);
        for (long journalId : journalLogsId) {
            File journalFile = new File(journalDirectory, Long.toHexString(journalId) + ".txn");
            Assert.assertTrue("Couldn't delete journal file " + journalFile, journalFile.delete());
        }
    }
    // we know there is only one ledgerDir
    File lastMarkToDelete = new File(ledgerDir, "lastMark");
    Assert.assertTrue("Couldn't delete lastMark file " + lastMarkToDelete, lastMarkToDelete.delete());

    // ---- phase 3: restart the BookieServer with the real storage and read back ----
    conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
    BookieServer restartedServer = new BookieServer(conf);
    restartedServer.start();
    try {
        BookKeeper newBKClient = new BookKeeper(clientConf);
        try {
            // since Bookie checkpointed successfully before shutdown/crash,
            // we should be able to read from entryLogs though journal is deleted
            AtomicBoolean receivedExceptionForRead = new AtomicBoolean(false);
            LongStream.range(0, numOfLedgers).parallel().forEach((ledgerId) -> {
                try {
                    LedgerHandle lh = newBKClient.openLedger(ledgerId, DigestType.CRC32, "passwd".getBytes());
                    Enumeration<LedgerEntry> entries = lh.readEntries(0, numOfEntries - 1);
                    while (entries.hasMoreElements()) {
                        LedgerEntry entry = entries.nextElement();
                        byte[] readData = entry.getEntry();
                        // compare the raw bytes; no charset-dependent String round trip
                        Assert.assertArrayEquals("Ledger Entry Data should match", dataBytes, readData);
                    }
                    lh.close();
                } catch (BKException | InterruptedException e) {
                    receivedExceptionForRead.compareAndSet(false, true);
                    LOG.error("Got Exception while trying to read entries of ledger, ledgerId: " + ledgerId, e);
                }
            });
            Assert.assertFalse("There shouldn't be any exceptions while creating readHandle and while reading "
                    + "entries using readHandle", receivedExceptionForRead.get());
        } finally {
            newBKClient.close();
        }
    } finally {
        restartedServer.shutdown();
    }
}
}