blob: 07fcd720c1924927cc2f9ec9529aab86ba32d6f5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
import static org.mockito.Mockito.spy;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class TestBookKeeperJournalManager {
/** Logger shared by all tests in this class. */
static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
/** Number of transactions written into each segment by the multi-segment tests. */
private static final long DEFAULT_SEGMENT_SIZE = 1000;
/** Default configuration passed to the journal managers under test. */
protected static Configuration conf = new Configuration();
/** ZooKeeper client used to inspect znodes; assigned in setup(), closed in teardown(). */
private ZooKeeper zkc;
/** BookKeeper test utility, created and started in setupBookkeeper(), torn down in teardownBookkeeper(). */
private static BKJMUtil bkutil;
/** Number of bookies handed to the BKJMUtil constructor. */
static int numBookies = 3;
/** Additional bookie started by individual tests; shut down in teardown() when non-null. */
private BookieServer newBookie;
/**
 * Creates the shared {@link BKJMUtil} for {@code numBookies} bookies and
 * starts it before the tests of this class run.
 */
@BeforeClass
public static void setupBookkeeper() throws Exception {
  final BKJMUtil util = new BKJMUtil(numBookies);
  // publish the utility before starting it, so teardown can still reach it
  bkutil = util;
  util.start();
}
/**
 * Tears down the shared BookKeeper test utility after the tests have run.
 * Does nothing when the utility was never created (class setup failed
 * early), so that the original failure is not followed by a
 * NullPointerException from here.
 */
@AfterClass
public static void teardownBookkeeper() throws Exception {
  if (bkutil != null) {
    bkutil.teardown();
  }
}
/**
 * Obtains the ZooKeeper client that the tests use to inspect znodes.
 */
@Before
public void setup() throws Exception {
  this.zkc = BKJMUtil.connectZooKeeper();
}
/**
 * Closes the per-test ZooKeeper client and shuts down the extra bookie a
 * test may have started. The bookie is shut down even when closing the
 * client fails or when setup() never assigned the client.
 */
@After
public void teardown() throws Exception {
  try {
    if (zkc != null) {
      zkc.close();
    }
  } finally {
    if (newBookie != null) {
      newBookie.shutdown();
    }
  }
}
/**
 * Builds a NamespaceInfo whose first constructor argument is a random int
 * and whose remaining arguments are the fixed values "testCluster",
 * "TestBPID" and -1.
 */
private NamespaceInfo newNSInfo() {
  final int randomId = new Random().nextInt();
  return new NamespaceInfo(randomId, "testCluster", "TestBPID", -1);
}
/**
 * Writes transactions 1..100 into one segment, finalizes it and checks
 * that the finalized znode exists while the inprogress znode is gone.
 */
@Test
public void testSimpleWrite() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
  try {
    bkjm.format(nsi);

    EditLogOutputStream out = bkjm.startLogSegment(1,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (long i = 1; i <= 100; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(1, 100);

    String zkpath = bkjm.finalizedLedgerZNode(1, 100);
    assertNotNull(zkc.exists(zkpath, false));
    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
  } finally {
    // close the journal manager even when an assertion fails
    bkjm.close();
  }
}
/**
 * Writes and finalizes a segment holding transactions 1..100 and checks
 * that getNumberOfTransactions(1, true) reports 100.
 */
@Test
public void testNumberOfTransactions() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
  try {
    bkjm.format(nsi);

    EditLogOutputStream out = bkjm.startLogSegment(1,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (long i = 1; i <= 100; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(1, 100);

    long numTrans = bkjm.getNumberOfTransactions(1, true);
    assertEquals(100, numTrans);
  } finally {
    // close the journal manager even when an assertion fails
    bkjm.close();
  }
}
/**
 * Writes three finalized segments of DEFAULT_SEGMENT_SIZE transactions,
 * deletes the znode of the middle one and checks the transaction counts:
 * counting from the first or third segment yields one segment's worth,
 * counting from the deleted segment raises a CorruptionException.
 */
@Test
public void testNumberOfTransactionsWithGaps() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
  try {
    bkjm.format(nsi);

    long txid = 1;
    for (long i = 0; i < 3; i++) {
      long start = txid;
      EditLogOutputStream out = bkjm.startLogSegment(start,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
        op.setTransactionId(txid++);
        out.write(op);
      }
      out.close();
      bkjm.finalizeLogSegment(start, txid - 1);
      assertNotNull(
          zkc.exists(bkjm.finalizedLedgerZNode(start, txid - 1), false));
    }

    // remove the middle segment to create the gap
    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE + 1,
        DEFAULT_SEGMENT_SIZE * 2), -1);

    long numTrans = bkjm.getNumberOfTransactions(1, true);
    assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);

    try {
      bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE + 1, true);
      fail("Should have thrown corruption exception by this point");
    } catch (JournalManager.CorruptionException ce) {
      // if we get here, everything is going good
    }

    numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE * 2) + 1,
        true);
    assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
  } finally {
    // close the journal manager even when an assertion fails
    bkjm.close();
  }
}
/**
 * Writes three finalized segments followed by a half-sized segment that is
 * flushed and aborted (left inprogress), then checks that
 * getNumberOfTransactions(1, true) counts every transaction written.
 */
@Test
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
  try {
    bkjm.format(nsi);

    long txid = 1;
    for (long i = 0; i < 3; i++) {
      long start = txid;
      EditLogOutputStream out = bkjm.startLogSegment(start,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
        op.setTransactionId(txid++);
        out.write(op);
      }
      out.close();
      bkjm.finalizeLogSegment(start, (txid - 1));
      assertNotNull(
          zkc.exists(bkjm.finalizedLedgerZNode(start, (txid - 1)), false));
    }

    // trailing segment: flushed but never finalized
    long start = txid;
    EditLogOutputStream out = bkjm.startLogSegment(start,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(txid++);
      out.write(op);
    }
    out.setReadyToFlush();
    out.flush();
    out.abort();
    out.close();

    long numTrans = bkjm.getNumberOfTransactions(1, true);
    assertEquals((txid - 1), numTrans);
  } finally {
    // close the journal manager even when an assertion fails
    bkjm.close();
  }
}
/**
 * Create a bkjm namespace, write a journal from txid 1, close stream.
 * Try to create a new journal from txid 1. Should throw an exception.
 * Starting at the last txid of the existing segment must fail as well,
 * while continuing right after it, or far in the future, must succeed.
 */
@Test
public void testWriteRestartFrom1() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
  try {
    bkjm.format(nsi);

    long txid = 1;
    long start = txid;
    EditLogOutputStream out = bkjm.startLogSegment(txid,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(txid++);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(start, (txid - 1));

    txid = 1;
    try {
      out = bkjm.startLogSegment(txid,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      fail("Shouldn't be able to start another journal from " + txid
          + " when one already exists");
    } catch (Exception ioe) {
      LOG.info("Caught exception as expected", ioe);
    }

    // test border case
    txid = DEFAULT_SEGMENT_SIZE;
    try {
      out = bkjm.startLogSegment(txid,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      fail("Shouldn't be able to start another journal from " + txid
          + " when one already exists");
    } catch (IOException ioe) {
      LOG.info("Caught exception as expected", ioe);
    }

    // open journal continuing from before
    txid = DEFAULT_SEGMENT_SIZE + 1;
    start = txid;
    out = bkjm.startLogSegment(start,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    assertNotNull(out);
    for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(txid++);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(start, (txid - 1));

    // open journal arbitrarily far in the future
    txid = DEFAULT_SEGMENT_SIZE * 4;
    out = bkjm.startLogSegment(txid,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    assertNotNull(out);
    // do not leave the last stream open
    out.close();
  } finally {
    // close the journal manager even when an assertion fails
    bkjm.close();
  }
}
/**
 * Opens a segment at txid 1 through one journal manager and checks that a
 * second journal manager on the same journal URI fails with an IOException
 * when it tries to start a segment at the same txid.
 */
@Test
public void testTwoWriters() throws Exception {
  long start = 1;
  NamespaceInfo nsi = newNSInfo();

  BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
  try {
    bkjm1.format(nsi);

    BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
    try {
      EditLogOutputStream out1 = bkjm1.startLogSegment(start,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      try {
        bkjm2.startLogSegment(start,
            NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
        fail("Shouldn't have been able to open the second writer");
      } catch (IOException ioe) {
        LOG.info("Caught exception as expected", ioe);
      } finally {
        out1.close();
      }
    } finally {
      // close the second journal manager even when the test fails
      bkjm2.close();
    }
  } finally {
    // close the first journal manager even when the test fails
    bkjm1.close();
  }
}
/**
 * Writes and finalizes a segment of 10000 transactions, then reads it back
 * through selectInputStreams and checks the number of transactions found
 * in the first selected stream.
 */
@Test
public void testSimpleRead() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
      nsi);
  try {
    bkjm.format(nsi);

    final long numTransactions = 10000;
    EditLogOutputStream out = bkjm.startLogSegment(1,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (long i = 1; i <= numTransactions; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(1, numTransactions);

    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
    bkjm.selectInputStreams(in, 1, true);
    try {
      // fail with a readable message instead of an IndexOutOfBoundsException
      assertFalse("No input stream was selected", in.isEmpty());
      assertEquals(numTransactions,
          FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
    } finally {
      for (EditLogInputStream stream : in) {
        stream.close();
      }
    }
  } finally {
    // close the journal manager even when an assertion fails
    bkjm.close();
  }
}
/**
 * Writes transactions 1..100, flushes and aborts the stream so the segment
 * stays inprogress, then checks that recoverUnfinalizedSegments() replaces
 * the inprogress znode with the finalized znode for 1..100.
 */
@Test
public void testSimpleRecovery() throws Exception {
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
      nsi);
  try {
    bkjm.format(nsi);

    EditLogOutputStream out = bkjm.startLogSegment(1,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    for (long i = 1; i <= 100; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.setReadyToFlush();
    out.flush();

    out.abort();
    out.close();

    // before recovery: only the inprogress znode exists
    assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
    assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false));

    bkjm.recoverUnfinalizedSegments();

    // after recovery: only the finalized znode exists
    assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
  } finally {
    // close the journal manager even when an assertion fails
    bkjm.close();
  }
}
/**
* Test that if enough bookies fail to prevent an ensemble from being
* formed, writes to bookkeeper will fail. Test that when once again
* an ensemble is available, it can continue to write.
*/
@Test
public void testAllBookieFailure() throws Exception {
// bookie to fail
newBookie = bkutil.newBookie();
BookieServer replacementBookie = null;
try {
int ensembleSize = numBookies + 1;
assertEquals("New bookie didn't start",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
// ensure that the journal manager has to use all bookies,
// so that a failure will fail the journal manager
// (this local configuration shadows the static conf field)
Configuration conf = new Configuration();
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
long txid = 1;
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// first batch: written and flushed while all bookies are up
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
// take the extra bookie down; fewer bookies than the ensemble size remain
newBookie.shutdown();
assertEquals("New bookie didn't die",
numBookies, bkutil.checkBookiesUp(numBookies, 10));
// second batch: the flush is expected to fail with an IOException
try {
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
fail("should not get to this stage");
} catch (IOException ioe) {
LOG.debug("Error writing to bookkeeper", ioe);
assertTrue("Invalid exception message",
ioe.getMessage().contains("Failed to write to bookkeeper"));
}
// bring the number of running bookies back up to the ensemble size
replacementBookie = bkutil.newBookie();
assertEquals("New bookie didn't start",
numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
// recover the segment left unfinalized above, then continue writing
// in a new segment that starts at the next txid
bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
} catch (Exception e) {
LOG.error("Exception in test", e);
throw e;
} finally {
// stop both extra bookies and check that only the original ones remain
if (replacementBookie != null) {
replacementBookie.shutdown();
}
newBookie.shutdown();
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
LOG.warn("Not all bookies from this test shut down, expect errors");
}
}
}
/**
* Test that a BookKeeper JM can continue to work across the
* failure of a bookie. This should be handled transparently
* by bookkeeper.
*/
@Test
public void testOneBookieFailure() throws Exception {
// bookie to fail
newBookie = bkutil.newBookie();
BookieServer replacementBookie = null;
try {
int ensembleSize = numBookies + 1;
assertEquals("New bookie didn't start",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
// size the ensemble and quorum to the number of bookies running at this
// point (numBookies + 1); presumably this makes the bookie shut down
// below part of the ledger's ensemble -- TODO confirm
// (this local configuration shadows the static conf field)
Configuration conf = new Configuration();
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
long txid = 1;
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// first batch: written and flushed before any bookie is stopped
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
// start the replacement first, so that ensembleSize bookies are still
// running after newBookie has been shut down
replacementBookie = bkutil.newBookie();
assertEquals("replacement bookie didn't start",
ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
newBookie.shutdown();
assertEquals("New bookie didn't die",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
// second batch: must succeed on the same stream; any exception fails the test
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
} catch (Exception e) {
LOG.error("Exception in test", e);
throw e;
} finally {
// stop both extra bookies and check that only the original ones remain
if (replacementBookie != null) {
replacementBookie.shutdown();
}
newBookie.shutdown();
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
LOG.warn("Not all bookies from this test shut down, expect errors");
}
}
}
/**
 * An inprogress znode without any data can only be the result of a third
 * party tampering with the zookeeper state. Recovery must report this as
 * an IOException mentioning invalid/incomplete znode data.
 */
@Test
public void testEmptyInprogressNode() throws Exception {
  final URI journalUri =
      BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
  final NamespaceInfo nsi = newNSInfo();

  BookKeeperJournalManager bkjm =
      new BookKeeperJournalManager(conf, journalUri, nsi);
  bkjm.format(nsi);

  // one complete, finalized segment holding txids 1..100
  EditLogOutputStream segment = bkjm.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  long txid = 1;
  while (txid <= 100) {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txid);
    segment.write(noOp);
    txid++;
  }
  segment.close();
  bkjm.finalizeLogSegment(1, 100);

  // a second segment that is opened but never finalized
  segment = bkjm.startLogSegment(101,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  segment.close();
  bkjm.close();

  // wipe the data stored in its inprogress znode
  final String emptiedZNode = bkjm.inprogressZNode(101);
  zkc.setData(emptiedZNode, new byte[0], -1);

  bkjm = new BookKeeperJournalManager(conf, journalUri, nsi);
  try {
    bkjm.recoverUnfinalizedSegments();
    fail("Should have failed. There should be no way of creating"
        + " an empty inprogess znode");
  } catch (IOException expected) {
    // correct behaviour
    final String message = expected.getMessage();
    assertTrue("Exception different than expected",
        message.contains("Invalid/Incomplete data in znode"));
  } finally {
    bkjm.close();
  }
}
/**
 * If a journal manager has a corrupt inprogress node, ensure that we throw
 * an error, as this should not be possible, and some third party has
 * corrupted the zookeeper state.
 */
@Test
public void testCorruptInprogressNode() throws Exception {
  URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
      nsi);
  bkjm.format(nsi);

  // one complete, finalized segment holding txids 1..100
  EditLogOutputStream out = bkjm.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  for (long i = 1; i <= 100; i++) {
    FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
    op.setTransactionId(i);
    out.write(op);
  }
  out.close();
  bkjm.finalizeLogSegment(1, 100);

  // a second segment that is opened but never finalized
  out = bkjm.startLogSegment(101,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  out.close();
  bkjm.close();

  // overwrite the inprogress znode with bytes that are not valid metadata
  String inprogressZNode = bkjm.inprogressZNode(101);
  zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);

  bkjm = new BookKeeperJournalManager(conf, uri, nsi);
  try {
    bkjm.recoverUnfinalizedSegments();
    // the message used to describe the empty-znode scenario of the
    // sibling test; it now names the case actually exercised here
    fail("Should have failed. Recovery must reject a corrupt"
        + " inprogress znode");
  } catch (IOException e) {
    // correct behaviour
    assertTrue("Exception different than expected", e.getMessage().contains(
        "has no field named"));
  } finally {
    bkjm.close();
  }
}
/**
 * Cases can occur where we create a segment but crash before we even have the
 * chance to write the START_SEGMENT op. If this occurs we should warn, but
 * load as normal: after recovery a new segment starting at the same txid
 * can be written and finalized.
 */
@Test
public void testEmptyInprogressLedger() throws Exception {
  URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
  NamespaceInfo nsi = newNSInfo();
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
      nsi);
  bkjm.format(nsi);

  // one complete, finalized segment holding txids 1..100
  EditLogOutputStream out = bkjm.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  for (long i = 1; i <= 100; i++) {
    FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
    op.setTransactionId(i);
    out.write(op);
  }
  out.close();
  bkjm.finalizeLogSegment(1, 100);

  // a second segment that is opened and closed without any op written
  out = bkjm.startLogSegment(101,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  out.close();
  bkjm.close();

  bkjm = new BookKeeperJournalManager(conf, uri, nsi);
  bkjm.recoverUnfinalizedSegments();
  out = bkjm.startLogSegment(101,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  // the ops must carry the txids of the segment finalized below (101..200);
  // the loop previously reused 1..100
  for (long i = 101; i <= 200; i++) {
    FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
    op.setTransactionId(i);
    out.write(op);
  }
  out.close();
  bkjm.finalizeLogSegment(101, 200);
  bkjm.close();
}
/**
 * Simulates a failure between finalizing an inprogress segment and deleting
 * the corresponding inprogress znode: the inprogress znode is re-created
 * next to the finalized one, and a further recovery run must complete
 * without throwing.
 */
@Test
public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
  final URI journalUri = BKJMUtil
      .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
  final NamespaceInfo nsi = newNSInfo();

  BookKeeperJournalManager bkjm =
      new BookKeeperJournalManager(conf, journalUri, nsi);
  bkjm.format(nsi);

  // write txids 1..100 and close without finalizing
  EditLogOutputStream segment = bkjm.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  long txid = 1;
  while (txid <= 100) {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txid);
    segment.write(noOp);
    txid++;
  }
  segment.close();
  bkjm.close();

  final String inprogressPath = bkjm.inprogressZNode(1);
  final String finalizedPath = bkjm.finalizedLedgerZNode(1, 100);

  assertNotNull("inprogress znode doesn't exist",
      zkc.exists(inprogressPath, null));
  assertNull("finalized znode exists", zkc.exists(finalizedPath, null));

  // keep a copy of the inprogress data so the znode can be restored later
  final byte[] savedInprogressData = zkc.getData(inprogressPath, false, null);

  // finalize
  bkjm = new BookKeeperJournalManager(conf, journalUri, nsi);
  bkjm.recoverUnfinalizedSegments();
  bkjm.close();

  assertNull("inprogress znode exists", zkc.exists(inprogressPath, null));
  assertNotNull("finalized znode doesn't exist",
      zkc.exists(finalizedPath, null));

  // put the inprogress znode back alongside the finalized one
  zkc.create(inprogressPath, savedInprogressData, Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  // should work fine
  bkjm = new BookKeeperJournalManager(conf, journalUri, nsi);
  bkjm.recoverUnfinalizedSegments();
  bkjm.close();
}
/**
 * Tests that reading the edit log metadata from ZooKeeper copes with a
 * NoNodeException: when getData fails that way for one finalized segment,
 * bkjm.getLedgerList(false) must skip it and return only the other
 * segment. HDFS-3441.
 */
@Test
public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
  final URI journalUri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
  final NamespaceInfo nsi = newNSInfo();
  final BookKeeperJournalManager bkjm =
      new BookKeeperJournalManager(conf, journalUri, nsi);
  bkjm.format(nsi);

  try {
    // first segment: txids 1..50, finalized
    final String firstPath = startAndFinalizeLogSegment(bkjm, 1, 50);

    // second segment: txids 51..100, finalized
    final String secondPath = startAndFinalizeLogSegment(bkjm, 51, 100);

    // Simulate the purger thread removing the second segment's metadata
    // while it is being read: the spied client throws NoNodeException
    // for that path.
    final ZooKeeper spiedZk = spy(BKJMUtil.connectZooKeeper());
    bkjm.setZooKeeper(spiedZk);
    Mockito.doThrow(
        new KeeperException.NoNodeException(secondPath + " doesn't exists"))
        .when(spiedZk).getData(secondPath, false, null);

    final List<EditLogLedgerMetadata> ledgers = bkjm.getLedgerList(false);
    assertEquals("List contains the metadata of non exists path.", 1,
        ledgers.size());
    assertEquals("LogLedgerMetadata contains wrong zk paths.", firstPath,
        ledgers.get(0).getZkPath());
  } finally {
    bkjm.close();
  }
}
/** Outcome reported by each formatting task in testConcurrentFormat. */
private enum ThreadStatus {
  COMPLETED,
  GOODEXCEPTION,
  BADEXCEPTION
}
/**
 * Tests that concurrent calls to format will still allow one to succeed.
 * Forty tasks, released together by a barrier, each format the same
 * journal; every task must either complete or fail with an IOException,
 * and at least one must complete.
 */
@Test
public void testConcurrentFormat() throws Exception {
  final URI uri = BKJMUtil.createJournalURI("/hdfsjournal-concurrentformat");
  final NamespaceInfo nsi = newNSInfo();

  // populate with data first
  BookKeeperJournalManager bkjm
      = new BookKeeperJournalManager(conf, uri, nsi);
  bkjm.format(nsi);
  for (int i = 1; i < 100 * 2; i += 2) {
    bkjm.startLogSegment(i, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    bkjm.finalizeLogSegment(i, i + 1);
  }
  bkjm.close();

  final int numThreads = 40;
  List<Callable<ThreadStatus>> threads
      = new ArrayList<Callable<ThreadStatus>>();
  final CyclicBarrier barrier = new CyclicBarrier(numThreads);

  for (int i = 0; i < numThreads; i++) {
    threads.add(new Callable<ThreadStatus>() {
      @Override
      public ThreadStatus call() {
        BookKeeperJournalManager formatter = null;
        try {
          formatter = new BookKeeperJournalManager(conf, uri, nsi);
          // wait until every task is ready, so the formats really race
          barrier.await();
          formatter.format(nsi);
          return ThreadStatus.COMPLETED;
        } catch (IOException ioe) {
          LOG.info("Exception formatting ", ioe);
          return ThreadStatus.GOODEXCEPTION;
        } catch (InterruptedException ie) {
          LOG.error("Interrupted. Something is broken", ie);
          Thread.currentThread().interrupt();
          return ThreadStatus.BADEXCEPTION;
        } catch (Exception e) {
          LOG.error("Some other bad exception", e);
          return ThreadStatus.BADEXCEPTION;
        } finally {
          if (formatter != null) {
            try {
              formatter.close();
            } catch (IOException ioe) {
              LOG.error("Error closing journal manager", ioe);
            }
          }
        }
      }
    });
  }

  ExecutorService service = Executors.newFixedThreadPool(numThreads);
  try {
    List<Future<ThreadStatus>> statuses = service.invokeAll(threads, 60,
        TimeUnit.SECONDS);
    int numCompleted = 0;
    for (Future<ThreadStatus> s : statuses) {
      assertTrue(s.isDone());
      ThreadStatus status = s.get();
      assertTrue("Thread threw invalid exception",
          status == ThreadStatus.COMPLETED
          || status == ThreadStatus.GOODEXCEPTION);
      if (status == ThreadStatus.COMPLETED) {
        numCompleted++;
      }
    }
    LOG.info("Completed " + numCompleted + " formats");
    assertTrue("No thread managed to complete formatting", numCompleted > 0);
  } finally {
    // the pool's worker threads would otherwise outlive the test
    service.shutdownNow();
  }
}
/**
 * With ensemble and quorum size both covering every bookie, no explicit
 * ack quorum size and a 2 second add-entry timeout, suspending one bookie
 * must make closing/finalizing/reading the segment fail with an
 * IOException.
 */
@Test(timeout = 120000)
public void testDefaultAckQuorum() throws Exception {
  newBookie = bkutil.newBookie();
  int ensembleSize = numBookies + 1;
  int quorumSize = numBookies + 1;
  // ensure that the journal manager has to use all bookies,
  // so that a failure will fail the journal manager
  Configuration conf = new Configuration();
  conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
      ensembleSize);
  conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
      quorumSize);
  // sets 2 secs
  conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
      2);
  NamespaceInfo nsi = newNSInfo();
  // use a journal path of its own; the path was previously shared with
  // testOneBookieFailure
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-defaultackquorum"), nsi);
  bkjm.format(nsi);
  CountDownLatch sleepLatch = new CountDownLatch(1);
  sleepBookie(sleepLatch, newBookie);

  try {
    EditLogOutputStream out = bkjm.startLogSegment(1,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    int numTransactions = 100;
    for (long i = 1; i <= numTransactions; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    try {
      out.close();
      bkjm.finalizeLogSegment(1, numTransactions);

      List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
      bkjm.selectInputStreams(in, 1, true);
      try {
        assertEquals(numTransactions,
            FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
      } finally {
        in.get(0).close();
      }
      fail("Should throw exception as not enough non-faulty bookies available!");
    } catch (IOException ioe) {
      // expected
    }
  } finally {
    // let the sleeper thread resume the bookie instead of leaving it
    // suspended until its 60 second wait expires
    sleepLatch.countDown();
  }
}
/**
 * Test ack quorum feature supported by bookkeeper. Keep ack quorum bookie
 * alive and sleep all the other bookies. Now the client would wait for the
 * acknowledgement from the ack size bookies and after receiving the success
 * response will continue writing. Non ack client will hang long time to add
 * entries.
 */
@Test(timeout = 120000)
public void testAckQuorum() throws Exception {
  // slow bookie
  newBookie = bkutil.newBookie();
  // make quorum size and ensemble size same to avoid the interleave writing
  // of the ledger entries
  int ensembleSize = numBookies + 1;
  int quorumSize = numBookies + 1;
  int ackSize = numBookies;
  // ensure that the journal manager has to use all bookies,
  // so that a failure will fail the journal manager
  Configuration conf = new Configuration();
  conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
      ensembleSize);
  conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
      quorumSize);
  conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE,
      ackSize);
  // sets 60 minutes
  conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
      3600);

  NamespaceInfo nsi = newNSInfo();
  // use a journal path of its own; the path was previously shared with
  // testOneBookieFailure
  BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
      BKJMUtil.createJournalURI("/hdfsjournal-ackquorum"), nsi);
  bkjm.format(nsi);
  CountDownLatch sleepLatch = new CountDownLatch(1);
  sleepBookie(sleepLatch, newBookie);

  List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
  try {
    EditLogOutputStream out = bkjm.startLogSegment(1,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
    int numTransactions = 100;
    for (long i = 1; i <= numTransactions; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(1, numTransactions);

    bkjm.selectInputStreams(in, 1, true);
    assertEquals(numTransactions,
        FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
  } finally {
    // always wake the suspended bookie first, then release the streams
    // and the journal manager, whichever step of the test failed
    sleepLatch.countDown();
    try {
      for (EditLogInputStream stream : in) {
        stream.close();
      }
    } finally {
      bkjm.close();
    }
  }
}
/**
 * Suspend a bookie on a background thread until the latch is counted down
 * or 60 seconds have passed, then resume it. Once the bookie has been
 * suspended it is resumed even if the wait is interrupted.
 *
 * @param l
 *          Latch to wait on
 * @param bookie
 *          bookie server
 * @throws Exception
 */
private void sleepBookie(final CountDownLatch l, final BookieServer bookie)
    throws Exception {
  Thread sleeper = new Thread() {
    @Override
    public void run() {
      try {
        bookie.suspendProcessing();
        try {
          l.await(60, TimeUnit.SECONDS);
        } finally {
          // never leave the bookie suspended, whatever ended the wait
          bookie.resumeProcessing();
        }
      } catch (InterruptedException ie) {
        LOG.error("Error suspending bookie", ie);
        // keep the interrupt visible to whoever owns this thread
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        LOG.error("Error suspending bookie", e);
      }
    }
  };
  sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
  sleeper.start();
}
/**
 * Writes one no-op transaction for every txid in [startTxid, endTxid] into
 * a new segment, finalizes the segment and asserts that its finalized znode
 * exists while its inprogress znode does not.
 *
 * @param bkjm journal manager to write through
 * @param startTxid first transaction id of the segment
 * @param endTxid last transaction id of the segment
 * @return the path of the finalized segment's znode
 */
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
    int startTxid, int endTxid) throws IOException, KeeperException,
    InterruptedException {
  final EditLogOutputStream segment = bkjm.startLogSegment(startTxid,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  long txid = startTxid;
  while (txid <= endTxid) {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txid);
    segment.write(noOp);
    txid++;
  }
  segment.close();

  // finalize the segment that was just written
  bkjm.finalizeLogSegment(startTxid, endTxid);

  final String finalizedPath = bkjm.finalizedLedgerZNode(startTxid, endTxid);
  assertNotNull(zkc.exists(finalizedPath, false));
  assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
  return finalizedPath;
}
}