blob: 54a85bcb8e6a37293f4710f15249b35c5f305267 [file] [log] [blame]
package org.apache.bookkeeper.test;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
import java.io.File;
import java.util.Enumeration;
import java.util.List;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.proto.BookieServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* This class tests that bookie rolling journals
*/
public class BookieJournalRollingTest extends BookKeeperClusterTestCase {
static Logger LOG = LoggerFactory.getLogger(BookieJournalRollingTest.class);
DigestType digestType;
public BookieJournalRollingTest() {
super(3);
this.digestType = DigestType.CRC32;
}
@Before
@Override
public void setUp() throws Exception {
// Set up the configuration properties needed.
baseConf.setMaxJournalSize(1);
baseConf.setMaxBackupJournals(1);
super.setUp();
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
}
/**
* Common method to create ledgers and write entries to them.
*/
private LedgerHandle[] writeLedgerEntries(int numLedgers, int msgSize, int numMsgs) throws Exception {
// Create the ledgers
LedgerHandle[] lhs = new LedgerHandle[numLedgers];
long[] ledgerIds = new long[numLedgers];
for (int i = 0; i < numLedgers; i++) {
lhs[i] = bkc.createLedger(digestType, "".getBytes());
ledgerIds[i] = lhs[i].getId();
}
writeLedgerEntries(lhs, msgSize, numMsgs);
// Return the ledger handles to the inserted ledgers and entries
return lhs;
}
private void writeLedgerEntries(LedgerHandle[] lhs, int msgSize, int numMsgs) throws Exception {
// Create a dummy message string to write as ledger entries
StringBuilder msgSB = new StringBuilder();
for (int i = 0; i < msgSize; i++) {
msgSB.append("a");
}
String msg = msgSB.toString();
// Write all of the entries for all of the ledgers
for (int i = 0; i < numMsgs; i++) {
for (int j = 0; j < lhs.length; j++) {
StringBuilder sb = new StringBuilder();
sb.append(lhs[j].getId()).append('-').append(i).append('-')
.append(msg);
lhs[j].addEntry(sb.toString().getBytes());
}
}
}
private void validLedgerEntries(long[] ledgerIds, int msgSize, int numMsgs) throws Exception {
// Open the ledgers
LedgerHandle[] lhs = new LedgerHandle[ledgerIds.length];
for (int i = 0; i < lhs.length; i++) {
lhs[i] = bkc.openLedger(ledgerIds[i], digestType, "".getBytes());
}
StringBuilder msgSB = new StringBuilder();
for (int i = 0; i < msgSize; i++) {
msgSB.append("a");
}
String msg = msgSB.toString();
int numToRead = 10;
// read all of the entries for all the ledgers
for (int j = 0; j < lhs.length; j++) {
int start = 0;
int read = Math.min(numToRead, numMsgs - start);
int end = start + read - 1;
int entryId = 0;
if (LOG.isDebugEnabled()) {
LOG.debug("Validating Entries of Ledger " + ledgerIds[j]);
}
while (start < numMsgs) {
Enumeration<LedgerEntry> seq = lhs[j].readEntries(start, end);
assertTrue("Enumeration of ledger entries has no element", seq.hasMoreElements() == true);
while (seq.hasMoreElements()) {
LedgerEntry e = seq.nextElement();
assertEquals(entryId, e.getEntryId());
StringBuilder sb = new StringBuilder();
sb.append(ledgerIds[j]).append('-').append(entryId).append('-')
.append(msg);
Assert.assertArrayEquals(sb.toString().getBytes(), e.getEntry());
entryId++;
}
assertEquals(entryId - 1, end);
start = end + 1;
read = Math.min(numToRead, numMsgs - start);
end = start + read - 1;
}
}
}
/**
* This test writes enough ledger entries to roll over the journals
*
* It will then keep only 1 journal file before last marked journal
*
* @throws Exception
*/
@Test(timeout=60000)
public void testJournalRolling() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Testing Journal Rolling");
}
// Write enough ledger entries so that we roll over journals
LedgerHandle[] lhs = writeLedgerEntries(4, 1024, 1024);
long[] ledgerIds = new long[lhs.length];
for (int i=0; i<lhs.length; i++) {
ledgerIds[i] = lhs[i].getId();
}
// Sleep for a while to ensure data are flushed
Thread.sleep(2000);
// verify that we only keep at most journal files
for (File journalDir : tmpDirs) {
File[] journals = journalDir.listFiles();
int numJournals = 0;
for (File f : journals) {
if (!f.getName().endsWith(".txn")) {
continue;
}
++numJournals;
}
assertTrue(numJournals <= 2);
}
// restart bookies
// ensure after restart we can read the entries since journals rolls
restartBookies();
validLedgerEntries(ledgerIds, 1024, 1024);
}
/**
* This test writes enough ledger entries to roll over the journals
* without sync up
*
* @throws Exception
*/
@Test(timeout=60000)
public void testJournalRollingWithoutSyncup() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Testing Journal Rolling without sync up");
}
// set flush interval to a large value
ServerConfiguration newConf = new ServerConfiguration();
newConf.setAllowLoopback(true);
newConf.setFlushInterval(999999999);
// restart bookies
restartBookies(newConf);
// Write enough ledger entries so that we roll over journals
LedgerHandle[] lhs = writeLedgerEntries(4, 1024, 1024);
long[] ledgerIds = new long[lhs.length];
for (int i=0; i<lhs.length; i++) {
ledgerIds[i] = lhs[i].getId();
}
// ledger indexes are not flushed
// and after bookies restarted, journals will be relayed
// ensure that we can still read the entries
restartBookies(newConf);
validLedgerEntries(ledgerIds, 1024, 1024);
}
/**
* This test writes enough ledger entries to roll over the journals
* without sync up
*
* @throws Exception
*/
@Test(timeout=60000)
public void testReplayDeletedLedgerJournalEntries() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Testing replaying journal entries whose ledger has been removed.");
}
// Write entries
LedgerHandle[] lhs = writeLedgerEntries(1, 1024, 10);
// Wait until all entries are flushed and last mark rolls
Thread.sleep(3 * baseConf.getFlushInterval());
// restart bookies with flush interval set to a large value
ServerConfiguration newConf = new ServerConfiguration();
newConf.setFlushInterval(999999999);
newConf.setAllowLoopback(true);
// restart bookies
restartBookies(newConf);
// Write entries again to let them existed in journal
writeLedgerEntries(lhs, 1024, 10);
// delete them
for (LedgerHandle lh : lhs) {
bkc.deleteLedger(lh.getId());
}
// wait for gc
Thread.sleep(2 * newConf.getGcWaitTime());
// restart bookies again to trigger replaying journal
restartBookies(newConf);
}
}