blob: def20f6806a6b788a03c648b581e501dcc4211ae [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the bookie journal.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(JournalChannel.class)
public class BookieJournalTest {
private static final Logger LOG = LoggerFactory.getLogger(BookieJournalTest.class);
final Random r = new Random(System.currentTimeMillis());
final List<File> tempDirs = new ArrayList<File>();
File createTempDir(String prefix, String suffix) throws IOException {
File dir = IOUtils.createTempDir(prefix, suffix);
tempDirs.add(dir);
return dir;
}
@After
public void tearDown() throws Exception {
for (File dir : tempDirs) {
FileUtils.deleteDirectory(dir);
}
tempDirs.clear();
}
private void writeIndexFileForLedger(File indexDir, long ledgerId,
byte[] masterKey)
throws Exception {
File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey, FileInfo.CURRENT_HEADER_VERSION);
// force creation of index file
fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
fi.close(true);
}
private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
byte[] masterKey, boolean truncateToMasterKey)
throws Exception {
File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey, FileInfo.CURRENT_HEADER_VERSION);
// force creation of index file
fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
fi.close(true);
// file info header
int headerLen = 8 + 4 + masterKey.length;
// truncate the index file
int leftSize;
if (truncateToMasterKey) {
leftSize = r.nextInt(headerLen);
} else {
leftSize = headerLen + r.nextInt(1024 - headerLen);
}
FileChannel fc = new RandomAccessFile(fn, "rw").getChannel();
fc.truncate(leftSize);
fc.close();
}
/**
* Generate fence entry.
*/
private static ByteBuf generateFenceEntry(long ledgerId) {
ByteBuf bb = Unpooled.buffer();
bb.writeLong(ledgerId);
bb.writeLong(BookieImpl.METAENTRY_ID_FENCE_KEY);
return bb;
}
/**
* Generate meta entry with given master key.
*/
private static ByteBuf generateMetaEntry(long ledgerId, byte[] masterKey) {
ByteBuf bb = Unpooled.buffer();
bb.writeLong(ledgerId);
bb.writeLong(BookieImpl.METAENTRY_ID_LEDGER_KEY);
bb.writeInt(masterKey.length);
bb.writeBytes(masterKey);
return bb;
}
private void writeJunkJournal(File journalDir) throws Exception {
long logId = System.currentTimeMillis();
File fn = new File(journalDir, Long.toHexString(logId) + ".txn");
FileChannel fc = new RandomAccessFile(fn, "rw").getChannel();
ByteBuffer zeros = ByteBuffer.allocate(512);
fc.write(zeros, 4 * 1024 * 1024);
fc.position(0);
for (int i = 1; i <= 10; i++) {
fc.write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
}
}
private void writePreV2Journal(File journalDir, int numEntries) throws Exception {
long logId = System.currentTimeMillis();
File fn = new File(journalDir, Long.toHexString(logId) + ".txn");
FileChannel fc = new RandomAccessFile(fn, "rw").getChannel();
ByteBuffer zeros = ByteBuffer.allocate(512);
fc.write(zeros, 4 * 1024 * 1024);
fc.position(0);
byte[] data = "JournalTestData".getBytes();
long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
for (int i = 1; i <= numEntries; i++) {
ByteBuf packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data);
lastConfirmed = i;
ByteBuffer lenBuff = ByteBuffer.allocate(4);
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
fc.write(lenBuff);
fc.write(packet.nioBuffer());
packet.release();
}
}
private static void moveToPosition(JournalChannel jc, long pos) throws IOException {
jc.fc.position(pos);
jc.bc.position = pos;
jc.bc.writeBufferStartPosition.set(pos);
}
private static void updateJournalVersion(JournalChannel jc, int journalVersion) throws IOException {
long prevPos = jc.fc.position();
try {
ByteBuffer versionBuffer = ByteBuffer.allocate(4);
versionBuffer.putInt(journalVersion);
versionBuffer.flip();
jc.fc.position(4);
IOUtils.writeFully(jc.fc, versionBuffer);
jc.fc.force(true);
} finally {
jc.fc.position(prevPos);
}
}
private JournalChannel writeV2Journal(File journalDir, int numEntries) throws Exception {
long logId = System.currentTimeMillis();
JournalChannel jc = new JournalChannel(journalDir, logId);
moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE);
BufferedChannel bc = jc.getBufferedChannel();
byte[] data = new byte[1024];
Arrays.fill(data, (byte) 'X');
long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
for (int i = 1; i <= numEntries; i++) {
ByteBuf packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data);
lastConfirmed = i;
ByteBuffer lenBuff = ByteBuffer.allocate(4);
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
bc.write(Unpooled.wrappedBuffer(lenBuff));
bc.write(packet);
packet.release();
}
bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V2);
return jc;
}
private JournalChannel writeV3Journal(File journalDir, int numEntries, byte[] masterKey) throws Exception {
long logId = System.currentTimeMillis();
JournalChannel jc = new JournalChannel(journalDir, logId);
moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE);
BufferedChannel bc = jc.getBufferedChannel();
byte[] data = new byte[1024];
Arrays.fill(data, (byte) 'X');
long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
for (int i = 0; i <= numEntries; i++) {
ByteBuf packet;
if (i == 0) {
packet = generateMetaEntry(1, masterKey);
} else {
packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data);
}
lastConfirmed = i;
ByteBuffer lenBuff = ByteBuffer.allocate(4);
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
bc.write(Unpooled.wrappedBuffer(lenBuff));
bc.write(packet);
packet.release();
}
bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V3);
return jc;
}
private JournalChannel writeV4Journal(File journalDir, int numEntries, byte[] masterKey) throws Exception {
long logId = System.currentTimeMillis();
JournalChannel jc = new JournalChannel(journalDir, logId);
moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE);
BufferedChannel bc = jc.getBufferedChannel();
byte[] data = new byte[1024];
Arrays.fill(data, (byte) 'X');
long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
for (int i = 0; i <= numEntries; i++) {
ByteBuf packet;
if (i == 0) {
packet = generateMetaEntry(1, masterKey);
} else {
packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data);
}
lastConfirmed = i;
ByteBuffer lenBuff = ByteBuffer.allocate(4);
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
bc.write(Unpooled.wrappedBuffer(lenBuff));
bc.write(packet);
packet.release();
}
// write fence key
ByteBuf packet = generateFenceEntry(1);
ByteBuf lenBuf = Unpooled.buffer();
lenBuf.writeInt(packet.readableBytes());
bc.write(lenBuf);
bc.write(packet);
bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V4);
return jc;
}
static JournalChannel writeV5Journal(File journalDir, int numEntries,
byte[] masterKey) throws Exception {
return writeV5Journal(journalDir, numEntries, masterKey, false);
}
static JournalChannel writeV5Journal(File journalDir, int numEntries,
byte[] masterKey, boolean corruptLength) throws Exception {
long logId = System.currentTimeMillis();
JournalChannel jc = new JournalChannel(journalDir, logId);
BufferedChannel bc = jc.getBufferedChannel();
ByteBuf paddingBuff = Unpooled.buffer();
paddingBuff.writeZero(2 * JournalChannel.SECTOR_SIZE);
byte[] data = new byte[4 * 1024 * 1024];
Arrays.fill(data, (byte) 'X');
long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
long length = 0;
for (int i = 0; i <= numEntries; i++) {
ByteBuf packet;
if (i == 0) {
packet = generateMetaEntry(1, masterKey);
} else {
packet = ClientUtil.generatePacket(1, i, lastConfirmed, length, data, 0, i);
}
lastConfirmed = i;
length += i;
ByteBuf lenBuff = Unpooled.buffer();
if (corruptLength) {
lenBuff.writeInt(-1);
} else {
lenBuff.writeInt(packet.readableBytes());
}
bc.write(lenBuff);
bc.write(packet);
packet.release();
Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
}
// write fence key
ByteBuf packet = generateFenceEntry(1);
ByteBuf lenBuf = Unpooled.buffer();
lenBuf.writeInt(packet.readableBytes());
bc.write(lenBuf);
bc.write(packet);
Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V5);
return jc;
}
/**
* test that we can open a journal written without the magic
* word at the start. This is for versions of bookkeeper before
* the magic word was introduced
*/
@Test
public void testPreV2Journal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writePreV2Journal(BookieImpl.getCurrentDirectory(journalDir), 100);
writeIndexFileForLedger(BookieImpl.getCurrentDirectory(ledgerDir), 1, "testPasswd".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Bookie b = createBookieAndReadJournal(conf);
b.readEntry(1, 100);
try {
b.readEntry(1, 101);
fail("Shouldn't have found entry 101");
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
b.shutdown();
}
@Test
public void testV4Journal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writeV4Journal(BookieImpl.getCurrentDirectory(journalDir), 100, "testPasswd".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
BookieImpl b = createBookieAndReadJournal(conf);
b.readEntry(1, 100);
try {
b.readEntry(1, 101);
fail("Shouldn't have found entry 101");
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
assertTrue(b.handles.getHandle(1, "testPasswd".getBytes()).isFenced());
b.shutdown();
}
@Test
public void testV5Journal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writeV5Journal(BookieImpl.getCurrentDirectory(journalDir), 2 * JournalChannel.SECTOR_SIZE,
"testV5Journal".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
BookieImpl b = createBookieAndReadJournal(conf);
for (int i = 1; i <= 2 * JournalChannel.SECTOR_SIZE; i++) {
b.readEntry(1, i);
}
try {
b.readEntry(1, 2 * JournalChannel.SECTOR_SIZE + 1);
fail("Shouldn't have found entry " + (2 * JournalChannel.SECTOR_SIZE + 1));
} catch (Bookie.NoEntryException e) {
// correct behavior
}
assertTrue(b.handles.getHandle(1, "testV5Journal".getBytes()).isFenced());
b.shutdown();
}
/**
* Test that if the journal is all journal, we can not
* start the bookie. An admin should look to see what has
* happened in this case
*/
@Test
public void testAllJunkJournal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writeJunkJournal(BookieImpl.getCurrentDirectory(journalDir));
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Bookie b = null;
try {
b = new BookieImpl(conf);
fail("Shouldn't have been able to start without admin");
} catch (Throwable t) {
// correct behaviour
} finally {
if (b != null) {
b.shutdown();
}
}
}
/**
* Test that we can start with an empty journal.
* This can happen if the bookie crashes between creating the
* journal and writing the magic word. It could also happen before
* the magic word existed, if the bookie started but nothing was
* ever written.
*/
@Test
public void testEmptyJournal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writePreV2Journal(BookieImpl.getCurrentDirectory(journalDir), 0);
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Bookie b = new BookieImpl(conf);
}
/**
* Test that a journal can load if only the magic word and
* version are there.
*/
@Test
public void testHeaderOnlyJournal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writeV2Journal(BookieImpl.getCurrentDirectory(journalDir), 0);
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Bookie b = new BookieImpl(conf);
}
/**
* Test that if a journal has junk at the end, it does not load.
* If the journal is corrupt like this, admin intervention is needed
*/
@Test
public void testJunkEndedJournal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
JournalChannel jc = writeV2Journal(BookieImpl.getCurrentDirectory(journalDir), 0);
jc.getBufferedChannel().write(Unpooled.wrappedBuffer("JunkJunkJunk".getBytes()));
jc.getBufferedChannel().flushAndForceWrite(false);
writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Bookie b = null;
try {
b = new BookieImpl(conf);
} catch (Throwable t) {
// correct behaviour
}
}
/**
* Test that if the bookie crashes while writing the length
* of an entry, that we can recover.
*
* <p>This is currently not the case, which is bad as recovery
* should be fine here. The bookie has crashed while writing
* but so the client has not be notified of success.
*/
@Test
public void testTruncatedInLenJournal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
JournalChannel jc = writeV2Journal(
BookieImpl.getCurrentDirectory(journalDir), 100);
ByteBuffer zeros = ByteBuffer.allocate(2048);
jc.fc.position(jc.getBufferedChannel().position() - 0x429);
jc.fc.write(zeros);
jc.fc.force(false);
writeIndexFileForLedger(BookieImpl.getCurrentDirectory(ledgerDir),
1, "testPasswd".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Bookie b = createBookieAndReadJournal(conf);
b.readEntry(1, 99);
try {
b.readEntry(1, 100);
fail("Shouldn't have found entry 100");
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
}
/**
* Test that if the bookie crashes in the middle of writing
* the actual entry it can recover.
* In this case the entry will be available, but it will corrupt.
* This is ok, as the client will disregard the entry after looking
* at its checksum.
*/
@Test
public void testTruncatedInEntryJournal() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
JournalChannel jc = writeV2Journal(
BookieImpl.getCurrentDirectory(journalDir), 100);
ByteBuffer zeros = ByteBuffer.allocate(2048);
jc.fc.position(jc.getBufferedChannel().position() - 0x300);
jc.fc.write(zeros);
jc.fc.force(false);
writeIndexFileForLedger(BookieImpl.getCurrentDirectory(ledgerDir),
1, "testPasswd".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Bookie b = createBookieAndReadJournal(conf);
b.readEntry(1, 99);
// still able to read last entry, but it's junk
ByteBuf buf = b.readEntry(1, 100);
assertEquals("Ledger Id is wrong", buf.readLong(), 1);
assertEquals("Entry Id is wrong", buf.readLong(), 100);
assertEquals("Last confirmed is wrong", buf.readLong(), 99);
assertEquals("Length is wrong", buf.readLong(), 100 * 1024);
buf.readLong(); // skip checksum
boolean allX = true;
for (int i = 0; i < 1024; i++) {
byte x = buf.readByte();
allX = allX && x == (byte) 'X';
}
assertFalse("Some of buffer should have been zeroed", allX);
try {
b.readEntry(1, 101);
fail("Shouldn't have found entry 101");
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
}
private BookieImpl createBookieAndReadJournal(ServerConfiguration conf) throws Exception {
BookieImpl b = new BookieImpl(conf);
for (Journal journal : b.journals) {
LastLogMark lastLogMark = journal.getLastLogMark().markLog();
b.readJournal();
assertTrue(journal.getLastLogMark().getCurMark().compare(lastLogMark.getCurMark()) > 0);
}
return b;
}
/**
* Test journal replay with SortedLedgerStorage and a very small max
* arena size.
*/
@Test
public void testSortedLedgerStorageReplayWithSmallMaxArenaSize() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
JournalChannel jc = writeV2Journal(
BookieImpl.getCurrentDirectory(journalDir), 100);
jc.fc.force(false);
writeIndexFileForLedger(BookieImpl.getCurrentDirectory(ledgerDir),
1, "testPasswd".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setLedgerStorageClass("org.apache.bookkeeper.bookie.SortedLedgerStorage");
conf.setSkipListArenaMaxAllocSize(0);
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
BookieImpl b = new BookieImpl(conf);
b.readJournal();
b.ledgerStorage.flush();
b.readEntry(1, 80);
b.readEntry(1, 99);
}
/**
* Test partial index (truncate master key) with pre-v3 journals.
*/
@Test
public void testPartialFileInfoPreV3Journal1() throws Exception {
testPartialFileInfoPreV3Journal(true);
}
/**
* Test partial index with pre-v3 journals.
*/
@Test
public void testPartialFileInfoPreV3Journal2() throws Exception {
testPartialFileInfoPreV3Journal(false);
}
/**
* Test partial index file with pre-v3 journals.
*/
private void testPartialFileInfoPreV3Journal(boolean truncateMasterKey)
throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writePreV2Journal(BookieImpl.getCurrentDirectory(journalDir), 100);
writePartialIndexFileForLedger(BookieImpl.getCurrentDirectory(ledgerDir),
1, "testPasswd".getBytes(), truncateMasterKey);
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
if (truncateMasterKey) {
try {
BookieImpl b = new BookieImpl(conf);
b.readJournal();
fail("Should not reach here!");
} catch (IOException ie) {
}
} else {
BookieImpl b = new BookieImpl(conf);
b.readJournal();
b.readEntry(1, 100);
try {
b.readEntry(1, 101);
fail("Shouldn't have found entry 101");
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
}
}
/**
* Test partial index (truncate master key) with post-v3 journals.
*/
@Test
public void testPartialFileInfoPostV3Journal1() throws Exception {
testPartialFileInfoPostV3Journal(true);
}
/**
* Test partial index with post-v3 journals.
*/
@Test
public void testPartialFileInfoPostV3Journal2() throws Exception {
testPartialFileInfoPostV3Journal(false);
}
/**
* Test partial index file with post-v3 journals.
*/
private void testPartialFileInfoPostV3Journal(boolean truncateMasterKey)
throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
byte[] masterKey = "testPasswd".getBytes();
writeV3Journal(BookieImpl.getCurrentDirectory(journalDir), 100, masterKey);
writePartialIndexFileForLedger(BookieImpl.getCurrentDirectory(ledgerDir), 1, masterKey,
truncateMasterKey);
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
BookieImpl b = new BookieImpl(conf);
b.readJournal();
b.readEntry(1, 100);
try {
b.readEntry(1, 101);
fail("Shouldn't have found entry 101");
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
}
/**
* Test for fake IOException during read of Journal.
*/
@Test
public void testJournalScanIOException() throws Exception {
File journalDir = createTempDir("bookie", "journal");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
File ledgerDir = createTempDir("bookie", "ledger");
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
writeV4Journal(BookieImpl.getCurrentDirectory(journalDir), 100, "testPasswd".getBytes());
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
Journal.JournalScanner journalScanner = new DummyJournalScan();
FileChannel fileChannel = PowerMockito.mock(FileChannel.class);
PowerMockito.when(fileChannel.position(Mockito.anyLong()))
.thenThrow(new IOException());
PowerMockito.mockStatic(JournalChannel.class);
PowerMockito.when(JournalChannel.openFileChannel(Mockito.any(RandomAccessFile.class))).thenReturn(fileChannel);
BookieImpl b = new BookieImpl(conf);
for (Journal journal : b.journals) {
List<Long> journalIds = journal.listJournalIds(journal.getJournalDirectory(), null);
assertEquals(journalIds.size(), 1);
try {
journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner);
fail("Should not have been able to scan the journal");
} catch (Exception e) {
// Expected
}
}
b.shutdown();
}
private class DummyJournalScan implements Journal.JournalScanner {
@Override
public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
LOG.warn("Journal Version : " + journalVersion);
}
};
}