Apache Hadoop 2.0.0-alpha-rc0.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/tags/release-2.0.0-alpha-rc0@1336256 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7bdd537..17a607a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -83,6 +83,8 @@
HDFS-3298. Add HdfsDataOutputStream as a public API. (szetszwo)
+ HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly via umamahesh)
+
IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
new file mode 100644
index 0000000..7f67226
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
@@ -0,0 +1,66 @@
+This module provides a BookKeeper backend for HDFS Namenode write
+ahead logging.
+
+BookKeeper is a highly available distributed write ahead logging
+system. For more details, see
+
+ http://zookeeper.apache.org/bookkeeper
+
+-------------------------------------------------------------------------------
+How do I build?
+
+ To generate the distribution packages for BK journal, do the
+ following.
+
+ $ mvn clean package -Pdist
+
+ This will generate a jar with all the dependencies needed by the journal
+ manager:
+
+ target/hadoop-hdfs-bkjournal-<VERSION>.jar
+
+ Note that the -Pdist part of the build command is important, as otherwise
+ the dependencies would not be packaged in the jar.
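+
+ The shaded dependencies are relocated under the hidden.bkjournal
+ package (see the bkjournal pom.xml), so as a quick sanity check you
+ can confirm they were bundled with, for example,
+
+ $ jar tf target/hadoop-hdfs-bkjournal-<VERSION>.jar | grep hidden/bkjournal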
+
+-------------------------------------------------------------------------------
+How do I use the BookKeeper Journal?
+
+ To run an HDFS namenode using BookKeeper as a backend, copy the bkjournal
+ jar, generated above, into the lib directory of hdfs. In the standard
+ distribution of HDFS, this is at $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
+
+ cp target/hadoop-hdfs-bkjournal-<VERSION>.jar \
+ $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
+
+ Then, in hdfs-site.xml, set the following properties.
+
+ <property>
+ <name>dfs.namenode.edits.dir</name>
+ <value>bookkeeper://localhost:2181/bkjournal,file:///path/for/edits</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
+ <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
+ </property>
+
+ In this example, the namenode is configured to use two write ahead
+ logging devices: one writes to BookKeeper and the other to a local
+ file system. At the moment it is not possible to write only to
+ BookKeeper, as the resource checker explicitly checks for local
+ disks.
+
+ The example above configures the namenode to look for the journal
+ metadata at the path /bkjournal on a standalone zookeeper ensemble
+ at localhost:2181. To configure a multi-host zookeeper ensemble,
+ separate the hosts with semicolons. For example, if you have 3
+ zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you
+ would specify this with
+
+ bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal
+
+ The final part, /bkjournal, specifies the znode in zookeeper where
+ ledger metadata will be stored. Administrators can set this to anything
+ they wish.
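+
+ For example, using the three server ensemble above, the complete
+ edits directory setting might look like the following (a sketch;
+ adjust the local path to suit your deployment).
+
+ <property>
+   <name>dfs.namenode.edits.dir</name>
+   <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal,file:///path/for/edits</value>
+ </property>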
+
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
new file mode 100644
index 0000000..c7e2c8c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-project</artifactId>
+ <version>2.0.0-alpha</version>
+ <relativePath>../../../../../hadoop-project</relativePath>
+ </parent>
+
+ <groupId>org.apache.hadoop.contrib</groupId>
+ <artifactId>hadoop-hdfs-bkjournal</artifactId>
+ <version>2.0.0-alpha</version>
+ <description>Apache Hadoop HDFS BookKeeper Journal</description>
+ <name>Apache Hadoop HDFS BookKeeper Journal</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <hadoop.component>hdfs</hadoop.component>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-server</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>dist</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <artifactSet>
+ <includes>
+ <include>org.apache.bookkeeper:bookkeeper-server</include>
+ <include>org.apache.zookeeper:zookeeper</include>
+ <include>org.jboss.netty:netty</include>
+ </includes>
+ </artifactSet>
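+                <!-- Relocate the bundled classes into a private package
+                     so they cannot conflict with other copies of
+                     bookkeeper, zookeeper or netty on the classpath. -->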
+ <relocations>
+ <relocation>
+ <pattern>org.apache.bookkeeper</pattern>
+ <shadedPattern>hidden.bkjournal.org.apache.bookkeeper</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.zookeeper</pattern>
+ <shadedPattern>hidden.bkjournal.org.apache.zookeeper</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.jboss.netty</pattern>
+ <shadedPattern>hidden.bkjournal.org.jboss.netty</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
new file mode 100644
index 0000000..9d070d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Input stream which reads from a BookKeeper ledger.
+ */
+class BookKeeperEditLogInputStream extends EditLogInputStream {
+ static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class);
+
+ private final long firstTxId;
+ private final long lastTxId;
+ private final int logVersion;
+ private final LedgerHandle lh;
+
+ private final FSEditLogOp.Reader reader;
+ private final FSEditLogLoader.PositionTrackingInputStream tracker;
+
+ /**
+ * Construct BookKeeper edit log input stream.
+ * Starts reading from the first entry of the ledger.
+ */
+ BookKeeperEditLogInputStream(final LedgerHandle lh,
+ final EditLogLedgerMetadata metadata)
+ throws IOException {
+ this(lh, metadata, 0);
+ }
+
+ /**
+ * Construct BookKeeper edit log input stream.
+ * Starts reading from firstBookKeeperEntry. This allows the stream
+ * to take a shortcut during recovery, as it doesn't have to read
+ * every edit log transaction to find out what the last one is.
+ */
+ BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata,
+ long firstBookKeeperEntry)
+ throws IOException {
+ this.lh = lh;
+ this.firstTxId = metadata.getFirstTxId();
+ this.lastTxId = metadata.getLastTxId();
+ this.logVersion = metadata.getVersion();
+
+ BufferedInputStream bin = new BufferedInputStream(
+ new LedgerInputStream(lh, firstBookKeeperEntry));
+ tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
+ DataInputStream in = new DataInputStream(tracker);
+
+ reader = new FSEditLogOp.Reader(in, logVersion);
+ }
+
+ @Override
+ public long getFirstTxId() throws IOException {
+ return firstTxId;
+ }
+
+ @Override
+ public long getLastTxId() throws IOException {
+ return lastTxId;
+ }
+
+ @Override
+ public int getVersion() throws IOException {
+ return logVersion;
+ }
+
+ @Override
+ protected FSEditLogOp nextOp() throws IOException {
+ return reader.readOp(false);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ lh.close();
+ } catch (Exception e) {
+ throw new IOException("Exception closing ledger", e);
+ }
+ }
+
+ @Override
+ public long getPosition() {
+ return tracker.getPos();
+ }
+
+ @Override
+ public long length() throws IOException {
+ return lh.getLength();
+ }
+
+ @Override
+ public String getName() {
+ return String.format("BookKeeper[%s,first=%d,last=%d]",
+ lh.toString(), firstTxId, lastTxId);
+ }
+
+ // TODO(HA): Test this.
+ @Override
+ public boolean isInProgress() {
+ return true;
+ }
+
+ /**
+ * Input stream implementation which can be used by
+ * FSEditLogOp.Reader
+ */
+ private static class LedgerInputStream extends InputStream {
+ private long readEntries;
+ private InputStream entryStream = null;
+ private final LedgerHandle lh;
+ private final long maxEntry;
+
+ /**
+ * Construct ledger input stream
+ * @param lh the ledger handle to read from
+ * @param firstBookKeeperEntry ledger entry to start reading from
+ */
+ LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry)
+ throws IOException {
+ this.lh = lh;
+ readEntries = firstBookKeeperEntry;
+ try {
+ maxEntry = lh.getLastAddConfirmed();
+ } catch (Exception e) {
+ throw new IOException("Error reading last entry id", e);
+ }
+ }
+
+ /**
+ * Get input stream representing next entry in the
+ * ledger.
+ * @return input stream, or null if no more entries
+ */
+ private InputStream nextStream() throws IOException {
+ try {
+ if (readEntries > maxEntry) {
+ return null;
+ }
+ Enumeration<LedgerEntry> entries
+ = lh.readEntries(readEntries, readEntries);
+ readEntries++;
+ if (entries.hasMoreElements()) {
+ LedgerEntry e = entries.nextElement();
+ assert !entries.hasMoreElements();
+ return e.getEntryInputStream();
+ }
+ } catch (Exception e) {
+ throw new IOException("Error reading entries from bookkeeper", e);
+ }
+ return null;
+ }
+
+    @Override
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      if (read(b, 0, 1) != 1) {
+        return -1;
+      } else {
+        // mask to meet the InputStream contract of returning 0-255
+        return b[0] & 0xFF;
+      }
+    }
+
+ @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int read = 0;
+      if (entryStream == null) {
+        entryStream = nextStream();
+        if (entryStream == null) {
+          return read;
+        }
+      }
+
+      while (read < len) {
+        int thisread = entryStream.read(b, off+read, (len-read));
+        if (thisread == -1) {
+          entryStream = nextStream();
+          if (entryStream == null) {
+            return read;
+          }
+        } else {
+          read += thisread;
+        }
+      }
+      return read;
+    }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
new file mode 100644
index 0000000..ddbe0b6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
+
+import java.util.Arrays;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
+
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.io.DataOutputBuffer;
+import java.io.IOException;
+
+/**
+ * Output stream for BookKeeper Journal.
+ * Multiple complete edit log entries are packed into a single bookkeeper
+ * entry before sending it over the network. Because the edit log entries
+ * in each bookkeeper entry are complete, any bookkeeper entry can be
+ * read as a complete edit log. This is useful for recovery, as we don't
+ * need to read through the entire edit log segment to get the last written
+ * entry.
+ */
+class BookKeeperEditLogOutputStream
+ extends EditLogOutputStream implements AddCallback {
+ private final DataOutputBuffer bufCurrent;
+ private final AtomicInteger outstandingRequests;
+ private final int transmissionThreshold;
+ private final LedgerHandle lh;
+ private CountDownLatch syncLatch;
+ private final WriteLock wl;
+ private final Writer writer;
+
+ /**
+   * Construct an edit log output stream which writes to a ledger.
+   */
+ protected BookKeeperEditLogOutputStream(Configuration conf,
+ LedgerHandle lh, WriteLock wl)
+ throws IOException {
+ super();
+
+ bufCurrent = new DataOutputBuffer();
+ outstandingRequests = new AtomicInteger(0);
+ syncLatch = null;
+ this.lh = lh;
+ this.wl = wl;
+ this.wl.acquire();
+ this.writer = new Writer(bufCurrent);
+ this.transmissionThreshold
+ = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
+ BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT);
+ }
+
+ @Override
+ public void create() throws IOException {
+ // noop
+ }
+
+ @Override
+ public void close() throws IOException {
+ setReadyToFlush();
+ flushAndSync();
+ try {
+ lh.close();
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted waiting on close", ie);
+ } catch (BKException bke) {
+ throw new IOException("BookKeeper error during close", bke);
+ }
+ }
+
+ @Override
+ public void abort() throws IOException {
+ try {
+ lh.close();
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted waiting on close", ie);
+ } catch (BKException bke) {
+ throw new IOException("BookKeeper error during abort", bke);
+ }
+
+ wl.release();
+ }
+
+ @Override
+ public void writeRaw(final byte[] data, int off, int len) throws IOException {
+ throw new IOException("Not supported for BK");
+ }
+
+ @Override
+ public void write(FSEditLogOp op) throws IOException {
+ wl.checkWriteLock();
+
+ writer.writeOp(op);
+
+ if (bufCurrent.getLength() > transmissionThreshold) {
+ transmit();
+ }
+ }
+
+ @Override
+ public void setReadyToFlush() throws IOException {
+ wl.checkWriteLock();
+
+ transmit();
+
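+    // Size the latch to the number of outstanding adds so that
+    // flushAndSync() blocks until every entry transmitted so far
+    // has been acknowledged via addComplete().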
+ synchronized(this) {
+ syncLatch = new CountDownLatch(outstandingRequests.get());
+ }
+ }
+
+ @Override
+ public void flushAndSync() throws IOException {
+ wl.checkWriteLock();
+
+ assert(syncLatch != null);
+ try {
+ syncLatch.await();
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted waiting on latch", ie);
+ }
+
+    syncLatch = null;
+ }
+
+ /**
+ * Transmit the current buffer to bookkeeper.
+ * Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
+ * are never called at the same time.
+ */
+ private void transmit() throws IOException {
+ wl.checkWriteLock();
+
+ if (bufCurrent.getLength() > 0) {
+ byte[] entry = Arrays.copyOf(bufCurrent.getData(),
+ bufCurrent.getLength());
+ lh.asyncAddEntry(entry, this, null);
+ bufCurrent.reset();
+ outstandingRequests.incrementAndGet();
+ }
+ }
+
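+  /**
+   * AddCallback from BookKeeper, invoked when an asynchronously
+   * transmitted entry has been acknowledged. Decrements the outstanding
+   * request count and, if a flush is pending, counts down its latch.
+   */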
+ @Override
+ public void addComplete(int rc, LedgerHandle handle,
+ long entryId, Object ctx) {
+ synchronized(this) {
+ outstandingRequests.decrementAndGet();
+ CountDownLatch l = syncLatch;
+ if (l != null) {
+ l.countDown();
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
new file mode 100644
index 0000000..047efd5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * BookKeeper Journal Manager
+ *
+ * To use, add the following to hdfs-site.xml.
+ * <pre>
+ * {@code
+ * <property>
+ * <name>dfs.namenode.edits.dir</name>
+ * <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
+ * </property>
+ *
+ * <property>
+ * <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
+ * <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
+ * </property>
+ * }
+ * </pre>
+ * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode],
+ * where [zkEnsemble] is a list of semicolon-separated zookeeper host:port
+ * pairs. In the example above there are 3 servers in the ensemble,
+ * zk1, zk2 & zk3, each one listening on port 2181.
+ *
+ * [rootZnode] is the path of the zookeeper znode under which the editlog
+ * information will be stored.
+ *
+ * Other configuration options are:
+ * <ul>
+ * <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
+ * Number of bytes a bookkeeper journal stream will buffer before
+ * forcing a flush. Default is 1024.</li>
+ * <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
+ * Number of bookkeeper servers in edit log ledger ensembles. This
+ * is the number of bookkeeper servers which need to be available
+ * for the ledger to be writable. Default is 3.</li>
+ * <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
+ * Number of bookkeeper servers in the write quorum. This is the
+ * number of bookkeeper servers which must have acknowledged the
+ * write of an entry before it is considered written.
+ * Default is 2.</li>
+ * <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
+ * Password to use when creating ledgers. </li>
+ * </ul>
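+ *
+ * For example, to use a five bookie ensemble for each ledger (an
+ * illustrative value, not a recommendation), add the following to
+ * hdfs-site.xml.
+ * <pre>
+ * {@code
+ * <property>
+ *   <name>dfs.namenode.bookkeeperjournal.ensemble-size</name>
+ *   <value>5</value>
+ * </property>
+ * }
+ * </pre>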
+ */
+public class BookKeeperJournalManager implements JournalManager {
+ static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
+
+ public static final String BKJM_OUTPUT_BUFFER_SIZE
+ = "dfs.namenode.bookkeeperjournal.output-buffer-size";
+ public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
+
+ public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
+ = "dfs.namenode.bookkeeperjournal.ensemble-size";
+ public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
+
+ public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
+ = "dfs.namenode.bookkeeperjournal.quorum-size";
+ public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
+
+ public static final String BKJM_BOOKKEEPER_DIGEST_PW
+ = "dfs.namenode.bookkeeperjournal.digestPw";
+ public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
+
+ private static final int BKJM_LAYOUT_VERSION = -1;
+
+ private final ZooKeeper zkc;
+ private final Configuration conf;
+ private final BookKeeper bkc;
+ private final WriteLock wl;
+ private final String ledgerPath;
+ private final MaxTxId maxTxId;
+ private final int ensembleSize;
+ private final int quorumSize;
+ private final String digestpw;
+ private final CountDownLatch zkConnectLatch;
+
+ private LedgerHandle currentLedger = null;
+
+  private int bytesToInt(byte[] b) {
+    assert b.length >= 4;
+    // mask each byte to avoid sign extension clobbering the higher bits
+    return (b[0] & 0xff) << 24 | (b[1] & 0xff) << 16
+        | (b[2] & 0xff) << 8 | (b[3] & 0xff);
+  }
+
+ private byte[] intToBytes(int i) {
+ return new byte[] {
+ (byte)(i >> 24),
+ (byte)(i >> 16),
+ (byte)(i >> 8),
+ (byte)(i) };
+ }
+
+ /**
+ * Construct a Bookkeeper journal manager.
+ */
+ public BookKeeperJournalManager(Configuration conf, URI uri)
+ throws IOException {
+ this.conf = conf;
+ String zkConnect = uri.getAuthority().replace(";", ",");
+ String zkPath = uri.getPath();
+ ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+ BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
+ quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
+ BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
+
+ ledgerPath = zkPath + "/ledgers";
+ String maxTxIdPath = zkPath + "/maxtxid";
+ String lockPath = zkPath + "/lock";
+ String versionPath = zkPath + "/version";
+ digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
+ BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
+
+ try {
+ zkConnectLatch = new CountDownLatch(1);
+ zkc = new ZooKeeper(zkConnect, 3000, new ZkConnectionWatcher());
+ if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
+ throw new IOException("Error connecting to zookeeper");
+ }
+ if (zkc.exists(zkPath, false) == null) {
+ zkc.create(zkPath, new byte[] {'0'},
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ Stat versionStat = zkc.exists(versionPath, false);
+ if (versionStat != null) {
+ byte[] d = zkc.getData(versionPath, false, versionStat);
+ // There's only one version at the moment
+ assert bytesToInt(d) == BKJM_LAYOUT_VERSION;
+ } else {
+ zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ if (zkc.exists(ledgerPath, false) == null) {
+ zkc.create(ledgerPath, new byte[] {'0'},
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ bkc = new BookKeeper(new ClientConfiguration(),
+ zkc);
+ } catch (Exception e) {
+ throw new IOException("Error initializing zk", e);
+ }
+
+ wl = new WriteLock(zkc, lockPath);
+ maxTxId = new MaxTxId(zkc, maxTxIdPath);
+ }
+
+ /**
+ * Start a new log segment in a BookKeeper ledger.
+ * First ensure that we have the write lock for this journal.
+ * Then create a ledger and stream based on that ledger.
+ * The ledger id is written to the inprogress znode, so that in the
+ * case of a crash, a recovery process can find the ledger we were writing
+ * to when we crashed.
+ * @param txId First transaction id to be written to the stream
+ */
+ @Override
+ public EditLogOutputStream startLogSegment(long txId) throws IOException {
+ wl.acquire();
+
+ if (txId <= maxTxId.get()) {
+ throw new IOException("We've already seen " + txId
+ + ". A new stream cannot be created with it");
+ }
+ if (currentLedger != null) {
+ throw new IOException("Already writing to a ledger, id="
+ + currentLedger.getId());
+ }
+ try {
+ currentLedger = bkc.createLedger(ensembleSize, quorumSize,
+ BookKeeper.DigestType.MAC,
+ digestpw.getBytes());
+ String znodePath = inprogressZNode();
+ EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
+ HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
+ /* Write the ledger metadata out to the inprogress ledger znode
+ * This can fail if for some reason our write lock has
+ * expired (@see WriteLock) and another process has managed to
+ * create the inprogress znode.
+ * In this case, throw an exception. We don't want to continue
+ * as this would lead to a split brain situation.
+ */
+ l.write(zkc, znodePath);
+
+ return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
+ } catch (Exception e) {
+ if (currentLedger != null) {
+ try {
+ currentLedger.close();
+ } catch (Exception e2) {
+          // log and ignore; an IOException will be thrown soon
+ LOG.error("Error closing ledger", e2);
+ }
+ }
+ throw new IOException("Error creating ledger", e);
+ }
+ }
+
+ /**
+ * Finalize a log segment. If the journal manager is currently
+ * writing to a ledger, ensure that this is the ledger of the log segment
+ * being finalized.
+ *
+ * Otherwise this is the recovery case. In the recovery case, ensure that
+ * the firstTxId of the ledger matches firstTxId for the segment we are
+ * trying to finalize.
+ */
+ @Override
+ public void finalizeLogSegment(long firstTxId, long lastTxId)
+ throws IOException {
+ String inprogressPath = inprogressZNode();
+ try {
+ Stat inprogressStat = zkc.exists(inprogressPath, false);
+ if (inprogressStat == null) {
+ throw new IOException("Inprogress znode " + inprogressPath
+ + " doesn't exist");
+ }
+
+ wl.checkWriteLock();
+ EditLogLedgerMetadata l
+ = EditLogLedgerMetadata.read(zkc, inprogressPath);
+
+ if (currentLedger != null) { // normal, non-recovery case
+ if (l.getLedgerId() == currentLedger.getId()) {
+ try {
+ currentLedger.close();
+ } catch (BKException bke) {
+ LOG.error("Error closing current ledger", bke);
+ }
+ currentLedger = null;
+ } else {
+ throw new IOException(
+ "Active ledger has different ID to inprogress. "
+ + l.getLedgerId() + " found, "
+ + currentLedger.getId() + " expected");
+ }
+ }
+
+ if (l.getFirstTxId() != firstTxId) {
+ throw new IOException("Transaction id not as expected, "
+ + l.getFirstTxId() + " found, " + firstTxId + " expected");
+ }
+
+ l.finalizeLedger(lastTxId);
+ String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
+ try {
+ l.write(zkc, finalisedPath);
+ } catch (KeeperException.NodeExistsException nee) {
+ if (!l.verify(zkc, finalisedPath)) {
+ throw new IOException("Node " + finalisedPath + " already exists"
+ + " but data doesn't match");
+ }
+ }
+ maxTxId.store(lastTxId);
+ zkc.delete(inprogressPath, inprogressStat.getVersion());
+ } catch (KeeperException e) {
+ throw new IOException("Error finalising ledger", e);
+ } catch (InterruptedException ie) {
+ throw new IOException("Error finalising ledger", ie);
+ } finally {
+ wl.release();
+ }
+ }
+
+ // TODO(HA): Handle inProgressOk
+ @Override
+ public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
+ throws IOException {
+ for (EditLogLedgerMetadata l : getLedgerList()) {
+ if (l.getFirstTxId() == fromTxnId) {
+ try {
+ LedgerHandle h = bkc.openLedger(l.getLedgerId(),
+ BookKeeper.DigestType.MAC,
+ digestpw.getBytes());
+ return new BookKeeperEditLogInputStream(h, l);
+ } catch (Exception e) {
+ throw new IOException("Could not open ledger for " + fromTxnId, e);
+ }
+ }
+ }
+ throw new IOException("No ledger for fromTxnId " + fromTxnId + " found.");
+ }
+
+ // TODO(HA): Handle inProgressOk
+ @Override
+ public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
+ throws IOException {
+ long count = 0;
+ long expectedStart = 0;
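+    // Walk the segments in order of first txid, summing their
+    // transaction counts. An in-progress segment ends the scan once its
+    // last txid has been recovered. A gap between finalized segments is
+    // a corruption, unless some transactions have already been counted,
+    // in which case counting simply stops at the gap.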
+ for (EditLogLedgerMetadata l : getLedgerList()) {
+ if (l.isInProgress()) {
+ long endTxId = recoverLastTxId(l);
+ if (endTxId == HdfsConstants.INVALID_TXID) {
+ break;
+ }
+ count += (endTxId - l.getFirstTxId()) + 1;
+ break;
+ }
+
+ if (l.getFirstTxId() < fromTxnId) {
+ continue;
+ } else if (l.getFirstTxId() == fromTxnId) {
+ count = (l.getLastTxId() - l.getFirstTxId()) + 1;
+ expectedStart = l.getLastTxId() + 1;
+ } else {
+ if (expectedStart != l.getFirstTxId()) {
+ if (count == 0) {
+ throw new CorruptionException("StartTxId " + l.getFirstTxId()
+ + " is not as expected " + expectedStart
+ + ". Gap in transaction log?");
+ } else {
+ break;
+ }
+ }
+ count += (l.getLastTxId() - l.getFirstTxId()) + 1;
+ expectedStart = l.getLastTxId() + 1;
+ }
+ }
+ return count;
+ }
+
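+  /**
+   * Recover the in-progress log segment, if any: read its metadata from
+   * the inprogress znode, scan the ledger to find the last complete
+   * transaction, and finalize the segment at that transaction id.
+   */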
+ @Override
+ public void recoverUnfinalizedSegments() throws IOException {
+ wl.acquire();
+
+ synchronized (this) {
+ try {
+ EditLogLedgerMetadata l
+ = EditLogLedgerMetadata.read(zkc, inprogressZNode());
+ long endTxId = recoverLastTxId(l);
+ if (endTxId == HdfsConstants.INVALID_TXID) {
+ LOG.error("Unrecoverable corruption has occurred in segment "
+ + l.toString() + " at path " + inprogressZNode()
+ + ". Unable to continue recovery.");
+ throw new IOException("Unrecoverable corruption, please check logs.");
+ }
+ finalizeLogSegment(l.getFirstTxId(), endTxId);
+ } catch (KeeperException.NoNodeException nne) {
+ // nothing to recover, ignore
+ } finally {
+ if (wl.haveLock()) {
+ wl.release();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void purgeLogsOlderThan(long minTxIdToKeep)
+ throws IOException {
+ for (EditLogLedgerMetadata l : getLedgerList()) {
+ if (!l.isInProgress()
+ && l.getLastTxId() < minTxIdToKeep) {
+ try {
+ Stat stat = zkc.exists(l.getZkPath(), false);
+ zkc.delete(l.getZkPath(), stat.getVersion());
+ bkc.deleteLedger(l.getLedgerId());
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while purging " + l, ie);
+ } catch (BKException bke) {
+ LOG.error("Couldn't delete ledger from bookkeeper", bke);
+ } catch (KeeperException ke) {
+ LOG.error("Error deleting ledger entry in zookeeper", ke);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ bkc.close();
+ zkc.close();
+ } catch (Exception e) {
+ throw new IOException("Couldn't close zookeeper client", e);
+ }
+ }
+
+ /**
+ * Set the amount of memory that this stream should use to buffer edits.
+   * Setting this will only affect future output streams. Streams
+   * which have already been created won't be affected.
+ */
+ @Override
+ public void setOutputBufferCapacity(int size) {
+    conf.setInt(BKJM_OUTPUT_BUFFER_SIZE, size);
+ }
+
+ /**
+   * Find the id of the last edit log transaction written to an edit log
+   * ledger.
+ */
+ private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
+ try {
+ LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
+ BookKeeper.DigestType.MAC,
+ digestpw.getBytes());
+ long lastAddConfirmed = lh.getLastAddConfirmed();
+ BookKeeperEditLogInputStream in
+ = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
+
+ long endTxId = HdfsConstants.INVALID_TXID;
+ FSEditLogOp op = in.readOp();
+ while (op != null) {
+ if (endTxId == HdfsConstants.INVALID_TXID
+ || op.getTransactionId() == endTxId+1) {
+ endTxId = op.getTransactionId();
+ }
+ op = in.readOp();
+ }
+ return endTxId;
+ } catch (Exception e) {
+ throw new IOException("Exception retreiving last tx id for ledger " + l,
+ e);
+ }
+ }
+
+ /**
+ * Get a list of all segments in the journal.
+ */
+ private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
+ List<EditLogLedgerMetadata> ledgers
+ = new ArrayList<EditLogLedgerMetadata>();
+ try {
+ List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
+ for (String n : ledgerNames) {
+ ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
+ }
+ } catch (Exception e) {
+ throw new IOException("Exception reading ledger list from zk", e);
+ }
+
+ Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
+ return ledgers;
+ }
+
+ /**
+   * Get the znode path for a finalized ledger.
+ */
+ String finalizedLedgerZNode(long startTxId, long endTxId) {
+ return String.format("%s/edits_%018d_%018d",
+ ledgerPath, startTxId, endTxId);
+ }
+
+ /**
+   * Get the path of the inprogress znode.
+ */
+ String inprogressZNode() {
+ return ledgerPath + "/inprogress";
+ }
+
+ /**
+ * Simple watcher to notify when zookeeper has connected
+ */
+ private class ZkConnectionWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ if (Event.KeeperState.SyncConnected.equals(event.getState())) {
+ zkConnectLatch.countDown();
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
new file mode 100644
index 0000000..9ae5cdd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.IOException;
+import java.util.Comparator;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class for storing the metadata associated
+ * with a single edit log segment, stored in a single ledger
+ */
+public class EditLogLedgerMetadata {
+ static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
+
+ private String zkPath;
+ private final long ledgerId;
+ private final int version;
+ private final long firstTxId;
+ private long lastTxId;
+ private boolean inprogress;
+
+  public static final Comparator<EditLogLedgerMetadata> COMPARATOR
+ = new Comparator<EditLogLedgerMetadata>() {
+ public int compare(EditLogLedgerMetadata o1,
+ EditLogLedgerMetadata o2) {
+ if (o1.firstTxId < o2.firstTxId) {
+ return -1;
+ } else if (o1.firstTxId == o2.firstTxId) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ };
+
+ EditLogLedgerMetadata(String zkPath, int version,
+ long ledgerId, long firstTxId) {
+ this.zkPath = zkPath;
+ this.ledgerId = ledgerId;
+ this.version = version;
+ this.firstTxId = firstTxId;
+ this.lastTxId = HdfsConstants.INVALID_TXID;
+ this.inprogress = true;
+ }
+
+ EditLogLedgerMetadata(String zkPath, int version, long ledgerId,
+ long firstTxId, long lastTxId) {
+ this.zkPath = zkPath;
+ this.ledgerId = ledgerId;
+ this.version = version;
+ this.firstTxId = firstTxId;
+ this.lastTxId = lastTxId;
+ this.inprogress = false;
+ }
+
+ String getZkPath() {
+ return zkPath;
+ }
+
+ long getFirstTxId() {
+ return firstTxId;
+ }
+
+ long getLastTxId() {
+ return lastTxId;
+ }
+
+ long getLedgerId() {
+ return ledgerId;
+ }
+
+ int getVersion() {
+ return version;
+ }
+
+ boolean isInProgress() {
+ return this.inprogress;
+ }
+
+ void finalizeLedger(long newLastTxId) {
+ assert this.lastTxId == HdfsConstants.INVALID_TXID;
+ this.lastTxId = newLastTxId;
+ this.inprogress = false;
+ }
+
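+  /*
+   * Ledger metadata is stored in its znode as a semicolon-separated
+   * string: "version;ledgerId;firstTxId" while the segment is in
+   * progress, or "version;ledgerId;firstTxId;lastTxId" once finalized.
+   */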
+ static EditLogLedgerMetadata read(ZooKeeper zkc, String path)
+ throws IOException, KeeperException.NoNodeException {
+ try {
+ byte[] data = zkc.getData(path, false, null);
+ String[] parts = new String(data).split(";");
+ if (parts.length == 3) {
+ int version = Integer.valueOf(parts[0]);
+ long ledgerId = Long.valueOf(parts[1]);
+ long txId = Long.valueOf(parts[2]);
+ return new EditLogLedgerMetadata(path, version, ledgerId, txId);
+ } else if (parts.length == 4) {
+ int version = Integer.valueOf(parts[0]);
+ long ledgerId = Long.valueOf(parts[1]);
+ long firstTxId = Long.valueOf(parts[2]);
+ long lastTxId = Long.valueOf(parts[3]);
+ return new EditLogLedgerMetadata(path, version, ledgerId,
+ firstTxId, lastTxId);
+ } else {
+ throw new IOException("Invalid ledger entry, "
+ + new String(data));
+ }
+ } catch(KeeperException.NoNodeException nne) {
+ throw nne;
+ } catch(Exception e) {
+ throw new IOException("Error reading from zookeeper", e);
+ }
+ }
+
+ void write(ZooKeeper zkc, String path)
+ throws IOException, KeeperException.NodeExistsException {
+ this.zkPath = path;
+ String finalisedData;
+ if (inprogress) {
+ finalisedData = String.format("%d;%d;%d",
+ version, ledgerId, firstTxId);
+ } else {
+ finalisedData = String.format("%d;%d;%d;%d",
+ version, ledgerId, firstTxId, lastTxId);
+ }
+ try {
+ zkc.create(path, finalisedData.getBytes(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException nee) {
+ throw nee;
+ } catch (Exception e) {
+ throw new IOException("Error creating ledger znode");
+ }
+ }
+
+ boolean verify(ZooKeeper zkc, String path) {
+ try {
+ EditLogLedgerMetadata other = read(zkc, path);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Verifying " + this.toString()
+ + " against " + other);
+ }
+      return other.equals(this);
+ } catch (Exception e) {
+ LOG.error("Couldn't verify data in " + path, e);
+ return false;
+ }
+ }
+
+  @Override
+  public boolean equals(Object o) {
+ if (!(o instanceof EditLogLedgerMetadata)) {
+ return false;
+ }
+ EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
+ return ledgerId == ol.ledgerId
+ && firstTxId == ol.firstTxId
+ && lastTxId == ol.lastTxId
+ && version == ol.version;
+ }
+
+  @Override
+  public int hashCode() {
+ int hash = 1;
+ hash = hash * 31 + (int)ledgerId;
+ hash = hash * 31 + (int)firstTxId;
+ hash = hash * 31 + (int)lastTxId;
+ hash = hash * 31 + (int)version;
+ return hash;
+ }
+
+  @Override
+  public String toString() {
+ return "[LedgerId:"+ledgerId +
+ ", firstTxId:" + firstTxId +
+ ", lastTxId:" + lastTxId +
+ ", version:" + version + "]";
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
new file mode 100644
index 0000000..f272409
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.IOException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class for storing and reading
+ * the max seen txid in zookeeper
+ */
+class MaxTxId {
+ static final Log LOG = LogFactory.getLog(MaxTxId.class);
+
+ private final ZooKeeper zkc;
+ private final String path;
+
+ private Stat currentStat;
+
+ MaxTxId(ZooKeeper zkc, String path) {
+ this.zkc = zkc;
+ this.path = path;
+ }
+
+ synchronized void store(long maxTxId) throws IOException {
+ long currentMax = get();
+ if (currentMax < maxTxId) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Setting maxTxId to " + maxTxId);
+ }
+ String txidStr = Long.toString(maxTxId);
+ try {
+ if (currentStat != null) {
+ currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"),
+ currentStat.getVersion());
+ } else {
+ zkc.create(path, txidStr.getBytes("UTF-8"),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (Exception e) {
+ throw new IOException("Error writing max tx id", e);
+ }
+ }
+ }
+
+ synchronized long get() throws IOException {
+ try {
+ currentStat = zkc.exists(path, false);
+ if (currentStat == null) {
+ return 0;
+ } else {
+ byte[] bytes = zkc.getData(path, false, currentStat);
+ String txidString = new String(bytes, "UTF-8");
+ return Long.valueOf(txidString);
+ }
+ } catch (Exception e) {
+ throw new IOException("Error reading the max tx id from zk", e);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java
new file mode 100644
index 0000000..67743b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.util.Collections;
+import java.util.Comparator;
+
+import java.net.InetAddress;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Distributed lock, using ZooKeeper.
+ *
+ * The lock is vulnerable to timing issues. For example, the process could
+ * encounter a really long GC cycle between acquiring the lock, and writing to
+ * a ledger. This could have timed out the lock, and another process could have
+ * acquired the lock and started writing to bookkeeper. Therefore other
+ * mechanisms are required to ensure correctness (i.e. Fencing).
+ */
+class WriteLock implements Watcher {
+ static final Log LOG = LogFactory.getLog(WriteLock.class);
+
+ private final ZooKeeper zkc;
+ private final String lockpath;
+
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private String myznode = null;
+
+ WriteLock(ZooKeeper zkc, String lockpath) throws IOException {
+ this.lockpath = lockpath;
+
+ this.zkc = zkc;
+ try {
+ if (zkc.exists(lockpath, false) == null) {
+ String localString = InetAddress.getLocalHost().toString();
+ zkc.create(lockpath, localString.getBytes(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (Exception e) {
+ throw new IOException("Exception accessing Zookeeper", e);
+ }
+ }
+
+ void acquire() throws IOException {
+ while (true) {
+ if (lockCount.get() == 0) {
+ try {
+ synchronized(this) {
+ if (lockCount.get() > 0) {
+ lockCount.incrementAndGet();
+ return;
+ }
+ myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'},
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL_SEQUENTIAL);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Acquiring lock, trying " + myznode);
+ }
+
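+          // Standard ZooKeeper lock recipe, minus the waiting: list the
+          // lock znodes, sort them by sequence number, and take the lock
+          // only if our znode sorts first; otherwise fail immediately.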
+ List<String> nodes = zkc.getChildren(lockpath, false);
+ Collections.sort(nodes, new Comparator<String>() {
+ public int compare(String o1,
+ String o2) {
+ Integer l1 = Integer.valueOf(o1.replace("lock-", ""));
+ Integer l2 = Integer.valueOf(o2.replace("lock-", ""));
+ return l1 - l2;
+ }
+ });
+ if ((lockpath + "/" + nodes.get(0)).equals(myznode)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Lock acquired - " + myznode);
+ }
+ lockCount.set(1);
+ zkc.exists(myznode, this);
+ return;
+ } else {
+ LOG.error("Failed to acquire lock with " + myznode
+ + ", " + nodes.get(0) + " already has it");
+ throw new IOException("Could not acquire lock");
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception accessing Zookeeper", e);
+ } catch (InterruptedException ie) {
+ throw new IOException("Exception accessing Zookeeper", ie);
+ }
+ } else {
+ int ret = lockCount.getAndIncrement();
+ if (ret == 0) {
+ lockCount.decrementAndGet();
+ continue; // try again;
+ } else {
+ return;
+ }
+ }
+ }
+ }
+
+ void release() throws IOException {
+ try {
+ if (lockCount.decrementAndGet() <= 0) {
+ if (lockCount.get() < 0) {
+ LOG.warn("Unbalanced lock handling somewhere, lockCount down to "
+ + lockCount.get());
+ }
+ synchronized(this) {
+ if (lockCount.get() <= 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("releasing lock " + myznode);
+ }
+ if (myznode != null) {
+ zkc.delete(myznode, -1);
+ myznode = null;
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Exception accessing Zookeeper", e);
+ }
+ }
+
+ public void checkWriteLock() throws IOException {
+ if (!haveLock()) {
+ throw new IOException("Lost writer lock");
+ }
+ }
+
+ boolean haveLock() throws IOException {
+ return lockCount.get() > 0;
+ }
+
+ public void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.Disconnected
+ || event.getState() == KeeperState.Expired) {
+ LOG.warn("Lost zookeeper session, lost lock ");
+ lockCount.set(0);
+ } else {
+ // reapply the watch
+ synchronized (this) {
+ LOG.info("Zookeeper event " + event
+ + " received, reapplying watch to " + myznode);
+ if (myznode != null) {
+ try {
+ zkc.exists(myznode, this);
+ } catch (Exception e) {
+ LOG.warn("Could not set watch on lock, releasing", e);
+ try {
+ release();
+ } catch (IOException ioe) {
+ LOG.error("Could not release Zk lock", ioe);
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
new file mode 100644
index 0000000..5937fa8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.util.LocalBookKeeper;
+
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.SecurityUtil;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestBookKeeperJournalManager {
+ static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
+
+ private static final long DEFAULT_SEGMENT_SIZE = 1000;
+ private static final String zkEnsemble = "localhost:2181";
+
+ private static Thread bkthread;
+ protected static Configuration conf = new Configuration();
+ private ZooKeeper zkc;
+
+ private static ZooKeeper connectZooKeeper(String ensemble)
+ throws IOException, KeeperException, InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+    ZooKeeper zkc = new ZooKeeper(ensemble, 3600, new Watcher() {
+ public void process(WatchedEvent event) {
+ if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+ latch.countDown();
+ }
+ }
+ });
+ if (!latch.await(3, TimeUnit.SECONDS)) {
+ throw new IOException("Zookeeper took too long to connect");
+ }
+ return zkc;
+ }
+
+ @BeforeClass
+ public static void setupBookkeeper() throws Exception {
+ final int numBookies = 5;
+ bkthread = new Thread() {
+ public void run() {
+ try {
+ String[] args = new String[1];
+ args[0] = String.valueOf(numBookies);
+ LOG.info("Starting bk");
+ LocalBookKeeper.main(args);
+ } catch (InterruptedException e) {
+ // go away quietly
+ } catch (Exception e) {
+ LOG.error("Error starting local bk", e);
+ }
+ }
+ };
+ bkthread.start();
+
+ if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
+ throw new Exception("Error starting zookeeper/bookkeeper");
+ }
+
+ ZooKeeper zkc = connectZooKeeper(zkEnsemble);
+ try {
+ boolean up = false;
+ for (int i = 0; i < 10; i++) {
+ try {
+ List<String> children = zkc.getChildren("/ledgers/available",
+ false);
+ if (children.size() == numBookies) {
+ up = true;
+ break;
+ }
+ } catch (KeeperException e) {
+ // ignore
+ }
+ Thread.sleep(1000);
+ }
+ if (!up) {
+ throw new IOException("Not enough bookies started");
+ }
+ } finally {
+ zkc.close();
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ zkc = connectZooKeeper(zkEnsemble);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ zkc.close();
+ }
+
+ @AfterClass
+ public static void teardownBookkeeper() throws Exception {
+ if (bkthread != null) {
+ bkthread.interrupt();
+ bkthread.join();
+ }
+ }
+
+ @Test
+ public void testSimpleWrite() throws Exception {
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
+ long txid = 1;
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1 ; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ String zkpath = bkjm.finalizedLedgerZNode(1, 100);
+
+ assertNotNull(zkc.exists(zkpath, false));
+ assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+ }
+
+ @Test
+ public void testNumberOfTransactions() throws Exception {
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
+ long txid = 1;
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1 ; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ long numTrans = bkjm.getNumberOfTransactions(1, true);
+ assertEquals(100, numTrans);
+ }
+
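+ /**
+ * Write three full segments, then delete the znode of the middle one.
+ * Counting transactions across the gap should report corruption, while
+ * counting before and after the gap still works.
+ */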
+ @Test
+ public void testNumberOfTransactionsWithGaps() throws Exception {
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
+ long txid = 1;
+ for (long i = 0; i < 3; i++) {
+ long start = txid;
+ EditLogOutputStream out = bkjm.startLogSegment(start);
+ for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(start, txid-1);
+ assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
+ }
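+ // Delete the znode of the middle segment (txids 1001-2000) to create
+ // a gap in the transaction history.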
+ zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
+
+ long numTrans = bkjm.getNumberOfTransactions(1, true);
+ assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
+
+ try {
+ numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true);
+ fail("Should have thrown corruption exception by this point");
+ } catch (JournalManager.CorruptionException ce) {
+ // expected: the deleted segment must surface as corruption
+ }
+
+ numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true);
+ assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
+ }
+
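+ /**
+ * Write three finalized segments followed by a half-full segment that is
+ * flushed but aborted. The transaction count should include the flushed
+ * entries of the unfinalized segment.
+ */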
+ @Test
+ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
+ long txid = 1;
+ for (long i = 0; i < 3; i++) {
+ long start = txid;
+ EditLogOutputStream out = bkjm.startLogSegment(start);
+ for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+
+ out.close();
+ bkjm.finalizeLogSegment(start, (txid-1));
+ assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
+ }
+ long start = txid;
+ EditLogOutputStream out = bkjm.startLogSegment(start);
+ for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.setReadyToFlush();
+ out.flush();
+ out.abort();
+ out.close();
+
+ long numTrans = bkjm.getNumberOfTransactions(1, true);
+ assertEquals((txid-1), numTrans);
+ }
+
+ /**
+ * Create a bkjm namespace, write a journal from txid 1, and close the
+ * stream. Trying to start a new journal segment from txid 1 should then
+ * throw an exception, since those transactions already exist.
+ */
+ @Test
+ public void testWriteRestartFrom1() throws Exception {
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
+ long txid = 1;
+ long start = txid;
+ EditLogOutputStream out = bkjm.startLogSegment(txid);
+ for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(start, (txid-1));
+
+ txid = 1;
+ try {
+ out = bkjm.startLogSegment(txid);
+ fail("Shouldn't be able to start another journal from " + txid
+ + " when one already exists");
+ } catch (IOException ioe) {
+ LOG.info("Caught exception as expected", ioe);
+ }
+
+ // border case: the last txid of the already-finalized segment
+ txid = DEFAULT_SEGMENT_SIZE;
+ try {
+ out = bkjm.startLogSegment(txid);
+ fail("Shouldn't be able to start another journal from " + txid
+ + " when one already exists");
+ } catch (IOException ioe) {
+ LOG.info("Caught exception as expected", ioe);
+ }
+
+ // open journal continuing from before
+ txid = DEFAULT_SEGMENT_SIZE + 1;
+ start = txid;
+ out = bkjm.startLogSegment(start);
+ assertNotNull(out);
+
+ for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(start, (txid-1));
+
+ // open journal arbitrarily far in the future
+ txid = DEFAULT_SEGMENT_SIZE * 4;
+ out = bkjm.startLogSegment(txid);
+ assertNotNull(out);
+ }
+
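+ /**
+ * Two journal managers sharing one journal path: the second writer must
+ * fail to start a segment while the first writer holds it.
+ */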
+ @Test
+ public void testTwoWriters() throws Exception {
+ long start = 1;
+ BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+ BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+
+ EditLogOutputStream out1 = bkjm1.startLogSegment(start);
+ try {
+ bkjm2.startLogSegment(start);
+ fail("Shouldn't have been able to open the second writer");
+ } catch (IOException ioe) {
+ LOG.info("Caught exception as expected", ioe);
+ }
+ }
+
+ @Test
+ public void testSimpleRead() throws Exception {
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
+ final long numTransactions = 10000;
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1 ; i <= numTransactions; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, numTransactions);
+
+ EditLogInputStream in = bkjm.getInputStream(1, true);
+ try {
+ assertEquals(numTransactions,
+ FSEditLogTestUtil.countTransactionsInStream(in));
+ } finally {
+ in.close();
+ }
+ }
+
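+ /**
+ * Abort a flushed but unfinalized segment, then run recovery. Recovery
+ * should finalize the segment and remove the inprogress znode.
+ */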
+ @Test
+ public void testSimpleRecovery() throws Exception {
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1 ; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.setReadyToFlush();
+ out.flush();
+
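+ // abort() simulates a writer crash: the segment is left inprogress in
+ // ZooKeeper rather than being finalized.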
+ out.abort();
+ out.close();
+
+ assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
+ assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
+
+ bkjm.recoverUnfinalizedSegments();
+
+ assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
+ assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
new file mode 100644
index 0000000..41f0292
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
+
+/**
+ * Utilities for testing edit logs
+ */
+public class FSEditLogTestUtil {
+ private static OpInstanceCache cache = new OpInstanceCache();
+
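+ /** Create a no-op (END_LOG_SEGMENT) operation to use as test filler. */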
+ public static FSEditLogOp getNoOpInstance() {
+ return FSEditLogOp.LogSegmentOp.getInstance(cache,
+ FSEditLogOpCodes.OP_END_LOG_SEGMENT);
+ }
+
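+ /** Count the transactions in a stream using the edit log validator. */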
+ public static long countTransactionsInStream(EditLogInputStream in)
+ throws IOException {
+ FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
+ return validation.getNumTransactions();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8a6b217
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
@@ -0,0 +1,62 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Journal Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: console appender only
+log4j.rootLogger=OFF, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output.
+# Log DEBUG level and above messages to a log file.
+#
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=hdfs-namenode.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
diff --git a/hadoop-hdfs-project/pom.xml b/hadoop-hdfs-project/pom.xml
index 7a0b0a4..d74e4d7 100644
--- a/hadoop-hdfs-project/pom.xml
+++ b/hadoop-hdfs-project/pom.xml
@@ -31,6 +31,7 @@
<module>hadoop-hdfs</module>
<module>hadoop-hdfs-httpfs</module>
<module>hadoop-hdfs/src/contrib/fuse-dfs</module>
+ <module>hadoop-hdfs/src/contrib/bkjournal</module>
</modules>
<build>