package org.apache.hadoop.contrib.bkjournal;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.conf.Configuration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
* BookKeeper Journal Manager
* To use, add the following to hdfs-site.xml.
* <pre>
* {@code
* <property>
* <name>dfs.namenode.edits.dir</name>
* <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
* </property>
* <property>
* <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
* <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
* </property>
* }
* </pre>
* The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
* [zookkeeper ensemble] is a list of semi-colon separated, zookeeper host:port
* pairs. In the example above there are 3 servers, in the ensemble,
* zk1, zk2 &amp; zk3, each one listening on port 2181.
* [root znode] is the path of the zookeeper znode, under which the editlog
* information will be stored.
* Other configuration options are:
* <ul>
* <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
* Number of bytes a bookkeeper journal stream will buffer before
* forcing a flush. Default is 1024.</li>
* <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
* Number of bookkeeper servers in edit log ledger ensembles. This
* is the number of bookkeeper servers which need to be available
* for the ledger to be writable. Default is 3.</li>
* <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
* Number of bookkeeper servers in the write quorum. This is the
* number of bookkeeper servers which must have acknowledged the
* write of an entry before it is considered written.
* Default is 2.</li>
* <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
* Password to use when creating ledgers. </li>
* <li><b>dfs.namenode.bookkeeperjournal.zk.session.timeout</b>
* Session timeout for Zookeeper client from BookKeeper Journal Manager.
* Hadoop recommends that, this value should be less than the ZKFC
* session timeout value. Default value is 3000.</li>
* </ul>
public class BookKeeperJournalManager implements JournalManager {
static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
public static final String BKJM_OUTPUT_BUFFER_SIZE
= "dfs.namenode.bookkeeperjournal.output-buffer-size";
public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
= "dfs.namenode.bookkeeperjournal.ensemble-size";
public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
= "dfs.namenode.bookkeeperjournal.quorum-size";
public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
public static final String BKJM_BOOKKEEPER_DIGEST_PW
= "dfs.namenode.bookkeeperjournal.digestPw";
public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
private static final int BKJM_LAYOUT_VERSION = -1;
public static final String BKJM_ZK_SESSION_TIMEOUT
= "dfs.namenode.bookkeeperjournal.zk.session.timeout";
public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
private ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
private final CurrentInprogress ci;
private final String ledgerPath;
private final MaxTxId maxTxId;
private final int ensembleSize;
private final int quorumSize;
private final String digestpw;
private final CountDownLatch zkConnectLatch;
private LedgerHandle currentLedger = null;
private int bytesToInt(byte[] b) {
assert b.length >= 4;
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
private byte[] intToBytes(int i) {
return new byte[] {
(byte)(i >> 24),
(byte)(i >> 16),
(byte)(i >> 8),
(byte)(i) };
* Construct a Bookkeeper journal manager.
public BookKeeperJournalManager(Configuration conf, URI uri)
throws IOException {
this.conf = conf;
String zkConnect = uri.getAuthority().replace(";", ",");
String zkPath = uri.getPath();
ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
ledgerPath = zkPath + "/ledgers";
String maxTxIdPath = zkPath + "/maxtxid";
String currentInprogressNodePath = zkPath + "/CurrentInprogress";
String versionPath = zkPath + "/version";
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
try {
zkConnectLatch = new CountDownLatch(1);
zkc = new ZooKeeper(zkConnect, conf.getInt(BKJM_ZK_SESSION_TIMEOUT,
BKJM_ZK_SESSION_TIMEOUT_DEFAULT), new ZkConnectionWatcher());
if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
throw new IOException("Error connecting to zookeeper");
if (zkc.exists(zkPath, false) == null) {
zkc.create(zkPath, new byte[] {'0'},
Stat versionStat = zkc.exists(versionPath, false);
if (versionStat != null) {
byte[] d = zkc.getData(versionPath, false, versionStat);
// There's only one version at the moment
assert bytesToInt(d) == BKJM_LAYOUT_VERSION;
} else {
zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION),
if (zkc.exists(ledgerPath, false) == null) {
zkc.create(ledgerPath, new byte[] {'0'},
bkc = new BookKeeper(new ClientConfiguration(),
} catch (KeeperException e) {
throw new IOException("Error initializing zk", e);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while initializing bk journal manager",
ci = new CurrentInprogress(zkc, currentInprogressNodePath);
maxTxId = new MaxTxId(zkc, maxTxIdPath);
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
* Then create a ledger and stream based on that ledger.
* The ledger id is written to the inprogress znode, so that in the
* case of a crash, a recovery process can find the ledger we were writing
* to when we crashed.
* @param txId First transaction id to be written to the stream
public EditLogOutputStream startLogSegment(long txId) throws IOException {
if (txId <= maxTxId.get()) {
throw new IOException("We've already seen " + txId
+ ". A new stream cannot be created with it");
try {
String existingInprogressNode =;
if (null != existingInprogressNode
&& zkc.exists(existingInprogressNode, false) != null) {
throw new IOException("Inprogress node already exists");
if (currentLedger != null) {
// bookkeeper errored on last stream, clean up ledger
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
} catch (BKException bke) {
throw new IOException("Error creating ledger", bke);
} catch (KeeperException ke) {
throw new IOException("Error in zookeeper while creating ledger", ke);
} catch (InterruptedException ie) {
throw new IOException("Interrupted creating ledger", ie);
try {
String znodePath = inprogressZNode(txId);
EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
/* Write the ledger metadata out to the inprogress ledger znode
* This can fail if for some reason our write lock has
* expired (@see WriteLock) and another process has managed to
* create the inprogress znode.
* In this case, throw an exception. We don't want to continue
* as this would lead to a split brain situation.
l.write(zkc, znodePath);;
return new BookKeeperEditLogOutputStream(conf, currentLedger);
} catch (KeeperException ke) {
throw new IOException("Error storing ledger metadata", ke);
private void cleanupLedger(LedgerHandle lh) {
try {
long id = currentLedger.getId();
} catch (BKException bke) {
//log & ignore, an IOException will be thrown soon
LOG.error("Error closing ledger", bke);
} catch (InterruptedException ie) {
LOG.warn("Interrupted while closing ledger", ie);
* Finalize a log segment. If the journal manager is currently
* writing to a ledger, ensure that this is the ledger of the log segment
* being finalized.
* Otherwise this is the recovery case. In the recovery case, ensure that
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
String inprogressPath = inprogressZNode(firstTxId);
try {
Stat inprogressStat = zkc.exists(inprogressPath, false);
if (inprogressStat == null) {
throw new IOException("Inprogress znode " + inprogressPath
+ " doesn't exist");
EditLogLedgerMetadata l
=, inprogressPath);
if (currentLedger != null) { // normal, non-recovery case
if (l.getLedgerId() == currentLedger.getId()) {
try {
} catch (BKException bke) {
LOG.error("Error closing current ledger", bke);
currentLedger = null;
} else {
throw new IOException(
"Active ledger has different ID to inprogress. "
+ l.getLedgerId() + " found, "
+ currentLedger.getId() + " expected");
if (l.getFirstTxId() != firstTxId) {
throw new IOException("Transaction id not as expected, "
+ l.getFirstTxId() + " found, " + firstTxId + " expected");
String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
try {
l.write(zkc, finalisedPath);
} catch (KeeperException.NodeExistsException nee) {
if (!l.verify(zkc, finalisedPath)) {
throw new IOException("Node " + finalisedPath + " already exists"
+ " but data doesn't match");
zkc.delete(inprogressPath, inprogressStat.getVersion());
String inprogressPathFromCI =;
if (inprogressPath.equals(inprogressPathFromCI)) {
} catch (KeeperException e) {
throw new IOException("Error finalising ledger", e);
} catch (InterruptedException ie) {
throw new IOException("Error finalising ledger", ie);
EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
throws IOException {
for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
long lastTxId = l.getLastTxId();
if (l.isInProgress()) {
lastTxId = recoverLastTxId(l, false);
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
try {
LedgerHandle h;
if (l.isInProgress()) { // we don't want to fence the current journal
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
BookKeeper.DigestType.MAC, digestpw.getBytes());
} else {
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
BookKeeperEditLogInputStream s = new BookKeeperEditLogInputStream(h,
return s;
} catch (BKException e) {
throw new IOException("Could not open ledger for " + fromTxId, e);
} catch (InterruptedException ie) {
throw new IOException("Interrupted opening ledger for "
+ fromTxId, ie);
return null;
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) {
// NOTE: could probably be rewritten more efficiently
while (true) {
EditLogInputStream elis;
try {
elis = getInputStream(fromTxId, inProgressOk);
} catch (IOException e) {
if (elis == null) {
if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
fromTxId = elis.getLastTxId() + 1;
long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
throws IOException {
long count = 0;
long expectedStart = 0;
for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
long lastTxId = l.getLastTxId();
if (l.isInProgress()) {
lastTxId = recoverLastTxId(l, false);
if (lastTxId == HdfsConstants.INVALID_TXID) {
assert lastTxId >= l.getFirstTxId();
if (lastTxId < fromTxId) {
} else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
// we can start in the middle of a segment
count = (lastTxId - l.getFirstTxId()) + 1;
expectedStart = lastTxId + 1;
} else {
if (expectedStart != l.getFirstTxId()) {
if (count == 0) {
throw new CorruptionException("StartTxId " + l.getFirstTxId()
+ " is not as expected " + expectedStart
+ ". Gap in transaction log?");
} else {
count += (lastTxId - l.getFirstTxId()) + 1;
expectedStart = lastTxId + 1;
return count;
public void recoverUnfinalizedSegments() throws IOException {
synchronized (this) {
try {
List<String> children = zkc.getChildren(ledgerPath, false);
for (String child : children) {
if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
String znode = ledgerPath + "/" + child;
EditLogLedgerMetadata l =, znode);
try {
long endTxId = recoverLastTxId(l, true);
if (endTxId == HdfsConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + znode
+ ". Unable to continue recovery.");
throw new IOException("Unrecoverable corruption,"
+ " please check logs.");
finalizeLogSegment(l.getFirstTxId(), endTxId);
} catch (SegmentEmptyException see) {
LOG.warn("Inprogress znode " + child
+ " refers to a ledger which is empty. This occurs when the NN"
+ " crashes after opening a segment, but before writing the"
+ " OP_START_LOG_SEGMENT op. It is safe to delete."
+ " MetaData [" + l.toString() + "]");
// If the max seen transaction is the same as what would
// have been the first transaction of the failed ledger,
// decrement it, as that transaction never happened and as
// such, is _not_ the last seen
if (maxTxId.get() == l.getFirstTxId()) {
maxTxId.reset(maxTxId.get() - 1);
zkc.delete(znode, -1);
} catch (KeeperException.NoNodeException nne) {
// nothing to recover, ignore
} catch (KeeperException ke) {
throw new IOException("Couldn't get list of inprogress segments", ke);
} catch (InterruptedException ie) {
throw new IOException("Interrupted getting list of inprogress segments",
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
for (EditLogLedgerMetadata l : getLedgerList(false)) {
if (l.getLastTxId() < minTxIdToKeep) {
try {
Stat stat = zkc.exists(l.getZkPath(), false);
zkc.delete(l.getZkPath(), stat.getVersion());
} catch (InterruptedException ie) {
LOG.error("Interrupted while purging " + l, ie);
} catch (BKException bke) {
LOG.error("Couldn't delete ledger from bookkeeper", bke);
} catch (KeeperException ke) {
LOG.error("Error deleting ledger entry in zookeeper", ke);
public void close() throws IOException {
try {
} catch (BKException bke) {
throw new IOException("Couldn't close bookkeeper client", bke);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while closing journal manager", ie);
* Set the amount of memory that this stream should use to buffer edits.
* Setting this will only affect future output stream. Streams
* which have currently be created won't be affected.
public void setOutputBufferCapacity(int size) {
conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size);
* Find the id of the last edit log transaction writen to a edit log
* ledger.
private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
throws IOException, SegmentEmptyException {
LedgerHandle lh = null;
try {
if (fence) {
lh = bkc.openLedger(l.getLedgerId(),
} else {
lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
} catch (BKException bke) {
throw new IOException("Exception opening ledger for " + l, bke);
} catch (InterruptedException ie) {
throw new IOException("Interrupted opening ledger for " + l, ie);
BookKeeperEditLogInputStream in = null;
try {
long lastAddConfirmed = lh.getLastAddConfirmed();
if (lastAddConfirmed == -1) {
throw new SegmentEmptyException();
in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
long endTxId = HdfsConstants.INVALID_TXID;
FSEditLogOp op = in.readOp();
while (op != null) {
if (endTxId == HdfsConstants.INVALID_TXID
|| op.getTransactionId() == endTxId+1) {
endTxId = op.getTransactionId();
op = in.readOp();
return endTxId;
} finally {
if (in != null) {
* Get a list of all segments in the journal.
List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
throws IOException {
List<EditLogLedgerMetadata> ledgers
= new ArrayList<EditLogLedgerMetadata>();
try {
List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
for (String ledgerName : ledgerNames) {
if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) {
String legderMetadataPath = ledgerPath + "/" + ledgerName;
try {
EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
.read(zkc, legderMetadataPath);
} catch (KeeperException.NoNodeException e) {
LOG.warn("ZNode: " + legderMetadataPath
+ " might have finalized and deleted."
+ " So ignoring NoNodeException.");
} catch (KeeperException e) {
throw new IOException("Exception reading ledger list from zk", e);
} catch (InterruptedException ie) {
throw new IOException("Interrupted getting list of ledgers from zk", ie);
Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
return ledgers;
* Get the znode path for a finalize ledger
String finalizedLedgerZNode(long startTxId, long endTxId) {
return String.format("%s/edits_%018d_%018d",
ledgerPath, startTxId, endTxId);
* Get the znode path for the inprogressZNode
String inprogressZNode(long startTxid) {
return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
void setZooKeeper(ZooKeeper zk) {
this.zkc = zk;
* Simple watcher to notify when zookeeper has connected
private class ZkConnectionWatcher implements Watcher {
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected.equals(event.getState())) {
private static class SegmentEmptyException extends IOException {