blob: 48d103803552eeb8be550faece8a35cc4c206a69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Tests the checkpoint behavior of {@link SortedLedgerStorage}.
*/
@Slf4j
public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
/**
 * Checkpoint used by these tests; its position is a single {@code long} offset.
 *
 * <p>The Lombok annotations generate the one-argument constructor, the getter,
 * {@code equals}/{@code hashCode} and {@code toString}, so two instances with the
 * same offset are equal.
 */
@Data
@RequiredArgsConstructor
@ToString
@EqualsAndHashCode
private static class TestCheckpoint implements Checkpoint {
    private final long offset;

    /**
     * Orders checkpoints by offset.
     *
     * <p>The {@code Checkpoint.MAX} and {@code Checkpoint.MIN} sentinels are not
     * {@code TestCheckpoint} instances, so they are handled before the cast:
     * every test checkpoint sorts below {@code MAX} and above {@code MIN}.
     *
     * @param o the checkpoint to compare against
     * @return a negative, zero or positive value as this checkpoint is before,
     *         at, or after {@code o}
     * @throws ClassCastException if {@code o} is neither a sentinel nor a
     *         {@code TestCheckpoint}
     */
    @Override
    public int compareTo(Checkpoint o) {
        if (Checkpoint.MAX == o) {
            return -1;
        }
        if (Checkpoint.MIN == o) {
            return 1;
        }
        TestCheckpoint other = (TestCheckpoint) o;
        return Long.compare(offset, other.offset);
    }
}
/**
 * Checkpoint source whose position is a plain counter that the test advances by hand.
 */
@RequiredArgsConstructor
private static class TestCheckpointSource implements CheckpointSource {
    // offset that the next checkpoint created by newCheckpoint() will carry
    private long currentOffset = 0;

    /**
     * Moves the current offset forward.
     *
     * @param numBytes amount added to the current offset
     */
    void advanceOffset(long numBytes) {
        this.currentOffset = this.currentOffset + numBytes;
    }

    /**
     * Creates a {@link TestCheckpoint} at the current offset and logs it.
     *
     * @return the newly created checkpoint
     */
    @Override
    public Checkpoint newCheckpoint() {
        final TestCheckpoint created = new TestCheckpoint(this.currentOffset);
        log.info("New checkpoint : {}", created);
        return created;
    }

    /**
     * Logs the completed checkpoint; nothing else is done and {@code compact} is not used.
     */
    @Override
    public void checkpointComplete(Checkpoint checkpoint, boolean compact)
            throws IOException {
        log.info("Complete checkpoint : {}", checkpoint);
    }
}
// storage under test; a new instance is created in setUp()
private SortedLedgerStorage storage;
// checkpointer created in setUp() and passed to storage.initialize(...)
private Checkpointer checkpointer;
// checkpoints for which storage.checkpoint(...) returned without an IOException
private final LinkedBlockingQueue<Checkpoint> checkpoints;
// checkpoint source passed to the storage; tests advance its offset by hand
private final TestCheckpointSource checkpointSrc = new TestCheckpointSource();
/**
 * Creates the queue of completed checkpoints and adjusts the inherited configuration:
 * entry log size limit of 1024 and entry log file pre-allocation disabled.
 */
public SortedLedgerStorageCheckpointTest() {
    // the superclass constructor is invoked implicitly
    this.checkpoints = new LinkedBlockingQueue<>();
    conf.setEntryLogFilePreAllocationEnabled(false);
    conf.setEntryLogSizeLimit(1024);
}
/**
 * Creates a fresh {@link SortedLedgerStorage} and initializes it with the test checkpoint
 * source and a checkpointer that records each checkpoint the storage completes.
 */
@Before
@Override
public void setUp() throws Exception {
    super.setUp();

    storage = new SortedLedgerStorage();
    checkpointer = new Checkpointer() {
        @Override
        public void start() {
            // no-op
        }

        @Override
        public void startCheckpoint(Checkpoint checkpoint) {
            // the checkpoint itself runs on the storage's scheduler
            storage.getScheduler().submit(() -> {
                checkpointAndRecord(checkpoint);
            });
        }
    };

    // the state manager argument is null: passing null is fine as long as the
    // storage does not need to change the bookie's state
    storage.initialize(
        conf,
        mock(LedgerManager.class),
        ledgerDirsManager,
        ledgerDirsManager,
        null,
        checkpointSrc,
        checkpointer,
        NullStatsLogger.INSTANCE,
        UnpooledByteBufAllocator.DEFAULT);
}

/**
 * Checkpoints the storage at {@code checkpoint} and, on success, appends the checkpoint to
 * {@code checkpoints}. An {@link IOException} is logged and nothing is recorded.
 */
private void checkpointAndRecord(Checkpoint checkpoint) {
    log.info("Checkpoint the storage at {}", checkpoint);
    try {
        storage.checkpoint(checkpoint);
        checkpoints.add(checkpoint);
    } catch (IOException e) {
        log.error("Failed to checkpoint at {}", checkpoint, e);
    }
}
/**
 * Shuts the storage down (when one was created) and then runs the base-class teardown.
 *
 * <p>The base-class teardown sits in a {@code finally} block so that it still runs
 * when {@code storage.shutdown()} throws.
 *
 * @throws Exception if shutting down the storage or the base-class teardown fails
 */
@After
@Override
public void tearDown() throws Exception {
    try {
        if (null != storage) {
            storage.shutdown();
        }
    } finally {
        super.tearDown();
    }
}
/**
 * Builds a four-long entry buffer: ledger id, entry id, lac ({@code entryId - 1}),
 * followed by one long of data that repeats the entry id.
 *
 * @param ledgerId ledger id written first
 * @param entryId entry id written second and again as the data
 * @return a new buffer holding the four values
 */
ByteBuf prepareEntry(long ledgerId, long entryId) {
    final long lac = entryId - 1;
    return Unpooled.buffer(4 * Long.BYTES)
        .writeLong(ledgerId)   // ledger id
        .writeLong(entryId)    // entry id
        .writeLong(lac)        // lac
        .writeLong(entryId);   // data
}
/**
 * Adds 20 entries, advances the checkpoint source to offset 100, triggers a memtable
 * flush, and checks that the checkpoint at offset 100 completes and the memtable is
 * left empty.
 */
@Test
public void testCheckpoint() throws Exception {
    // memory table holds the first checkpoint, but it is not completed yet.
    Checkpoint memtableCp = storage.memTable.kvmap.cp;
    assertEquals(new TestCheckpoint(0), memtableCp);

    // write entries into ledger storage
    long lid = System.currentTimeMillis();
    storage.setMasterKey(lid, new byte[0]);
    for (int i = 0; i < 20; i++) {
        storage.addEntry(prepareEntry(lid, i));
    }
    // simulate journal persists the entries in journal;
    checkpointSrc.advanceOffset(100);

    // memory table still holds the first checkpoint: nothing has been flushed yet.
    memtableCp = storage.memTable.kvmap.cp;
    assertEquals(new TestCheckpoint(0), memtableCp);

    // trigger a memtable flush
    Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
    storage.onSizeLimitReached(checkpointSrc.newCheckpoint());

    // Wait for the checkpoint to complete. The wait is bounded because a failed
    // checkpoint is only logged by the checkpointer and never reaches the queue;
    // an unbounded poll would hang the test instead of failing it.
    assertEquals(
        "checkpoint at offset 100 did not complete in time",
        new TestCheckpoint(100),
        checkpoints.poll(60, TimeUnit.SECONDS));
    assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp);
    assertEquals(0, storage.memTable.kvmap.size());
}
/**
 * Same flow as {@code testCheckpoint}, but a new entry log is created before the
 * memtable flush. Checks that no checkpoint happens because of the rotation alone and
 * that, after the flush, the least unflushed log id is beyond the log that was current
 * when the entries were flushed.
 */
@Test
public void testCheckpointAfterEntryLogRotated() throws Exception {
    // memory table holds the first checkpoint, but it is not completed yet.
    Checkpoint memtableCp = storage.memTable.kvmap.cp;
    assertEquals(new TestCheckpoint(0), memtableCp);

    // write entries into ledger storage
    long lid = System.currentTimeMillis();
    storage.setMasterKey(lid, new byte[0]);
    for (int i = 0; i < 20; i++) {
        storage.addEntry(prepareEntry(lid, i));
    }
    // simulate journal persists the entries in journal;
    checkpointSrc.advanceOffset(100);

    // memory table still holds the first checkpoint and all 20 entries.
    memtableCp = storage.memTable.kvmap.cp;
    assertEquals(new TestCheckpoint(0), memtableCp);
    assertEquals(20, storage.memTable.kvmap.size());

    // park a task on the storage scheduler until the latch is released below
    final CountDownLatch readyLatch = new CountDownLatch(1);
    storage.getScheduler().submit(() -> {
        try {
            readyLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });

    // simulate entry log is rotated (due to compaction)
    EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
        .getEntryLogManager();
    entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
    long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId();
    long currentLogId = entryLogManager.getCurrentLogId();
    log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);

    readyLatch.countDown();
    // the rotation alone must not have produced a checkpoint or flushed the memtable
    assertNull(checkpoints.poll());
    assertEquals(new TestCheckpoint(0), storage.memTable.kvmap.cp);
    assertEquals(20, storage.memTable.kvmap.size());

    // trigger a memtable flush
    Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
    storage.onSizeLimitReached(checkpointSrc.newCheckpoint());

    // Bounded wait: a failed checkpoint is only logged by the checkpointer and never
    // reaches the queue, so an unbounded poll would hang the test instead of failing it.
    assertEquals(
        "checkpoint at offset 100 did not complete in time",
        new TestCheckpoint(100),
        checkpoints.poll(60, TimeUnit.SECONDS));

    // all the entries are flushed out
    assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp);
    assertEquals(0, storage.memTable.kvmap.size());

    // read the id once so the failure message reports exactly the value that was checked
    long leastUnflushedAfterFlush = storage.getEntryLogger().getLeastUnflushedLogId();
    assertTrue(
        "current log " + currentLogId + " contains entries added from memtable should be forced to disk"
            + " but least unflushed log is " + leastUnflushedAfterFlush,
        leastUnflushedAfterFlush > currentLogId);
}
}