Merge r1346682 through r1354801 from trunk.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3092@1354832 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
new file mode 100644
index 0000000..f9a9c6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
@@ -0,0 +1,42 @@
+Hadoop HDFS Change Log
+
+HDFS-3092 branch changes
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+    HDFS-3185. Add configuration of Journal Service Daemons. 
+    (Hari Mankude via suresh)
+
+    HDFS-3213. Persist the cluster id and nsid in JournalService storage
+    directory.  (Hari Mankude via szetszwo)
+
+    HDFS-3274. Add a new conf key dfs.journal.edits.dir and use it for
+    edit log initialization.  (Hari Mankude via szetszwo)
+
+    HDFS-3283. Add http server to journal service.  (Brandon Li via szetszwo)
+
+    HDFS-3196. Add Journal and JournalDiskWriter for journal service.
+    (szetszwo)
+
+    HDFS-3201. Add GetJournalEditServlet for downloading edit logs from journal
+    service.  (Brandon Li via szetszwo)
+
+    HDFS-3313. Create a protocol for journal service synchronization.  (Brandon
+    Li via szetszwo)
+
+    HDFS-3186. Add ability to journal service to sync journals from another
+    active journal service. (Brandon Li via suresh)
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    HDFS-3388. GetJournalEditServlet should close output stream only if the
+    stream is used.  (Brandon Li via szetszwo)
+
+    HDFS-3511. Update GetJournalEditServlet and JournalHttpServer for the api
+    changes in SecurityUtil.  (Brandon Li via szetszwo)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 41bcf31..1c342de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -178,6 +178,25 @@
             </configuration>
           </execution>
           <execution>
+            <id>journal</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <configuration>
+              <compile>false</compile>
+              <workingDirectory>${project.build.directory}/generated-src/main/jsp</workingDirectory>
+              <webFragmentFile>${project.build.directory}/journal-jsp-servlet-definitions.xml</webFragmentFile>
+              <packageName>org.apache.hadoop.hdfs.server.journalservice</packageName>
+              <sources>
+                <directory>${basedir}/src/main/webapps/journal</directory>
+                <includes>
+                  <include>*.jsp</include>
+                </includes>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
             <id>datanode</id>
             <phase>generate-sources</phase>
             <goals>
@@ -284,6 +303,7 @@
                 <loadfile property="hdfs.servlet.definitions" srcFile="${project.build.directory}/hdfs-jsp-servlet-definitions.xml"/>
                 <loadfile property="secondary.servlet.definitions" srcFile="${project.build.directory}/secondary-jsp-servlet-definitions.xml"/>
                 <loadfile property="datanode.servlet.definitions" srcFile="${project.build.directory}/datanode-jsp-servlet-definitions.xml"/>
+                <loadfile property="journal.servlet.definitions" srcFile="${project.build.directory}/journal-jsp-servlet-definitions.xml"/>               
                 <echoproperties destfile="${project.build.directory}/webxml.properties">
                   <propertyset>
                     <propertyref regex=".*.servlet.definitions"/>
@@ -299,6 +319,9 @@
                 <copy file="${basedir}/src/main/webapps/proto-datanode-web.xml"
                       tofile="${project.build.directory}/webapps/datanode/WEB-INF/web.xml"
                       filtering="true"/>
+                <copy file="${basedir}/src/main/webapps/proto-journal-web.xml"
+                      tofile="${project.build.directory}/webapps/journal/WEB-INF/web.xml"
+                      filtering="true"/>
                 <copy toDir="${project.build.directory}/webapps">
                   <fileset dir="${basedir}/src/main/webapps">
                     <exclude name="**/*.jsp"/>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 146ed83..721dda5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -204,6 +204,16 @@
   public static final String  DFS_HOSTS = "dfs.hosts";
   public static final String  DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
   public static final String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
+  
+  // This is a comma separated host:port list of addresses hosting the journal service
+  public static final String  DFS_JOURNAL_ADDRESS_KEY = "dfs.journal.addresses";
+  public static final String  DFS_JOURNAL_EDITS_DIR_KEY = "dfs.journal.edits.dir";
+  public static final String  DFS_JOURNAL_HTTP_ADDRESS_KEY = "dfs.journal.http-addresses";
+  public static final String  DFS_JOURNAL_HTTPS_PORT_KEY = "dfs.journal.https-port";
+  public static final int     DFS_JOURNAL_HTTPS_PORT_DEFAULT = 50510;
+  public static final String  DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY = "dfs.journal.kerberos.https.principal";
+  public static final String  DFS_JOURNAL_KEYTAB_FILE_KEY = "dfs.journal.keytab.file";
+  public static final String  DFS_JOURNAL_USER_NAME_KEY = "dfs.journal.kerberos.principal";
 
   // Much code in hdfs is not yet updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
@@ -310,7 +320,7 @@
                                            "dfs.image.transfer.bandwidthPerSec";
   public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
 
-  //Keys with no defaults
+  // Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
   public static final String  DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index a78a61e..dc266fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -25,6 +25,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -1064,4 +1065,61 @@
       return null;
     }
   }
+  
+  private static Collection<InetSocketAddress> getAddresses(Configuration conf,
+      String addrKey) {
+    Collection<String> addresses = conf.getTrimmedStringCollection(addrKey);
+    Collection<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+    for (String address : emptyAsSingletonNull(addresses)) {
+      if (address == null) {
+        continue;
+      }
+      ret.add(NetUtils.createSocketAddr(address));
+    }
+    return ret;
+  }
+  
+  /**
+   * Returns list of RPC server InetSocketAddresses of journal services from the
+   * configuration.
+   * 
+   * @param conf configuration
+   * @return list of InetSocketAddresses
+   */
+  public static Collection<InetSocketAddress> getJournalNodeAddresses(
+      Configuration conf) {
+    return getAddresses(conf, DFS_JOURNAL_ADDRESS_KEY);
+  }
+  
+  /**
+   * Returns list of http InetSocketAddresses of journal services from the
+   * configuration.
+   * 
+   * @param conf configuration
+   * @return list of http InetSocketAddresses
+   */
+  public static Collection<InetSocketAddress> getJournalNodeHttpAddresses(
+      Configuration conf) {
+    return getAddresses(conf, DFS_JOURNAL_HTTP_ADDRESS_KEY);
+  }
+  
+  /**
+   * Returns corresponding rpc address with the hostname running journal
+   * service.
+   * 
+   * @param conf configuration
+   * @param hostname the hostname in the http address
+   * @return rpc address of the journal service
+   */
+  public static InetSocketAddress getJournalRpcAddrFromHostName(
+      Configuration conf, String hostname) {
+    Collection<InetSocketAddress> jRpcAddr = DFSUtil
+        .getJournalNodeAddresses(conf);
+    for (InetSocketAddress addr : jRpcAddr) {
+      if (addr != null && addr.getHostName().equals(hostname)) {
+        return addr;
+      }
+    }
+    return null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
index eabdd22..1e3d0e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
@@ -41,6 +41,10 @@
   public UnregisteredNodeException(NodeRegistration nodeReg) {
     super("Unregistered server: " + nodeReg.toString());
   }
+  
+  public UnregisteredNodeException(String msg) {
+    super("Unregistered server: " + msg);
+  }
 
   /**
    * The exception is thrown if a different data-node claims the same
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolPB.java
new file mode 100644
index 0000000..0b8886e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolPB.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.JournalSyncProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to synchronize journal edit logs.
+ * 
+ * Note: This extends the protocolbuffer service based interface to
+ * add annotations required for security.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY)
+@ProtocolInfo(protocolName = 
+    "org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface JournalSyncProtocolPB extends
+    JournalSyncProtocolService.BlockingInterface {
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..4fe0832
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolServerSideTranslatorPB.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link JournalSyncProtocolPB} to the 
+ * {@link JournalSyncProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class JournalSyncProtocolServerSideTranslatorPB implements JournalSyncProtocolPB {
+  /** Server side implementation to delegate the requests to */
+  private final JournalSyncProtocol impl;
+
+  public JournalSyncProtocolServerSideTranslatorPB(JournalSyncProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(
+      RpcController unused, GetEditLogManifestRequestProto request)
+      throws ServiceException {
+    RemoteEditLogManifest manifest;
+    try {
+      JournalInfo journalInfo = PBHelper.convert(request.getJournalInfo());
+      manifest = impl.getEditLogManifest(journalInfo, request.getSinceTxId());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return GetEditLogManifestResponseProto.newBuilder()
+        .setManifest(PBHelper.convert(manifest)).build();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolTranslatorPB.java
new file mode 100644
index 0000000..add41bd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolTranslatorPB.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link JournalSyncProtocol} interfaces to the RPC server implementing
+ * {@link JournalSyncProtocolPB}.
+ */
+@InterfaceAudience.Private
+public class JournalSyncProtocolTranslatorPB implements ProtocolMetaInterface,
+    JournalSyncProtocol, Closeable {
+  /** RpcController is not used and hence is set to null */
+  private final static RpcController NULL_CONTROLLER = null;
+  private final JournalSyncProtocolPB rpcProxy;
+  
+  public JournalSyncProtocolTranslatorPB(JournalSyncProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public void close() {
+    RPC.stopProxy(rpcProxy);
+  }
+  
+  @Override
+  public RemoteEditLogManifest getEditLogManifest(JournalInfo journalInfo, long sinceTxId)
+      throws IOException {
+    JournalInfoProto journalInfoProto = PBHelper.convert(journalInfo);
+    GetEditLogManifestRequestProto req = GetEditLogManifestRequestProto
+        .newBuilder().setJournalInfo(journalInfoProto).setSinceTxId(sinceTxId).build();
+    
+    try {
+      return PBHelper.convert(rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
+          .getManifest());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public boolean isMethodSupported(String methodName) throws IOException {
+    return RpcClientUtil.isMethodSupported(rpcProxy, JournalSyncProtocolPB.class,
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(JournalSyncProtocolPB.class), methodName);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/GetJournalEditServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/GetJournalEditServlet.java
new file mode 100644
index 0000000..c14c4e8e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/GetJournalEditServlet.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
+import org.apache.hadoop.hdfs.server.namenode.GetImageServlet.GetImageParams;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class is used by the lagging Journal service to retrieve edit file from
+ * another Journal service for sync up.
+ */
+@InterfaceAudience.Private
+public class GetJournalEditServlet extends HttpServlet {
+
+  private static final long serialVersionUID = -4635891628211723009L;
+  private static final Log LOG = LogFactory.getLog(GetJournalEditServlet.class);
+
+  // TODO: create security tests
+  protected boolean isValidRequestor(String remoteUser, Configuration conf)
+      throws IOException {
+    if (remoteUser == null) { // This really shouldn't happen...
+      LOG.warn("Received null remoteUser while authorizing access to GetJournalEditServlet");
+      return false;
+    }
+
+    String[] validRequestors = {
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
+            .getAddress(conf).getHostName()),
+        SecurityUtil.getServerPrincipal(
+            conf.get(DFSConfigKeys.DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY),
+            NameNode.getAddress(conf).getHostName()),
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY),
+            NameNode.getAddress(conf).getHostName()) };
+
+    for (String v : validRequestors) {
+      if (v != null && v.equals(remoteUser)) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("isValidRequestor is allowing: " + remoteUser);
+        return true;
+      }
+    }
+    if (LOG.isDebugEnabled())
+      LOG.debug("isValidRequestor is rejecting: " + remoteUser);
+    return false;
+  }
+
+  @Override
+  public void doGet(final HttpServletRequest request,
+      final HttpServletResponse response) throws ServletException, IOException {
+    try {
+      ServletContext context = getServletContext();
+      final NNStorage storage = JournalHttpServer
+          .getJournalFromContext(context).getStorage();
+      // Request parameters are parsed with the GetImageServlet parameter parser.
+      final GetImageParams parsedParams = new GetImageParams(request, response);
+
+      final Configuration conf = (Configuration) getServletContext()
+          .getAttribute(JspHelper.CURRENT_CONF);
+      // With security enabled, serve only requestors accepted by isValidRequestor.
+      if (UserGroupInformation.isSecurityEnabled()
+          && !isValidRequestor(request.getRemoteUser(), conf)) {
+        response
+            .sendError(HttpServletResponse.SC_FORBIDDEN,
+                "Only Namenode and another Journal service may access this servlet");
+        LOG.warn("Received non-NN/Journal request for edits from "
+            + request.getRemoteHost());
+        return;
+      }
+      // Refuse the transfer when the requestor expects different storage info.
+      String myStorageInfoString = storage.toColonSeparatedString();
+      String theirStorageInfoString = parsedParams.getStorageInfoString();
+      if (theirStorageInfoString != null
+          && !myStorageInfoString.equals(theirStorageInfoString)) {
+        response
+            .sendError(HttpServletResponse.SC_FORBIDDEN,
+                "This node has storage info " + myStorageInfoString
+                    + " but the requesting node expected "
+                    + theirStorageInfoString);
+        LOG.warn("Received an invalid file transfer request"
+            + " with storage info " + theirStorageInfoString);
+        return;
+      }
+
+      UserGroupInformation.getCurrentUser().doAs(
+          new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              if (parsedParams.isGetEdit()) {
+                long startTxId = parsedParams.getStartTxId();
+                long endTxId = parsedParams.getEndTxId();
+                File editFile = storage.findFinalizedEditsFile(startTxId,
+                    endTxId);
+
+                GetImageServlet.setVerificationHeaders(response, editFile);
+                GetImageServlet.setFileNameHeaders(response, editFile);
+
+                DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
+
+                // send edits; the stream is closed even if the transfer fails
+                FaultInjector.instance.beforeSendEdits();
+                ServletOutputStream output = response.getOutputStream();
+                try {
+                  TransferFsImage.getFileServer(output, editFile, throttler);
+                } finally {
+                  if (output != null)
+                    output.close();
+                }
+
+              } else {
+                response
+                    .sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED,
+                        "The server only accepts getedit request. This request is not getedit.");
+              }
+              return null;
+            }
+          });
+
+    } catch (Exception ie) {
+      String errMsg = "getedit failed. " + StringUtils.stringifyException(ie);
+      response.sendError(HttpServletResponse.SC_GONE, errMsg);
+      throw new IOException(errMsg, ie); // keep the original failure as the cause
+    }
+  }
+  
+  /**
+   * Static nested class only for fault injection. Typical usage of this class
+   * is to make a Mockito object of this class, and then use the Mockito object
+   * to control the behavior of the fault injection.
+   */
+  public static class FaultInjector {
+    public static FaultInjector instance = 
+        new FaultInjector();
+    
+    public void beforeSendEdits() throws IOException {}
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
new file mode 100644
index 0000000..bb02075
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+/** The journal stored in local directories. */
+class Journal {
+  static final Log LOG = LogFactory.getLog(Journal.class);
+
+  private final FSImage image;
+  private volatile boolean isFormatted;
+
+  /**
+   * Constructor. It is possible that the directory is not formatted.
+   */
+  Journal(Configuration conf) throws IOException {
+    this.image = new FSImage(conf, Collections.<URI>emptyList(), getEditDirs(conf));
+
+    final Map<StorageDirectory, StorageState> states
+        = new HashMap<StorageDirectory, StorageState>();
+    isFormatted = image.recoverStorageDirs(StartupOption.REGULAR, states);
+    for(Map.Entry<StorageDirectory, StorageState> e : states.entrySet()) {
+      LOG.info(e.getKey() + ": " + e.getValue());
+    }
+    LOG.info("The journal is " + (isFormatted? "already": "not yet")
+        + " formatted: " + image.getStorage());
+  }
+
+  /**
+   * Format the local edit directory. Fails if the journal is already formatted.
+   */
+  synchronized void format(int namespaceId, String clusterId) throws IOException {
+    if (isFormatted) {
+      throw new IllegalStateException("The journal is already formatted.");
+    }
+    final NNStorage s = image.getStorage();
+    s.format(new NamespaceInfo(namespaceId, clusterId, "dummy-bpid", 0, 0));
+    isFormatted = true;
+    LOG.info("Formatted journal: " + s);
+  }
+
+  boolean isFormatted() {
+    return isFormatted;
+  }
+  
+  NNStorage getStorage() {
+    return image.getStorage();
+  }
+  
+  synchronized void verifyVersion(JournalService service, NamespaceInfo info
+      ) throws IOException {
+    if (!isFormatted) {
+      return;
+    }
+
+    final StorageInfo stored = image.getStorage();
+    if (!stored.getClusterID().equals(info.getClusterID())) {
+      throw new IOException("Cluster IDs not matched: stored = "
+          + stored.getClusterID() + " != passed = " + info.getClusterID());
+    }
+    if (stored.getNamespaceID() != info.getNamespaceID()) {
+      throw new IOException("Namespace IDs not matched: stored = "
+          + stored.getNamespaceID() + " != passed = " + info.getNamespaceID());
+    }
+    if (stored.getLayoutVersion() != info.getLayoutVersion()) {
+      throw new IOException("Layout versions not matched: stored = "
+          + stored.getLayoutVersion() + " != passed = " + info.getLayoutVersion());
+    }
+    if (stored.getCTime() != info.getCTime()) {
+      throw new IOException("CTimes not matched: stored = "
+          + stored.getCTime() + " != passed = " + info.getCTime());
+    }
+  }
+
+  void close() throws IOException {
+    image.close();
+  }
+
+  FSEditLog getEditLog() {
+    return image.getEditLog();
+  }
+  
+  RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
+    return image.getEditLog().getEditLogManifest(sinceTxId);
+  }
+  
+  static List<URI> getEditDirs(Configuration conf) throws IOException {
+    final Collection<String> dirs = conf.getTrimmedStringCollection(
+        DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY);
+    LOG.info(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY + " = " + dirs);
+    return Util.stringCollectionAsURIs(dirs);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
new file mode 100644
index 0000000..3b3c412
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/** A JournalListener that hands received journal data to a Journal's edit log. */
+class JournalDiskWriter implements JournalListener {
+  static final Log LOG = LogFactory.getLog(JournalDiskWriter.class);
+
+  private final Journal journal; // target of every callback below
+
+  /**
+   * Constructor. Only stores the journal reference; nothing is read or
+   * written here, so the journal may still be unformatted at this point.
+   */
+  JournalDiskWriter(Journal journal) throws IOException {
+    this.journal = journal;
+  }
+
+  Journal getJournal() {
+    return journal;
+  }
+
+  @Override // the check itself is delegated to the journal
+  public synchronized void verifyVersion(JournalService service,
+      NamespaceInfo info) throws IOException {
+    journal.verifyVersion(service, info);
+   }
+
+  @Override // service is unused; the records go to the journal's edit log
+  public synchronized void journal(JournalService service, long firstTxId,
+      int numTxns, byte[] records) throws IOException {
+    journal.getEditLog().journal(firstTxId, numTxns, records);
+  }
+
+  @Override // service is unused; the segment is started on the edit log
+  public synchronized void startLogSegment(JournalService service, long txid
+      ) throws IOException {
+    journal.getEditLog().startLogSegment(txid, false);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
new file mode 100644
index 0000000..dc11012
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Encapsulates the HTTP server started by the Journal Service.
+ */
+@InterfaceAudience.Private
+public class JournalHttpServer {
+  public static final Log LOG = LogFactory.getLog(JournalHttpServer.class);
+
+  public static final String JOURNAL_ATTRIBUTE_KEY = "localjournal";
+
+  private HttpServer httpServer; // created by start()
+  private InetSocketAddress httpAddress;
+  private String infoBindAddress;
+  private int infoPort;
+  private int httpsPort; // only assigned when security is disabled
+  private Journal localJournal;
+  private final Configuration conf;
+
+  JournalHttpServer(Configuration conf, Journal journal,
+      InetSocketAddress bindAddress) {
+    this.conf = conf;
+    this.localJournal = journal;
+    this.httpAddress = bindAddress;
+  }
+
+  /** Starts the embedded web server and records the port it actually bound. */
+  void start() throws IOException {
+    infoBindAddress = httpAddress.getHostName();
+
+    // initialize the webserver for uploading/downloading files.
+    // Kerberized SSL servers must be run from the host principal...
+    UserGroupInformation httpUGI = UserGroupInformation
+        .loginUserFromKeytabAndReturnUGI(SecurityUtil.getServerPrincipal(
+            conf.get(DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY),
+            infoBindAddress), conf.get(DFS_JOURNAL_KEYTAB_FILE_KEY));
+    try {
+      httpServer = httpUGI.doAs(new PrivilegedExceptionAction<HttpServer>() {
+        @Override
+        public HttpServer run() throws IOException, InterruptedException {
+          LOG.info("Starting web server as: "
+              + UserGroupInformation.getCurrentUser().getUserName());
+          int tmpInfoPort = httpAddress.getPort();
+          httpServer = new HttpServer("journal", infoBindAddress,
+              tmpInfoPort, tmpInfoPort == 0, conf, new AccessControlList(conf
+                  .get(DFS_ADMIN, " ")));
+
+          if (UserGroupInformation.isSecurityEnabled()) {
+            // TODO: implementation 
+          }
+          httpServer.setAttribute(JOURNAL_ATTRIBUTE_KEY, localJournal);
+          httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
+          // use "/getimage" because GetJournalEditServlet uses some
+          // GetImageServlet methods.
+          // TODO: change getimage to getedit
+          httpServer.addInternalServlet("getimage", "/getimage",
+              GetJournalEditServlet.class, true);
+          httpServer.start();
+          return httpServer;
+        }
+      });
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible
+      throw new IOException(e);
+    }
+
+    // The web-server port can be ephemeral... ensure we have the correct info
+    infoPort = httpServer.getPort();
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      httpsPort = infoPort;
+    }
+
+    LOG.info("Journal Web-server up at: " + infoBindAddress + ":" + infoPort
+        + " and https port is: " + httpsPort);
+  }
+
+  /** Stops the web server if one was created. */
+  void stop() throws IOException {
+    if (httpServer != null) {
+      try {
+        httpServer.stop();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  InetSocketAddress getHttpAddress() {
+    return httpAddress;
+  }
+  
+  public static Journal getJournalFromContext(ServletContext context) {
+    return (Journal) context.getAttribute(JOURNAL_ATTRIBUTE_KEY);
+  }
+
+  public static Configuration getConfFromContext(ServletContext context) {
+    return (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+  }
+
+  /**
+   * Download every edit log listed in {@code manifest} from the journal
+   * service at {@code jnHostPort} into the local journal's storage.
+   * @return always true; every failure is reported through an exception
+   * @throws IOException if the manifest is empty or the transfer fails
+   */
+  boolean downloadEditFiles(final String jnHostPort,
+      final RemoteEditLogManifest manifest) throws IOException {
+    // Sanity check manifest
+    if (manifest.getLogs().isEmpty()) {
+      throw new IOException("Found no edit logs to download");
+    }
+
+    try {
+      return UserGroupInformation.getCurrentUser().doAs(
+          new PrivilegedExceptionAction<Boolean>() {
+            @Override
+            public Boolean run() throws Exception {
+              // get edits file
+              for (RemoteEditLog log : manifest.getLogs()) {
+                TransferFsImage.downloadEditsToStorage(jnHostPort, log,
+                    localJournal.getStorage());
+              }
+              return true;
+            }
+          });
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the calling thread can still see it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
index 2d5ec9e..e2c6f68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
@@ -35,8 +35,10 @@
    * 
    * The application using {@link JournalService} can stop the service if
    * {@code info} validation fails.
+   * @throws IOException if {@code info} fails verification
    */
-  public void verifyVersion(JournalService service, NamespaceInfo info);
+  public void verifyVersion(JournalService service, NamespaceInfo info
+      ) throws IOException;
   
   /**
    * Process the received Journal record
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
index e8d7073..88b918c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
@@ -17,31 +17,43 @@
  */
 package org.apache.hadoop.hdfs.server.journalservice;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.JournalSyncProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.FencedException;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.JournalServiceProtocols;
+import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
@@ -61,18 +73,26 @@
  * listener implementation can handle the callbacks based on the application
  * requirement.
  */
-public class JournalService implements JournalProtocol {
+public class JournalService implements JournalServiceProtocols {
   public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
-
-  private final JournalListener listener;
   private final InetSocketAddress nnAddress;
-  private final NamenodeRegistration registration;
+  private NamenodeRegistration registration;
   private final NamenodeProtocol namenode;
   private final StateHandler stateHandler = new StateHandler();
   private final RPC.Server rpcServer;
+  private final JournalHttpServer httpServer;
   private long epoch = 0;
   private String fencerInfo;
+  private Daemon syncThread = null;
+  private Configuration conf;
   
+  // Flags to indicate whether to start sync
+  private boolean toStartSync = false;
+  private long syncSinceTxid = -1;
+
+  private final Journal journal;
+  private final JournalListener listener;
+
   enum State {
     /** The service is initialized and ready to start. */
     INIT(false, false),
@@ -121,12 +141,35 @@
       current = State.WAITING_FOR_ROLL;
     }
 
-    synchronized void startLogSegment() {
+    synchronized State startLogSegment() {
+      State prevState = current;
       if (current == State.WAITING_FOR_ROLL) {
         current = State.SYNCING;
       }
+      return prevState;
     }
+    
+    /**
+     * Try to move from SYNCING to IN_SYNC; other states are left unchanged.
+     * @return resulting state; anything but IN_SYNC means the attempt failed
+     * @throws IllegalStateException if the state is already IN_SYNC
+     */
+    synchronized State inSync() {
+      if (current == State.IN_SYNC) {
+        throw new IllegalStateException("Service cannot be in " + current
+            + " state.");
+      }
 
+      if (current == State.SYNCING) {
+        current = State.IN_SYNC;
+      }
+      return current;
+    }
+    
+    synchronized void fence() { // unconditionally go back to waiting for a roll
+      current = State.WAITING_FOR_ROLL;
+    }
+    
     synchronized void isStartLogSegmentAllowed() throws IOException {
       if (!current.isStartLogSegmentAllowed) {
         throw new IOException("Cannot start log segment in " + current
@@ -161,36 +204,56 @@
    *          {@code server} is a valid server that is managed out side this
    *          service.
    * @param listener call-back interface to listen to journal activities
+   * @param journal the journal used by both Listener and JournalService
    * @throws IOException on error
    */
   JournalService(Configuration conf, InetSocketAddress nnAddr,
-      InetSocketAddress serverAddress, JournalListener listener)
-      throws IOException {
+      InetSocketAddress serverAddress, InetSocketAddress httpAddress,
+      JournalListener listener, Journal journal) throws IOException {
     this.nnAddress = nnAddr;
     this.listener = listener;
+    this.journal = journal;
     this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
         NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
         .getProxy();
     this.rpcServer = createRpcServer(conf, serverAddress, this);
-
-    String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
-    StorageInfo storage = new StorageInfo(
-        LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
-    registration = new NamenodeRegistration(addr, "", storage,
-        NamenodeRole.BACKUP);
+    this.httpServer = new JournalHttpServer(conf, journal, httpAddress); 
+    this.conf = conf;
   }
   
+  Journal getJournal() { // the journal handed to the constructor
+    return journal;
+  }
+
+  synchronized NamenodeRegistration getRegistration() {
+    if (!journal.isFormatted()) {
+      throw new IllegalStateException("Journal is not formatted.");
+    }
+    // Built on first use and cached afterwards.
+    if (registration == null) {
+      String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
+      registration = new NamenodeRegistration(addr, "", journal.getStorage(),
+          NamenodeRole.BACKUP);
+    }
+    return registration;
+  }
+
   /**
    * Start the service.
+   * @throws IOException on error
    */
-  public void start() {
+  public void start() throws IOException {
     stateHandler.start();
 
     // Start the RPC server
-    LOG.info("Starting rpc server");
+    LOG.info("Starting journal service rpc server");
     rpcServer.start();
+    
+    // Start the HTTP server
+    LOG.info("Starting journal service http server");
+    httpServer.start();
 
-    for(boolean registered = false, handshakeComplete = false; ; ) {
+    for (boolean registered = false, handshakeComplete = false;;) {
       try {
         // Perform handshake
         if (!handshakeComplete) {
@@ -198,7 +261,7 @@
           handshakeComplete = true;
           LOG.info("handshake completed");
         }
-        
+
         // Register with the namenode
         if (!registered) {
           registerWithNamenode();
@@ -211,7 +274,7 @@
       } catch (Exception e) {
         LOG.warn("Encountered exception ", e);
       }
-      
+
       try {
         Thread.sleep(1000);
       } catch (InterruptedException ie) {
@@ -220,6 +283,14 @@
     }
 
     stateHandler.waitForRoll();
+
+    // Create a never ending daemon to sync journal segments
+    // TODO: remove the assumption that "won't delete logs"
+    // use 3 because NN rolls with txid=3 when first journal service joining.
+    // need to fix this after NN is modified to ignore its local storage dir
+    syncThread = new Daemon(new JournalSync(this));
+    syncThread.start();
+
     try {
       namenode.rollEditLog();
     } catch (IOException e) {
@@ -230,10 +301,18 @@
   /**
    * Stop the service. For application with RPC Server managed outside, the
    * RPC Server must be stopped the application.
+   * @throws IOException on error
    */
-  public void stop() {
+  public void stop() throws IOException {
     if (!stateHandler.isStopped()) {
+      // syncThread is only created at the end of start(); it is still null
+      // when stop() is called before start() has got that far.
+      if (syncThread != null) {
+        syncThread.interrupt();
+      }
+      httpServer.stop();
       rpcServer.stop();
+      journal.close();
     }
   }
 
@@ -257,34 +332,70 @@
     stateHandler.isStartLogSegmentAllowed();
     verify(epoch, journalInfo);
     listener.startLogSegment(this, txid);
-    stateHandler.startLogSegment();
+    
+    if (stateHandler.startLogSegment() == State.WAITING_FOR_ROLL) {
+      LOG.info("Notify syncThread to re-sync with txid:" + syncSinceTxid);
+      startSync(syncSinceTxid);
+    }
   }
 
   @Override
   public FenceResponse fence(JournalInfo journalInfo, long epoch,
       String fencerInfo) throws IOException {
     LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+   
+    // It is the first fence if the journal is not formatted, 
+    if (!journal.isFormatted()) {
+      journal.format(journalInfo.getNamespaceId(), journalInfo.getClusterId());
+    }
     verifyFence(epoch, fencerInfo);
-    verify(journalInfo);
+    verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
     long previousEpoch = epoch;
     this.epoch = epoch;
     this.fencerInfo = fencerInfo;
-    
+    stateHandler.fence();
+
     // TODO:HDFS-3092 set lastTransId and inSync
     return new FenceResponse(previousEpoch, 0, false);
   }
 
+  @Override
+  public RemoteEditLogManifest getEditLogManifest(JournalInfo journalInfo,
+      long sinceTxId) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Received getEditLogManifest " + sinceTxId);
+    }
+    if (!journal.isFormatted()) { // verify() below needs a formatted journal
+      throw new IOException("This journal service is not formatted.");
+    }
+    verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
+
+    // Journal has only one storage directory
+    return journal.getEditLogManifest(sinceTxId);
+  }
+
   /** Create an RPC server. */
   private static RPC.Server createRpcServer(Configuration conf,
-      InetSocketAddress address, JournalProtocol impl) throws IOException {
+      InetSocketAddress address, JournalServiceProtocols impl) throws IOException {
+    
     RPC.setProtocolEngine(conf, JournalProtocolPB.class,
         ProtobufRpcEngine.class);
     JournalProtocolServerSideTranslatorPB xlator = 
         new JournalProtocolServerSideTranslatorPB(impl);
     BlockingService service = 
         JournalProtocolService.newReflectiveBlockingService(xlator);
-    return RPC.getServer(JournalProtocolPB.class, service,
+    
+    JournalSyncProtocolServerSideTranslatorPB syncXlator = 
+        new JournalSyncProtocolServerSideTranslatorPB(impl);   
+    BlockingService syncService = 
+        JournalSyncProtocolService.newReflectiveBlockingService(syncXlator);
+    
+    RPC.Server rpcServer = RPC.getServer(JournalProtocolPB.class, service,
         address.getHostName(), address.getPort(), 1, false, conf, null);
+    DFSUtil.addPBProtocol(conf, JournalSyncProtocolPB.class, syncService,
+        rpcServer);
+    
+    return rpcServer;
   }
   
   private void verifyEpoch(long e) throws FencedException {
@@ -310,20 +421,22 @@
   /** 
    * Verifies a journal request
    */
-  private void verify(JournalInfo journalInfo) throws IOException {
+  private void verify(int nsid, String clusid) throws IOException {
     String errorMsg = null;
-    int expectedNamespaceID = registration.getNamespaceID();
-    if (journalInfo.getNamespaceId() != expectedNamespaceID) {
-      errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
-          + " actual " + journalInfo.getNamespaceId();
+    final NamenodeRegistration reg = getRegistration(); 
+
+    if (nsid != reg.getNamespaceID()) {
+      errorMsg = "Invalid namespaceID in journal request - expected "
+          + reg.getNamespaceID() + " actual " + nsid;
       LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(journalInfo);
-    } 
-    if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
-      errorMsg = "Invalid clusterId in journal request - expected "
-          + journalInfo.getClusterId() + " actual " + registration.getClusterID();
+      throw new UnregisteredNodeException(errorMsg);
+    }
+    if ((clusid == null)
+        || (!clusid.equals(reg.getClusterID()))) {
+      errorMsg = "Invalid clusterId in journal request - incoming "
+          + clusid + " expected " + reg.getClusterID();
       LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(journalInfo);
+      throw new UnregisteredNodeException(errorMsg);
     }
   }
   
@@ -332,14 +445,14 @@
    */
   private void verify(long e, JournalInfo journalInfo) throws IOException {
     verifyEpoch(e);
-    verify(journalInfo);
+    verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
   }
   
   /**
    * Register this service with the active namenode.
    */
   private void registerWithNamenode() throws IOException {
-    NamenodeRegistration nnReg = namenode.register(registration);
+    NamenodeRegistration nnReg = namenode.register(getRegistration());
     String msg = null;
     if(nnReg == null) { // consider as a rejection
       msg = "Registration rejected by " + nnAddress;
@@ -355,11 +468,177 @@
   private void handshake() throws IOException {
     NamespaceInfo nsInfo = namenode.versionRequest();
     listener.verifyVersion(this, nsInfo);
-    registration.setStorageInfo(nsInfo);
+
+    // If this is the first initialization of journal service, then storage
+    // directory will be setup. Otherwise, nsid and clusterid has to match with
+    // the info saved in the edits dir.
+    if (!journal.isFormatted()) {
+      journal.format(nsInfo.getNamespaceID(), nsInfo.getClusterID());
+    } else {
+      verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
+    }
   }
 
   @VisibleForTesting
   long getEpoch() {
     return epoch;
   }
+  
+  public static JournalSyncProtocol createProxyWithJournalSyncProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+      throws IOException {
+    // Register the protobuf engine for the protocol before building a proxy.
+    final Class<JournalSyncProtocolPB> pb = JournalSyncProtocolPB.class;
+    RPC.setProtocolEngine(conf, pb, ProtobufRpcEngine.class);
+    final long version = RPC.getProtocolVersion(pb);
+    final Object raw = RPC.getProxy(pb, version, address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf), 30000);
+    return new JournalSyncProtocolTranslatorPB((JournalSyncProtocolPB) raw);
+  }
+  
+  /**
+   * Only invoked by the sync thread. Blocks until {@link #startSync(long)}
+   * raises {@code toStartSync}, then clears the flag for the next round.
+   * 
+   * @return txid to start syncing from
+   * @throws InterruptedException if interrupted while waiting
+   */
+  synchronized long waitForStartSync() throws InterruptedException {
+    while (!toStartSync) {
+      wait();
+    }
+    // Sync starting - Unset toStartSync so main thread can set it again
+    toStartSync = false;
+    return syncSinceTxid;
+  }
+
+  /**
+   * Ask the sync thread for another round of syncing from {@code sinceTxid}.
+   * A request that arrives while one is still pending is dropped.
+   */
+  synchronized void startSync(long sinceTxid) {
+    if (toStartSync) {
+      LOG.trace("toStartSync is already set.");
+      return;
+    }
+    toStartSync = true;
+    syncSinceTxid = sinceTxid;
+    notify(); // wakes the thread blocked in waitForStartSync()
+  }
+
+  /**
+   * JournalSync downloads journal segments from other journal services
+   */
+  class JournalSync implements Runnable {
+    private final JournalInfo journalInfo;
+    private final JournalService journalService;
+    private long sinceTxid;
+
+    /**
+     * Constructor
+     * @param journalService Local journal service
+     */
+    JournalSync(JournalService journalService) {
+      NNStorage storage = journalService.getJournal().getStorage();
+      this.journalInfo = new JournalInfo(storage.layoutVersion,
+          storage.clusterID, storage.namespaceID);
+      this.journalService = journalService;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          sinceTxid = journalService.waitForStartSync();
+          syncAllJournalSegments(conf, journalInfo, sinceTxid);
+        } catch (IOException e) {
+          LOG.error("Unable to sync for "
+              + journalService.getRegistration().getHttpAddress()
+              + " with exception: " + e);
+          try {
+            Thread.sleep(60000); // back off before waiting for the next request
+          } catch (InterruptedException e1) {
+            break;
+          }
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+      LOG.info("Stopping the JouranlSync thread");
+    }
+
+    @Override
+    public String toString() {
+      return "JournalSync for "
+          + journalService.getRegistration().getHttpAddress();
+    }
+
+    /**
+     * Contact journal service one by one to get all missed journal segments
+     * @param conf  Configuration
+     * @param journalInfo  the JournalInfo of the local journal service
+     * @param sinceTxid  the transaction id to start with
+     * @throws IOException if segments are still missing after trying them all
+     */
+    void syncAllJournalSegments(Configuration conf, JournalInfo journalInfo,
+        long sinceTxid) throws IOException {
+      // Get a list of configured journal services
+      Collection<InetSocketAddress> addrList = DFSUtil
+          .getJournalNodeHttpAddresses(conf);
+      FSEditLog editLog = journal.getEditLog();
+      File currentDir = new File(
+          conf.get(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY));
+
+      boolean needSync = !editLog.hasCompleteJournalSegments(sinceTxid,
+          currentDir);
+      if (!needSync) {
+        LOG.trace("Nothing to sync.");
+        return;
+      }
+
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      InetSocketAddress selfAddr = journalService.httpServer.getHttpAddress();
+      for (InetSocketAddress addr : addrList) {
+        try {
+          // Skip itself
+          if (addr.getHostName().equals(selfAddr.getHostName())
+              && addr.getPort() == selfAddr.getPort()) {
+            continue;
+          }
+          // Download journal segments
+          InetSocketAddress rpcAddr = DFSUtil.getJournalRpcAddrFromHostName(
+              conf, addr.getHostName());
+          JournalSyncProtocol syncProxy = createProxyWithJournalSyncProtocol(
+              rpcAddr, conf, ugi);
+          RemoteEditLogManifest manifest = syncProxy.getEditLogManifest(
+              journalInfo, sinceTxid);
+          httpServer.downloadEditFiles(NetUtils.getHostPortString(addr),
+              manifest);
+        } catch (IOException e) {
+          LOG.debug("Sync failed for " + selfAddr + " against " + addr
+              + " with exception ", e);
+          // Ignore error and try the next journal service
+        }
+
+        if (editLog.hasCompleteJournalSegments(sinceTxid, currentDir)) {
+          needSync = false;
+          break;
+        }
+      }
+
+      if (needSync) {
+        throw new IOException("Journal sync failed.");
+      }
+
+      // Journal service may not be in SYNCING state
+      State jState = stateHandler.inSync();
+      if (jState != State.IN_SYNC) {
+        LOG.debug("Journal service state changed during syncing : " + jState);
+      } else {
+        LOG.debug("Journal sync is done.");
+        // TODO: report IN_SYNC state to NN. Note that, it's ok if state changes
+        // to another state because NN could reject the IN_SYNC report
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index c0e8ff0..8e12dcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.URI;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -443,6 +445,59 @@
   }
   
   /**
+   * Check if current node has complete set of journal segments from sinceTxid
+   * to (curSegmentTxId-1)
+   * @param sinceTxid the transaction to start with
+   * @param currentDir the storage directory to find journal segments
+   * @return true if journal segments are complete, otherwise false
+   * @throws IOException
+   */
+  public synchronized boolean hasCompleteJournalSegments(long sinceTxid,
+      File currentDir) throws IOException {
+    List<RemoteEditLog> segments = getFinalizedSegments(sinceTxid, currentDir);
+    long curSegTxid = getCurSegmentTxId();
+    return hasCompleteJournalSegments(segments, sinceTxid, curSegTxid);
+  }
+
+  /**
+   * Check that {@code segments}, taken in list order, cover every transaction
+   * from {@code sinceTxid} to {@code curSegTxid - 1} without a gap.
+   * @return false for a null or empty list or when a gap is found, else true
+   * @throws RuntimeException if sinceTxid or any segment exceeds curSegTxid
+   */
+  public static boolean hasCompleteJournalSegments(List<RemoteEditLog> segments,
+      long sinceTxid, long curSegTxid) throws IOException {
+    long minTxid = -1, maxTxid = -1;
+    if (sinceTxid > curSegTxid) {
+      throw new RuntimeException("illegal input: sinceTxid=" + sinceTxid
+          + " curSegTxid= " + curSegTxid);
+    }
+    // A null list is handled like an empty one instead of throwing an NPE.
+    if (segments == null || segments.isEmpty()) {
+      return false;
+    }
+    for (RemoteEditLog log : segments) {
+      if (log.getStartTxId() > curSegTxid || log.getEndTxId() > curSegTxid) {
+        throw new RuntimeException("Invalid log: [" + log.getStartTxId() + ","
+            + log.getEndTxId() + "] and curSegTxid=" + curSegTxid);
+      }
+      if (minTxid == -1 || maxTxid == -1) {
+        minTxid = log.getStartTxId();
+        maxTxid = log.getEndTxId();
+        assert (minTxid > 0 && maxTxid > 0 && maxTxid > minTxid);
+      } else {
+        // check gap in the middle
+        if (maxTxid != log.getStartTxId() - 1) {
+          return false;
+        }
+        maxTxid = log.getEndTxId();
+      }
+    }
+    // check gap at ends
+    return minTxid <= sinceTxid && maxTxid >= curSegTxid - 1;
+  }
+  
+  /**
    * Set the transaction ID to use for the next transaction written.
    */
   synchronized void setNextTxId(long nextTxId) {
@@ -866,7 +921,15 @@
       throws IOException {
     return journalSet.getEditLogManifest(fromTxId);
   }
- 
+  
+  /**
+   * Return a list of what finalized edit logs are available
+   */
+  public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+      File currentDir) throws IOException {
+    return journalSet.getFinalizedSegments(fromTxId, currentDir);
+  }
+  
   /**
    * Finalizes the current edit log and opens a new log segment.
    * @return the transaction id of the BEGIN_LOG_SEGMENT transaction
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index c76a16f..7a8a137 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -113,7 +113,7 @@
    * @param editsDirs Directories the editlog can be stored in.
    * @throws IOException if directories are invalid.
    */
-  protected FSImage(Configuration conf,
+  public FSImage(Configuration conf,
                     Collection<URI> imageDirs,
                     List<URI> editsDirs)
       throws IOException {
@@ -253,7 +253,7 @@
    * @param dataDirStates output of storage directory states
    * @return true if there is at least one valid formatted storage directory
    */
-  private boolean recoverStorageDirs(StartupOption startOpt,
+  public boolean recoverStorageDirs(StartupOption startOpt,
       Map<StorageDirectory, StorageState> dataDirStates) throws IOException {
     boolean isFormatted = false;
     for (Iterator<StorageDirectory> it = 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
index 958cb14..d04c06b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
@@ -192,7 +192,7 @@
     }
   }
   
-  private static void setFileNameHeaders(HttpServletResponse response,
+  public static void setFileNameHeaders(HttpServletResponse response,
       File file) {
     response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
         file.getName());
@@ -204,7 +204,7 @@
    * @param conf configuration
    * @return a data transfer throttler
    */
-  private final DataTransferThrottler getThrottler(Configuration conf) {
+  public static DataTransferThrottler getThrottler(Configuration conf) {
     long transferBandwidth = 
       conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
                    DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
@@ -262,7 +262,7 @@
    * Set headers for content length, and, if available, md5.
    * @throws IOException 
    */
-  private void setVerificationHeaders(HttpServletResponse response, File file)
+  public static void setVerificationHeaders(HttpServletResponse response, File file)
   throws IOException {
     response.setHeader(TransferFsImage.CONTENT_LENGTH,
         String.valueOf(file.length()));
@@ -302,7 +302,7 @@
   }
 
   
-  static class GetImageParams {
+  public static class GetImageParams {
     private boolean isGetImage;
     private boolean isGetEdit;
     private boolean isPutImage;
@@ -382,7 +382,7 @@
       return endTxId;
     }
 
-    boolean isGetEdit() {
+    public boolean isGetEdit() {
       return isGetEdit;
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index 92c3e52..9d6c8d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -619,6 +620,39 @@
   }
 
   /**
+   * Returns a list of finalized edit logs that are available in given
+   * storageDir {@code currentDir}. All available edit logs are returned
+   * starting from the transaction id {@code fromTxId}. Note that the list may
+   * have gaps in these finalized edit logs.
+   * 
+   * @param fromTxId Starting transaction id to read the logs.
+   * @param currentDir The storage directory to find the logs.
+   * @return the sorted segments, or null if no journal for currentDir was read.
+   */
+  public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+      File currentDir) {
+    List<RemoteEditLog> allLogs = null;
+    for (JournalAndStream j : journals) {
+      if (j.getManager() instanceof FileJournalManager) {
+        FileJournalManager fjm = (FileJournalManager) j.getManager();
+        if (fjm.getStorageDirectory().getRoot().equals(currentDir)) {
+          try {
+            allLogs = fjm.getRemoteEditLogs(fromTxId);
+            break;
+          } catch (IOException t) {
+            LOG.warn("Cannot list edit logs in " + fjm, t);
+          }
+        }
+      }
+    }
+    // sort collected segments
+    if (allLogs != null && allLogs.size() > 0) {
+      Collections.sort(allLogs);
+    }
+    return allLogs;
+  }
+
+  /**
    * Add sync times to the buffer.
    */
   String getSyncTimes() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index 01c951a..ccd942c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -95,7 +95,7 @@
    * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which
    * stores both fsimage and edits.
    */
-  static enum NameNodeDirType implements StorageDirType {
+  public static enum NameNodeDirType implements StorageDirType {
     UNDEFINED,
     IMAGE,
     EDITS,
@@ -736,7 +736,7 @@
   /**
    * Return the first readable finalized edits file for the given txid.
    */
-  File findFinalizedEditsFile(long startTxId, long endTxId)
+  public File findFinalizedEditsFile(long startTxId, long endTxId)
   throws IOException {
     File ret = findFile(NameNodeDirType.EDITS,
         getFinalizedEditsFileName(startTxId, endTxId));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 492cf28..d41593c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -78,7 +78,7 @@
     return hash;
   }
   
-  static void downloadEditsToStorage(String fsName, RemoteEditLog log,
+  public static void downloadEditsToStorage(String fsName, RemoteEditLog log,
       NNStorage dstStorage) throws IOException {
     assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
       "bad log: " + log;
@@ -145,7 +145,7 @@
    * A server-side method to respond to a getfile http request
    * Copies the contents of the local file into the output stream.
    */
-  static void getFileServer(OutputStream outstream, File localfile,
+  public static void getFileServer(OutputStream outstream, File localfile,
       DataTransferThrottler throttler) 
     throws IOException {
     byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalServiceProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalServiceProtocols.java
new file mode 100644
index 0000000..72baba9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalServiceProtocols.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/** The full set of RPC methods implemented by the Journal Service.  */
+@InterfaceAudience.Private
+public interface JournalServiceProtocols
+  extends JournalProtocol, JournalSyncProtocol {
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalSyncProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalSyncProtocol.java
new file mode 100644
index 0000000..da29d55
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalSyncProtocol.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to sync journal edits. 
+ * 
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface JournalSyncProtocol {
+  /**
+   * 
+   * This class is used by both the Namenode and Journal Service to insulate
+   * from the protocol serialization.
+   * 
+   * If you are adding/changing journal sync protocol then you need to change both this
+   * class and ALSO related protocol buffer wire protocol definition in
+   * JournalSyncProtocol.proto.
+   * 
+   * For more details on protocol buffer wire protocol, please see
+   * .../org/apache/hadoop/hdfs/protocolPB/overview.html
+   */
+
+  /**
+   * Return a structure containing details about all edit logs available to be
+   * fetched from the JournalNode.
+   * 
+   * @param info journal information
+   * @param sinceTxId return only logs containing transactions >= sinceTxId
+   * @throws IOException
+   */
+  public RemoteEditLogManifest getEditLogManifest(JournalInfo info, long sinceTxId)
+      throws IOException;
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalSyncProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalSyncProtocol.proto
new file mode 100644
index 0000000..c5e234a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalSyncProtocol.proto
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains the protocol buffers used by the journal sync protocol,
+// i.e. the messages for fetching edit log manifests from a journal service.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "JournalSyncProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "hdfs.proto";  // for RemoteEditLogManifestProto
+import "JournalProtocol.proto"; // for JournalInfoProto
+
+/**
+ * sinceTxId - return the editlog information for transactions >= sinceTxId
+ */
+message GetEditLogManifestRequestProto {
+  required JournalInfoProto journalInfo = 1; // Info about the journal
+  required uint64 sinceTxId = 2;  // Transaction ID
+}
+
+/**
+ * manifest - Enumeration of editlogs from journal service 
+ *            for logs >= sinceTxId in the request
+ */
+message GetEditLogManifestResponseProto {
+  required RemoteEditLogManifestProto manifest = 1; 
+}
+
+/**
+ * Protocol used to sync journal edits to a remote node.
+ *
+ * See the request and response for details of rpc call.
+ */
+service JournalSyncProtocolService {
+  /**
+   * Get editlog manifests from the active journalnode for all the editlogs
+   */
+  rpc getEditLogManifest(GetEditLogManifestRequestProto)
+      returns(GetEditLogManifestResponseProto);
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
new file mode 100644
index 0000000..bc9ea42
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
@@ -0,0 +1,29 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=journalstatus.jsp"/>
+<html>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<head><title>Hadoop Administration</title></head>
+
+<body>
+<h1>Hadoop Administration</h1>
+
+<ul> 
+  <li><a href="journalstatus.jsp">Status</a></li> 
+</ul>
+
+</body> 
+</html>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
new file mode 100644
index 0000000..2c6e5a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
@@ -0,0 +1,42 @@
+<%
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="org.apache.hadoop.hdfs.server.common.JspHelper"
+  import="org.apache.hadoop.util.ServletUtil"
+%>
+<%!
+  //for java.io.Serializable
+  private static final long serialVersionUID = 1L;
+%>
+
+<!DOCTYPE html>
+<html>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+<title>Hadoop JournalNode</title>
+    
+<body>
+<h1>JournalNode</h1>
+<%= JspHelper.getVersionTable() %>
+<hr />
+
+<br />
+<b><a href="/logs/">Logs</a></b>
+<%= ServletUtil.htmlFooter() %>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
new file mode 100644
index 0000000..b27f82b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee">
+@journal.servlet.definitions@
+</web-app>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 65a67e5..9cf0888 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -729,6 +729,15 @@
         minNN + "-through-" + maxNN));
   }
   
+  public File getJournalEditsDir() throws IOException {
+    return formatJournalEditsDir(base_dir);
+  }
+  
+  public static File formatJournalEditsDir(File baseDir)
+      throws IOException {
+    return (new File(baseDir, "journal-edits"));
+  }
+  
   public NameNodeInfo[] getNameNodeInfos() {
     return this.nameNodes;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index 6a0f4e7..acd3dda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -608,4 +608,45 @@
     assertEquals(1, uris.size());
     assertTrue(uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
   }
+  
+  /** Checks parsing of journal node RPC and HTTP address lists from conf. */
+  @Test
+  public void testJournalNodeConf() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration(false);
+    String str1 = "localhost:50200";
+    String str2 = "localhost:50210";
+    InetSocketAddress isa1 = NetUtils.createSocketAddr(str1);
+    InetSocketAddress isa2 = NetUtils.createSocketAddr(str2);
+    
+    // If nothing is added, zero length collection is returned.
+    Collection<InetSocketAddress> ret = DFSUtil.getJournalNodeAddresses(conf);
+    assertEquals(0, ret.size());
+    
+    // Setup single journal node
+    conf.set(DFS_JOURNAL_ADDRESS_KEY, str1);  
+    ret = DFSUtil.getJournalNodeAddresses(conf);
+    assertEquals(1, ret.size());
+    for (InetSocketAddress addr : ret) {
+      assertEquals(isa1, addr);
+    }
+    
+    // Add two entries now.
+    conf.set(DFS_JOURNAL_ADDRESS_KEY, "localhost:50200, localhost:50210");
+    ret = DFSUtil.getJournalNodeAddresses(conf);
+    assertEquals(2, ret.size());
+    for (InetSocketAddress addr: ret) {
+      assertTrue(addr.equals(isa1) || addr.equals(isa2));
+    }
+    
+    // Test Http addresses
+    InetSocketAddress isa3 = NetUtils.createSocketAddr("localhost:50100");
+    InetSocketAddress isa4 = NetUtils.createSocketAddr("localhost:50110");
+    conf.set(DFS_JOURNAL_HTTP_ADDRESS_KEY, "localhost:50100, localhost:50110");
+    ret = DFSUtil.getJournalNodeHttpAddresses(conf);
+    assertEquals(2, ret.size());
+    
+    for (InetSocketAddress addr: ret) {
+      assertTrue(addr.equals(isa3) || addr.equals(isa4));
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
new file mode 100644
index 0000000..f8cd370
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests formatting of {@link Journal} storage. */
+public class TestJournal {
+  static final Log LOG = LogFactory.getLog(TestJournal.class);
+  
+  static Configuration newConf(String name) {
+    Configuration conf = new HdfsConfiguration();
+    File dir = new File(MiniDFSCluster.getBaseDirectory(), name + "-edits");
+    if (dir.exists()) {
+      Assert.assertTrue(FileUtil.fullyDelete(dir));
+    }
+    Assert.assertTrue(dir.mkdirs());
+    conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, dir.toURI().toString());
+    return conf;
+  }
+
+  @Test
+  public void testFormat() throws Exception {
+    final Configuration conf = newConf("testFormat");
+    final Journal j = new Journal(conf);
+    LOG.info("Initial  : " + j.getStorage());
+    Assert.assertFalse(j.isFormatted());
+
+    //format
+    final int namespaceId = 123;
+    final String clusterId = "my-cluster-id";
+    j.format(namespaceId, clusterId);
+    Assert.assertTrue(j.isFormatted());
+
+    final StorageInfo info = j.getStorage();
+    LOG.info("Formatted: " + info);
+    Assert.assertEquals(namespaceId, info.getNamespaceID());
+    Assert.assertEquals(clusterId, info.getClusterID());
+    j.close();
+    //create another Journal object and close it once it has been compared
+    final Journal another = new Journal(conf);
+    Assert.assertEquals(info.toString(), another.getStorage().toString());
+    another.close();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
new file mode 100644
index 0000000..8cb19b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.journalservice.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestJournalHttpServer {
+  public static final Log LOG = LogFactory.getLog(TestJournalHttpServer.class);
+  // Raise the JournalHttpServer logger to Level.ALL for these tests.
+  static {
+    ((Log4JLogger) JournalHttpServer.LOG).getLogger().setLevel(Level.ALL);
+  }
+  // Per-test state, assigned again by setUp() before every test.
+  private Configuration conf;
+  private File hdfsDir = null;
+  private File path1, path2;
+
+  /** Recreate an empty base directory holding the j1dir and j2dir folders. */
+  @Before
+  public void setUp() throws Exception {
+    HdfsConfiguration.init();
+    conf = new HdfsConfiguration();
+
+    hdfsDir = new File(MiniDFSCluster.getBaseDirectory()).getCanonicalFile();
+    if (hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir)) {
+      throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
+    }
+    hdfsDir.mkdirs();
+
+    path1 = new File(hdfsDir, "j1dir");
+    path2 = new File(hdfsDir, "j2dir");
+    path1.mkdir();
+    path2.mkdir();
+    if (!path1.exists() || !path2.exists()) {
+      throw new IOException("Couldn't create path in "
+          + hdfsDir.getAbsolutePath());
+    }
+
+    LOG.info("configuring hdfsdir is " + hdfsDir.getAbsolutePath()
+        + "; j1Dir = " + path1.getPath() + "; j2Dir = " + path2.getPath());
+  }
+
+  /**
+   * Test Journal service Http Server by verifying the html page is accessible
+   *
+   * @throws Exception on any failure; IOExceptions are logged, then rethrown
+   */
+  @Test
+  public void testHttpServer() throws Exception {
+    MiniDFSCluster cluster = null;
+    JournalHttpServer jh1 = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
+      jh1 = new JournalHttpServer(conf, new Journal(conf),
+          NetUtils.createSocketAddr("localhost:50200"));
+      jh1.start();
+
+      String pageContents = DFSTestUtil.urlGet(new URL(
+          "http://localhost:50200/journalstatus.jsp"));
+      assertTrue(pageContents.contains("JournalNode"));
+    } catch (IOException e) {
+      LOG.error("Error in TestHttpServer:", e);
+      throw e; // propagate so the report keeps the cause and stack trace
+    } finally {
+      if (jh1 != null) {
+        jh1.stop();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  /**
+   * Check FSEditLog.hasCompleteJournalSegments against a three-entry log
+   * list, then against the same list with its middle entry removed.
+   * @throws Exception
+   */
+  @Test
+  public void testHasCompleteJournalSegments() throws Exception {
+    final List<RemoteEditLog> segments = new ArrayList<RemoteEditLog>();
+    segments.add(new RemoteEditLog(3, 6));
+    segments.add(new RemoteEditLog(7, 10));
+    segments.add(new RemoteEditLog(11, 12));
+
+    assertTrue(FSEditLog.hasCompleteJournalSegments(segments, 3, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 1, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 13, 19));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 11, 19));
+
+    segments.remove(1); // drop the middle (7,10) entry
+    assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 3, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 1, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 3, 19));
+  }
+  
+  private void copyNNFiles(MiniDFSCluster cluster, File dstDir)
+      throws IOException {
+    // "current" under the first edits URI returned by getNameEditsDirs(0)
+    final URI firstEditsUri = cluster.getNameEditsDirs(0).iterator().next();
+    final File nnCurrentDir = new File(firstEditsUri.getPath() + "/current");
+    for (File src : FileUtil.listFiles(nnCurrentDir)) {
+      final FileInputStream in = new FileInputStream(src);
+      final FileOutputStream out = new FileOutputStream(
+          dstDir + "/" + src.getName());
+      IOUtils.copyBytes(in, out, 4096, true); // true: close both when done
+    }
+  }
+
+  /**
+   * Test lagging Journal service copies edit segments from another Journal
+   * service:
+   * 1. start one journal service
+   * 2. reboot namenode so more segments are created
+   * 3. add another journal service and this new journal service should sync
+   *    with the first journal service
+   *
+   * @throws Exception on any failure; IOExceptions are logged, then rethrown
+   */
+  @Test
+  public void testCopyEdits() throws Exception {
+    MiniDFSCluster cluster = null;
+    JournalService service1 = null, service2 = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      // start journal service
+      conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
+      InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+      InetSocketAddress serverAddr = NetUtils
+          .createSocketAddr("localhost:50900");
+      Journal j1 = new Journal(conf);
+      JournalListener listener1 = new JournalDiskWriter(j1);
+      service1 = new JournalService(conf, nnAddr, serverAddr,
+          new InetSocketAddress(50901), listener1, j1);
+      service1.start();
+
+      // get namenode clusterID/layoutVersion/namespaceID
+      StorageInfo si = service1.getJournal().getStorage();
+      JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
+          si.namespaceID);
+
+      // restart namenode, so there will be one more journal segments
+      cluster.restartNameNode();
+
+      // TODO: remove file copy when NN can work with journal auto-machine
+      copyNNFiles(cluster, new File(path1 + "/current"));
+
+      // start another journal service that will do the sync
+      conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
+      conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
+      conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
+          "localhost:50902, localhost:50901");
+      Journal j2 = new Journal(conf);
+      JournalListener listener2 = new JournalDiskWriter(j2);
+      service2 = new JournalService(conf, nnAddr, new InetSocketAddress(50800),
+          NetUtils.createSocketAddr("localhost:50902"), listener2, j2);
+      service2.start();
+
+      // give service2 sometime to sync
+      Thread.sleep(5000);
+      // TODO: change to sinceTxid to 1 after NN is modified to use journal
+      // service to start
+      RemoteEditLogManifest manifest2 = service2.getEditLogManifest(
+          journalInfo, 3);
+      assertTrue(manifest2.getLogs().size() > 0);
+    } catch (IOException e) {
+      LOG.error("Error in TestCopyEdits:", e);
+      throw e; // propagate so the report keeps the cause and stack trace
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      if (service1 != null) {
+        service1.stop();
+      }
+      if (service2 != null) {
+        service2.stop();
+      }
+    }
+  }
+  
+  /**
+   * Test lagging Journal service fails to copy edit segments from another
+   * Journal service with injected error:
+   * 1. start one journal service
+   * 2. reboot namenode so more segments are created
+   * 3. add another journal service and this new journal service should sync
+   *    with the first journal service. Use injected error to fail the sync.
+   *
+   * @throws Exception on any failure; IOExceptions are logged, then rethrown
+   */
+  @Test
+  public void testCopyEditsFailure() throws Exception {
+    MiniDFSCluster cluster = null;
+    JournalService service1 = null, service2 = null;
+    // saved so the mock installed below cannot leak into later tests
+    final GetJournalEditServlet.FaultInjector savedInjector =
+        GetJournalEditServlet.FaultInjector.instance;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      // start journal service
+      conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
+      InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+      InetSocketAddress serverAddr = NetUtils
+          .createSocketAddr("localhost:50900");
+      Journal j1 = new Journal(conf);
+      JournalListener listener1 = new JournalDiskWriter(j1);
+      service1 = new JournalService(conf, nnAddr, serverAddr,
+          new InetSocketAddress(50901), listener1, j1);
+      service1.start();
+      // get namenode clusterID/layoutVersion/namespaceID
+      StorageInfo si = service1.getJournal().getStorage();
+      JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
+          si.namespaceID);
+      // restart namenode, so there will be one more journal segments
+      cluster.restartNameNode();
+      // TODO: remove file copy when NN can work with journal auto-machine
+      copyNNFiles(cluster, new File(path1 + "/current"));
+
+      GetJournalEditServlet.FaultInjector faultInjector = Mockito
+          .mock(GetJournalEditServlet.FaultInjector.class);
+      GetJournalEditServlet.FaultInjector.instance = faultInjector;
+      Mockito
+          .doThrow(new IOException("Injecting failure before sending edits"))
+          .when(faultInjector).beforeSendEdits();
+      // start another journal service that will do the sync
+      conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
+      conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
+      conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
+          "localhost:50902, localhost:50901");
+      Journal j2 = new Journal(conf);
+      JournalListener listener2 = new JournalDiskWriter(j2);
+      service2 = new JournalService(conf, nnAddr, new InetSocketAddress(50800),
+          NetUtils.createSocketAddr("localhost:50902"), listener2, j2);
+      service2.start();
+      // give service2 sometime to sync
+      Thread.sleep(5000);
+      // TODO: change to sinceTxid to 1 after NN is modified to use journal
+      // service to start
+      RemoteEditLogManifest manifest2 = service2.getEditLogManifest(
+          journalInfo, 3);
+      assertEquals(0, manifest2.getLogs().size());
+    } catch (IOException e) {
+      LOG.error("Error in TestCopyEditsFailure:", e);
+      throw e; // propagate so the report keeps the cause and stack trace
+    } finally {
+      GetJournalEditServlet.FaultInjector.instance = savedInjector;
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      if (service1 != null) {
+        service1.stop();
+      }
+      if (service2 != null) {
+        service2.stop();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
index b2cb080..b0ad97a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
@@ -20,18 +20,20 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import junit.framework.Assert;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.FencedException;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -39,9 +41,9 @@
  * Tests for {@link JournalService}
  */
 public class TestJournalService {
-  private MiniDFSCluster cluster;
-  private Configuration conf = new HdfsConfiguration();
-  
+  static final Log LOG = LogFactory.getLog(TestJournalService.class);
+  static final InetSocketAddress RPC_ADDR = new InetSocketAddress(0);
+
   /**
    * Test calls backs {@link JournalListener#startLogSegment(JournalService, long)} and
    * {@link JournalListener#journal(JournalService, long, int, byte[])} are
@@ -49,35 +51,85 @@
    */
   @Test
   public void testCallBacks() throws Exception {
+    Configuration conf = TestJournal.newConf("testCallBacks");
+    Journal journal = new Journal(conf);
     JournalListener listener = Mockito.mock(JournalListener.class);
     JournalService service = null;
+    MiniDFSCluster cluster = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive(0);
-      service = startJournalService(listener);
+      InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+      service = newJournalService(nnAddr, listener, journal, conf);
+      service.start();
       verifyRollLogsCallback(service, listener);
-      verifyJournalCallback(service, listener);
-      verifyFence(service, cluster.getNameNode(0));
+      verifyJournalCallback(cluster.getFileSystem(), service, listener);
     } finally {
-      if (service != null) {
-        service.stop();
-      }
       if (cluster != null) {
         cluster.shutdown();
       }
+      if (service != null) {
+        service.stop();
+      }
     }
   }
 
-  private JournalService startJournalService(JournalListener listener)
-      throws IOException {
-    InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
-    InetSocketAddress serverAddr = new InetSocketAddress(0);
-    JournalService service = new JournalService(conf, nnAddr, serverAddr,
-        listener);
-    service.start();
-    return service;
+  /**
+   * Test journal service fence with a combination of epoch, nsid and clusterid
+   * @throws Exception
+   */
+  @Test
+  public void testFence() throws Exception {
+    final Configuration conf = TestJournal.newConf("testFence");
+    final JournalListener listener = Mockito.mock(JournalListener.class);
+    final InetSocketAddress nnAddress;
+    JournalService service = null;
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive(0);
+      NameNode nn = cluster.getNameNode(0);
+      nnAddress = nn.getNameNodeAddress();
+      Journal j1 = new Journal(conf);
+      service = newJournalService(nnAddress, listener, j1, conf);
+      service.start();
+      String cid = nn.getNamesystem().getClusterId();
+      int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
+      int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+      verifyFence(service, listener, cid, nsId, lv);
+    } finally {
+      if (cluster != null) { // build() may have thrown before assignment
+        cluster.shutdown();
+      }
+    }
+
+    //test restart journal service
+    StorageInfo before = service.getJournal().getStorage();
+    LOG.info("before: " + before);
+    service.stop();
+    Journal j2 = new Journal(conf);
+    service = newJournalService(nnAddress, listener, j2, conf);
+    StorageInfo after = service.getJournal().getStorage();
+    LOG.info("after : " + after);
+    Assert.assertEquals(before.toString(), after.toString());
   }
-  
+
+  /**
+   * Create a journal service on RPC_ADDR; it is returned without being started
+   * @param nnAddr  namenode rpc address
+   * @param listener the listener serving journal requests from namenode
+   * @param journal local journal
+   * @param conf local configuration
+   * @return the newly constructed journal service
+   * @throws Exception
+   */
+  private JournalService newJournalService(InetSocketAddress nnAddr,
+      JournalListener listener, Journal journal, Configuration conf)
+      throws Exception {
+    return new JournalService(conf, nnAddr, RPC_ADDR, new InetSocketAddress(0),
+        listener, journal);
+  }
+
   /**
    * Starting {@link JournalService} should result in Namenode calling
    * {@link JournalService#startLogSegment}, resulting in callback 
@@ -92,36 +144,74 @@
    * File system write operations should result in JournalListener call
    * backs.
    */
-  private void verifyJournalCallback(JournalService s, JournalListener l) throws IOException {
+  private void verifyJournalCallback(FileSystem fs, JournalService s,
+      JournalListener l) throws IOException {
     Path fileName = new Path("/tmp/verifyJournalCallback");
-    FileSystem fs = cluster.getFileSystem();
     FileSystemTestHelper.createFile(fs, fileName);
     fs.delete(fileName, true);
     Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
         Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
   }
   
-  public void verifyFence(JournalService s, NameNode nn) throws Exception {
-    String cid = nn.getNamesystem().getClusterId();
-    int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
-    int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+  /**
+   * Verify the fence with different epoch, clusterid and nsid combinations
+   *  
+   * @param s The Journal Service to write to 
+   * @param listener the listener to serve journal request
+   * @param cid cluster id
+   * @param nsId namespace id
+   * @param lv layoutVersion
+   * @throws Exception
+   */
+  void verifyFence(JournalService s, JournalListener listener,
+      String cid, int nsId, int lv) throws Exception {
     
     // Fence the journal service
     JournalInfo info = new JournalInfo(lv, cid, nsId);
     long currentEpoch = s.getEpoch();
-    
+   
     // New epoch lower than the current epoch is rejected
     try {
       s.fence(info, (currentEpoch - 1), "fencer");
-    } catch (FencedException ignore) { /* Ignored */ } 
+      Assert.fail();
+    } catch (FencedException ignore) { /* Ignored */ }
     
     // New epoch equal to the current epoch is rejected
     try {
       s.fence(info, currentEpoch, "fencer");
-    } catch (FencedException ignore) { /* Ignored */ } 
+      Assert.fail();
+    } catch (FencedException ignore) { /* Ignored */ }
     
     // New epoch higher than the current epoch is successful
     FenceResponse resp = s.fence(info, currentEpoch+1, "fencer");
     Assert.assertNotNull(resp);
+    
+    JournalInfo badInfo = new JournalInfo(lv, "fake", nsId);
+    currentEpoch = s.getEpoch();
+   
+    // Send in the wrong cluster id. fence should fail
+    try {
+      s.fence(badInfo, currentEpoch+1, "fencer");
+      Assert.fail();
+      
+    } catch (UnregisteredNodeException ignore) {
+      LOG.info(ignore.getMessage());
+    }
+  
+    badInfo = new JournalInfo(lv, cid, nsId+1);
+    currentEpoch = s.getEpoch();
+    
+    // Send in the wrong nsid. fence should fail
+    try {
+      s.fence(badInfo, currentEpoch+1, "fencer");
+      Assert.fail();
+    } catch (UnregisteredNodeException ignore) {
+      LOG.info(ignore.getMessage());
+    } 
+    
+    // New epoch higher than the current epoch is successful
+    resp = s.fence(info, currentEpoch+1, "fencer");
+    Assert.assertNotNull(resp);
+   
   }
 }
\ No newline at end of file