blob: e3342808ada253528ec096cef1af1edb4d60903b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
* Test the checkpoint logic of {@link DbLedgerStorage}.
*/
@Slf4j
public class CheckpointOnNewLedgersTest {
@Rule
public final TemporaryFolder testDir = new TemporaryFolder();
private ServerConfiguration conf;
private BookieImpl bookie;
private CountDownLatch getLedgerDescCalledLatch;
private CountDownLatch getLedgerDescWaitLatch;
@Before
public void setup() throws Exception {
File bkDir = testDir.newFolder("dbLedgerStorageCheckpointTest");
File curDir = BookieImpl.getCurrentDirectory(bkDir);
BookieImpl.checkDirectoryStructure(curDir);
int gcWaitTime = 1000;
conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
conf.setJournalDirsName(new String[] { bkDir.toString() });
conf.setLedgerDirNames(new String[] { bkDir.toString() });
conf.setEntryLogSizeLimit(10 * 1024);
bookie = spy(new BookieImpl(conf));
bookie.start();
getLedgerDescCalledLatch = new CountDownLatch(1);
getLedgerDescWaitLatch = new CountDownLatch(1);
// spy `getLedgerForEntry`
doAnswer(invocationOnMock -> {
ByteBuf entry = invocationOnMock.getArgument(0);
long ledgerId = entry.getLong(entry.readerIndex());
LedgerDescriptor ld = (LedgerDescriptor) invocationOnMock.callRealMethod();
if (ledgerId % 2 == 1) {
getLedgerDescCalledLatch.countDown();
getLedgerDescWaitLatch.await();
}
return ld;
}).when(bookie).getLedgerForEntry(
any(ByteBuf.class),
any(byte[].class));
}
@After
public void teardown() throws Exception {
if (null != bookie) {
bookie.shutdown();
}
}
private static ByteBuf createByteBuf(long ledgerId, long entryId, int entrySize) {
byte[] data = new byte[entrySize];
ThreadLocalRandom.current().nextBytes(data);
ByteBuf buffer = Unpooled.wrappedBuffer(data);
buffer.writerIndex(0);
buffer.writeLong(ledgerId);
buffer.writeLong(entryId);
buffer.writeLong(entryId - 1); // lac
buffer.writerIndex(entrySize);
return buffer;
}
@Test
public void testCheckpoint() throws Exception {
int entrySize = 1024;
long l1 = 1L;
long l2 = 2L;
final CountDownLatch writeL1Latch = new CountDownLatch(1);
Thread t1 = new Thread(() -> {
ByteBuf entry = createByteBuf(l1, 0L, entrySize);
try {
bookie.addEntry(
entry,
false,
(rc, ledgerId, entryId, addr, ctx) -> writeL1Latch.countDown(),
null,
new byte[0]
);
} catch (Exception e) {
log.info("Failed to write entry to l1", e);
}
}, "ledger-1-writer");
t1.start();
// wait until the ledger desc is opened
getLedgerDescCalledLatch.await();
LastLogMark logMark = bookie.journals.get(0).getLastLogMark().markLog();
// keep write entries to l2 to trigger entry log rolling to checkpoint
int numEntries = 10;
final CountDownLatch writeL2Latch = new CountDownLatch(numEntries);
for (int i = 0; i < numEntries; i++) {
ByteBuf entry = createByteBuf(l2, i, entrySize);
bookie.addEntry(
entry,
false,
(rc, ledgerId, entryId, addr, ctx) -> writeL2Latch.countDown(),
null,
new byte[0]);
}
writeL2Latch.await();
// wait until checkpoint to complete and journal marker is rolled.
bookie.syncThread.getExecutor().submit(() -> {}).get();
log.info("Wait until checkpoint is completed");
// the journal mark is rolled.
LastLogMark newLogMark = bookie.journals.get(0).getLastLogMark().markLog();
assertTrue(newLogMark.getCurMark().compare(logMark.getCurMark()) > 0);
// resume l1-writer to continue writing the entries
getLedgerDescWaitLatch.countDown();
// wait until the l1 entry is written
writeL1Latch.await();
t1.join();
// construct a new bookie to simulate "bookie restart from crash"
Bookie newBookie = new BookieImpl(conf);
newBookie.start();
for (int i = 0; i < numEntries; i++) {
ByteBuf entry = newBookie.readEntry(l2, i);
assertNotNull(entry);
assertEquals(l2, entry.readLong());
assertEquals((long) i, entry.readLong());
entry.release();
}
ByteBuf entry = newBookie.readEntry(l1, 0L);
assertNotNull(entry);
assertEquals(l1, entry.readLong());
assertEquals(0L, entry.readLong());
entry.release();
newBookie.shutdown();
}
}