Merge r1337003 through r1346681 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3092@1346682 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
new file mode 100644
index 0000000..f9a9c6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
@@ -0,0 +1,42 @@
+Hadoop HDFS Change Log
+
+HDFS-3092 branch changes
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ HDFS-3185. Add configuration of Journal Service Daemons.
+ (Hari Mankude via suresh)
+
+ HDFS-3213. Persist the cluster id and nsid in JournalService storage
+ directory. (Hari Mankude via szetszwo)
+
+ HDFS-3274. Add a new conf key dfs.journal.edits.dir and use it for
+ edit log initialization. (Hari Mankude via szetszwo)
+
+ HDFS-3283. Add http server to journal service. (Brandon Li via szetszwo)
+
+ HDFS-3196. Add Journal and JournalDiskWriter for journal service.
+ (szetszwo)
+
+ HDFS-3201. Add GetJournalEditServlet for downloading edit logs from journal
+ service. (Brandon Li via szetszwo)
+
+ HDFS-3313. Create a protocol for journal service synchronization. (Brandon
+ Li via szetszwo)
+
+ HDFS-3186. Add ability to journal service to sync journals from another
+ active journal service. (Brandon Li via suresh)
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
+ HDFS-3388. GetJournalEditServlet should close output stream only if the
+ stream is used. (Brandon Li via szetszwo)
+
+ HDFS-3511. Update GetJournalEditServlet and JournalHttpServer for the api
+ changes in SecurityUtil. (Brandon Li via szetszwo)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index ebff0f5..a0878df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -187,6 +187,25 @@
</configuration>
</execution>
<execution>
+ <id>journal</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <compile>false</compile>
+ <workingDirectory>${project.build.directory}/generated-src/main/jsp</workingDirectory>
+ <webFragmentFile>${project.build.directory}/journal-jsp-servlet-definitions.xml</webFragmentFile>
+ <packageName>org.apache.hadoop.hdfs.server.journalservice</packageName>
+ <sources>
+ <directory>${basedir}/src/main/webapps/journal</directory>
+ <includes>
+ <include>*.jsp</include>
+ </includes>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
<id>datanode</id>
<phase>generate-sources</phase>
<goals>
@@ -293,6 +312,7 @@
<loadfile property="hdfs.servlet.definitions" srcFile="${project.build.directory}/hdfs-jsp-servlet-definitions.xml"/>
<loadfile property="secondary.servlet.definitions" srcFile="${project.build.directory}/secondary-jsp-servlet-definitions.xml"/>
<loadfile property="datanode.servlet.definitions" srcFile="${project.build.directory}/datanode-jsp-servlet-definitions.xml"/>
+ <loadfile property="journal.servlet.definitions" srcFile="${project.build.directory}/journal-jsp-servlet-definitions.xml"/>
<echoproperties destfile="${project.build.directory}/webxml.properties">
<propertyset>
<propertyref regex=".*.servlet.definitions"/>
@@ -308,6 +328,9 @@
<copy file="${basedir}/src/main/webapps/proto-datanode-web.xml"
tofile="${project.build.directory}/webapps/datanode/WEB-INF/web.xml"
filtering="true"/>
+ <copy file="${basedir}/src/main/webapps/proto-journal-web.xml"
+ tofile="${project.build.directory}/webapps/journal/WEB-INF/web.xml"
+ filtering="true"/>
<copy toDir="${project.build.directory}/webapps">
<fileset dir="${basedir}/src/main/webapps">
<exclude name="**/*.jsp"/>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 309a72b..d132db7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -200,6 +200,16 @@
public static final String DFS_HOSTS = "dfs.hosts";
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
+
+ // This is a comma-separated host:port list of addresses hosting the journal service.
+ public static final String DFS_JOURNAL_ADDRESS_KEY = "dfs.journal.addresses";
+ public static final String DFS_JOURNAL_EDITS_DIR_KEY = "dfs.journal.edits.dir";
+ public static final String DFS_JOURNAL_HTTP_ADDRESS_KEY = "dfs.journal.http-addresses";
+ public static final String DFS_JOURNAL_HTTPS_PORT_KEY = "dfs.journal.https-port";
+ public static final int DFS_JOURNAL_HTTPS_PORT_DEFAULT = 50510;
+ public static final String DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY = "dfs.journal.kerberos.https.principal";
+ public static final String DFS_JOURNAL_KEYTAB_FILE_KEY = "dfs.journal.keytab.file";
+ public static final String DFS_JOURNAL_USER_NAME_KEY = "dfs.journal.kerberos.principal";
// Much code in hdfs is not yet updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
@@ -306,7 +316,7 @@
"dfs.image.transfer.bandwidthPerSec";
public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling
- //Keys with no defaults
+ // Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index a78a61e..dc266fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -25,6 +25,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.security.SecureRandom;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -1064,4 +1065,61 @@
return null;
}
}
+
+ private static Collection<InetSocketAddress> getAddresses(Configuration conf,
+ String addrKey) {
+ Collection<String> addresses = conf.getTrimmedStringCollection(addrKey);
+ Collection<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+ for (String address : emptyAsSingletonNull(addresses)) {
+ if (address == null) {
+ continue;
+ }
+ ret.add(NetUtils.createSocketAddr(address));
+ }
+ return ret;
+ }
+
+ /**
+ * Returns list of RPC server InetSocketAddresses of journal services from the
+ * configuration.
+ *
+ * @param conf configuration
+ * @return list of InetSocketAddresses
+ */
+ public static Collection<InetSocketAddress> getJournalNodeAddresses(
+ Configuration conf) {
+ return getAddresses(conf, DFS_JOURNAL_ADDRESS_KEY);
+ }
+
+ /**
+ * Returns list of http InetSocketAddresses of journal services from the
+ * configuration.
+ *
+ * @param conf configuration
+ * @return list of http InetSocketAddresses
+ */
+ public static Collection<InetSocketAddress> getJournalNodeHttpAddresses(
+ Configuration conf) {
+ return getAddresses(conf, DFS_JOURNAL_HTTP_ADDRESS_KEY);
+ }
+
+ /**
+ * Returns the RPC address of the journal service running on the given
+ * hostname.
+ *
+ * @param conf configuration
+ * @param hostname the hostname in the http address
+ * @return rpc address of the journal service
+ */
+ public static InetSocketAddress getJournalRpcAddrFromHostName(
+ Configuration conf, String hostname) {
+ Collection<InetSocketAddress> jRpcAddr = DFSUtil
+ .getJournalNodeAddresses(conf);
+ for (InetSocketAddress addr : jRpcAddr) {
+ if (addr != null && addr.getHostName().equals(hostname)) {
+ return addr;
+ }
+ }
+ return null;
+ }
}
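
A minimal sketch of how the new configuration keys and DFSUtil helpers fit together; the host names and ports below are hypothetical:

    import java.net.InetSocketAddress;
    import java.util.Collection;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class JournalAddressSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // dfs.journal.addresses is a comma-separated host:port list.
        conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY,
            "jn1.example.com:50900,jn2.example.com:50900");
        conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
            "jn1.example.com:50901,jn2.example.com:50901");

        // All RPC addresses of the configured journal services.
        Collection<InetSocketAddress> rpcAddrs =
            DFSUtil.getJournalNodeAddresses(conf);
        // Map the hostname of an http address back to its RPC address.
        InetSocketAddress rpc =
            DFSUtil.getJournalRpcAddrFromHostName(conf, "jn1.example.com");
        System.out.println(rpcAddrs + " -> " + rpc);
      }
    }
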
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
index eabdd22..1e3d0e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
@@ -41,6 +41,10 @@
public UnregisteredNodeException(NodeRegistration nodeReg) {
super("Unregistered server: " + nodeReg.toString());
}
+
+ public UnregisteredNodeException(String msg) {
+ super("Unregistered server: " + msg);
+ }
/**
* The exception is thrown if a different data-node claims the same
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolPB.java
new file mode 100644
index 0000000..0b8886e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolPB.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.JournalSyncProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to synchronize journal edit logs.
+ *
+ * Note: This extends the protocolbuffer service based interface to
+ * add annotations required for security.
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY)
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Private
+public interface JournalSyncProtocolPB extends
+ JournalSyncProtocolService.BlockingInterface {
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..4fe0832
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolServerSideTranslatorPB.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link JournalSyncProtocolPB} to the
+ * {@link JournalSyncProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class JournalSyncProtocolServerSideTranslatorPB implements JournalSyncProtocolPB {
+ /** Server side implementation to delegate the requests to */
+ private final JournalSyncProtocol impl;
+
+ public JournalSyncProtocolServerSideTranslatorPB(JournalSyncProtocol impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public GetEditLogManifestResponseProto getEditLogManifest(
+ RpcController unused, GetEditLogManifestRequestProto request)
+ throws ServiceException {
+ RemoteEditLogManifest manifest;
+ try {
+ JournalInfo journalInfo = PBHelper.convert(request.getJournalInfo());
+ manifest = impl.getEditLogManifest(journalInfo, request.getSinceTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return GetEditLogManifestResponseProto.newBuilder()
+ .setManifest(PBHelper.convert(manifest)).build();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolTranslatorPB.java
new file mode 100644
index 0000000..add41bd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalSyncProtocolTranslatorPB.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link JournalSyncProtocol} interfaces to the RPC server implementing
+ * {@link JournalSyncProtocolPB}.
+ */
+@InterfaceAudience.Private
+public class JournalSyncProtocolTranslatorPB implements ProtocolMetaInterface,
+ JournalSyncProtocol, Closeable {
+ /** RpcController is not used and hence is set to null */
+ private final static RpcController NULL_CONTROLLER = null;
+ private final JournalSyncProtocolPB rpcProxy;
+
+ public JournalSyncProtocolTranslatorPB(JournalSyncProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ @Override
+ public RemoteEditLogManifest getEditLogManifest(JournalInfo journalInfo, long sinceTxId)
+ throws IOException {
+ JournalInfoProto journalInfoProto = PBHelper.convert(journalInfo);
+ GetEditLogManifestRequestProto req = GetEditLogManifestRequestProto
+ .newBuilder().setJournalInfo(journalInfoProto).setSinceTxId(sinceTxId).build();
+
+ try {
+ return PBHelper.convert(rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
+ .getManifest());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy, JournalSyncProtocolPB.class,
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(JournalSyncProtocolPB.class), methodName);
+ }
+}
\ No newline at end of file
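
A client-side sketch of the translator in use, assuming a JournalSyncProtocolPB proxy has already been obtained via RPC (createProxyWithJournalSyncProtocol in JournalService below wraps the same pattern):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolPB;
    import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolTranslatorPB;
    import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

    public class ManifestClientSketch {
      static RemoteEditLogManifest fetchManifest(JournalSyncProtocolPB rpcProxy,
          JournalInfo journalInfo, long sinceTxId) throws IOException {
        JournalSyncProtocolTranslatorPB sync =
            new JournalSyncProtocolTranslatorPB(rpcProxy);
        try {
          // Ask the remote journal service for its finalized edit segments.
          return sync.getEditLogManifest(journalInfo, sinceTxId);
        } finally {
          sync.close(); // stops the underlying RPC proxy
        }
      }
    }
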
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/GetJournalEditServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/GetJournalEditServlet.java
new file mode 100644
index 0000000..c14c4e8e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/GetJournalEditServlet.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
+import org.apache.hadoop.hdfs.server.namenode.GetImageServlet.GetImageParams;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class is used by a lagging Journal service to retrieve edit files from
+ * another Journal service in order to sync up.
+ */
+@InterfaceAudience.Private
+public class GetJournalEditServlet extends HttpServlet {
+
+ private static final long serialVersionUID = -4635891628211723009L;
+ private static final Log LOG = LogFactory.getLog(GetJournalEditServlet.class);
+
+ // TODO: create security tests
+ protected boolean isValidRequestor(String remoteUser, Configuration conf)
+ throws IOException {
+ if (remoteUser == null) { // This really shouldn't happen...
+ LOG.warn("Received null remoteUser while authorizing access to GetJournalEditServlet");
+ return false;
+ }
+
+ String[] validRequestors = {
+ SecurityUtil.getServerPrincipal(conf
+ .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
+ .getAddress(conf).getHostName()),
+ SecurityUtil.getServerPrincipal(
+ conf.get(DFSConfigKeys.DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY),
+ NameNode.getAddress(conf).getHostName()),
+ SecurityUtil.getServerPrincipal(conf
+ .get(DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY),
+ NameNode.getAddress(conf).getHostName()) };
+
+ for (String v : validRequestors) {
+ if (v != null && v.equals(remoteUser)) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("isValidRequestor is allowing: " + remoteUser);
+ return true;
+ }
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("isValidRequestor is rejecting: " + remoteUser);
+ return false;
+ }
+
+ @Override
+ public void doGet(final HttpServletRequest request,
+ final HttpServletResponse response) throws ServletException, IOException {
+ try {
+ ServletContext context = getServletContext();
+ final NNStorage storage = JournalHttpServer
+ .getJournalFromContext(context).getStorage();
+
+ final GetImageParams parsedParams = new GetImageParams(request, response);
+
+ final Configuration conf = (Configuration) getServletContext()
+ .getAttribute(JspHelper.CURRENT_CONF);
+
+ if (UserGroupInformation.isSecurityEnabled()
+ && !isValidRequestor(request.getRemoteUser(), conf)) {
+ response
+ .sendError(HttpServletResponse.SC_FORBIDDEN,
+ "Only Namenode and another Journal service may access this servlet");
+ LOG.warn("Received non-NN/Journal request for edits from "
+ + request.getRemoteHost());
+ return;
+ }
+
+ String myStorageInfoString = storage.toColonSeparatedString();
+ String theirStorageInfoString = parsedParams.getStorageInfoString();
+ if (theirStorageInfoString != null
+ && !myStorageInfoString.equals(theirStorageInfoString)) {
+ response
+ .sendError(HttpServletResponse.SC_FORBIDDEN,
+ "This node has storage info " + myStorageInfoString
+ + " but the requesting node expected "
+ + theirStorageInfoString);
+ LOG.warn("Received an invalid request file transfer request "
+ + " with storage info " + theirStorageInfoString);
+ return;
+ }
+
+ UserGroupInformation.getCurrentUser().doAs(
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ if (parsedParams.isGetEdit()) {
+ long startTxId = parsedParams.getStartTxId();
+ long endTxId = parsedParams.getEndTxId();
+ File editFile = storage.findFinalizedEditsFile(startTxId,
+ endTxId);
+
+ GetImageServlet.setVerificationHeaders(response, editFile);
+ GetImageServlet.setFileNameHeaders(response, editFile);
+
+ DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
+
+ // send edits
+ FaultInjector.instance.beforeSendEdits();
+ ServletOutputStream output = response.getOutputStream();
+ try {
+ TransferFsImage.getFileServer(output, editFile, throttler);
+ } finally {
+ if (output != null)
+ output.close();
+ }
+
+ } else {
+ response
+ .sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED,
+ "The server only accepts getedit request. This request is not getedit.");
+ }
+ return null;
+ }
+ });
+
+ } catch (Exception ie) {
+ String errMsg = "getedit failed. " + StringUtils.stringifyException(ie);
+ response.sendError(HttpServletResponse.SC_GONE, errMsg);
+ throw new IOException(errMsg);
+ }
+ }
+
+ /**
+ * Static nested class used only for fault injection. Typical usage is to
+ * create a Mockito mock of this class and then use the mock to control the
+ * fault-injection behavior.
+ */
+ public static class FaultInjector {
+ public static FaultInjector instance =
+ new FaultInjector();
+
+ public void beforeSendEdits() throws IOException {}
+ }
+}
\ No newline at end of file
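
A test sketch for the fault-injection hook above, mocking it with Mockito as the javadoc suggests:

    import java.io.IOException;

    import org.mockito.Mockito;

    import org.apache.hadoop.hdfs.server.journalservice.GetJournalEditServlet;
    import org.apache.hadoop.hdfs.server.journalservice.GetJournalEditServlet.FaultInjector;

    public class FaultInjectorSketch {
      static void failBeforeSendEdits() throws IOException {
        FaultInjector injector = Mockito.mock(FaultInjector.class);
        Mockito.doThrow(new IOException("injected before sending edits"))
            .when(injector).beforeSendEdits();
        // doGet reads this static field, so the next getedit request fails
        // before any bytes are sent.
        GetJournalEditServlet.FaultInjector.instance = injector;
      }
    }
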
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
new file mode 100644
index 0000000..bb02075
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+/** The journal stored in local directories. */
+class Journal {
+ static final Log LOG = LogFactory.getLog(Journal.class);
+
+ private final FSImage image;
+ private volatile boolean isFormatted;
+
+ /**
+ * Constructor. It is possible that the directory is not formatted.
+ */
+ Journal(Configuration conf) throws IOException {
+ this.image = new FSImage(conf, Collections.<URI>emptyList(), getEditDirs(conf));
+
+ final Map<StorageDirectory, StorageState> states
+ = new HashMap<StorageDirectory, StorageState>();
+ isFormatted = image.recoverStorageDirs(StartupOption.REGULAR, states);
+ for(Map.Entry<StorageDirectory, StorageState> e : states.entrySet()) {
+ LOG.info(e.getKey() + ": " + e.getValue());
+ }
+ LOG.info("The journal is " + (isFormatted? "already": "not yet")
+ + " formatted: " + image.getStorage());
+ }
+
+ /**
+ * Format the local edit directory.
+ */
+ synchronized void format(int namespaceId, String clusterId) throws IOException {
+ if (isFormatted) {
+ throw new IllegalStateException("The journal is already formatted.");
+ }
+ final NNStorage s = image.getStorage();
+ s.format(new NamespaceInfo(namespaceId, clusterId, "dummy-bpid", 0, 0));
+ isFormatted = true;
+ LOG.info("Formatted journal: " + s);
+ }
+
+ boolean isFormatted() {
+ return isFormatted;
+ }
+
+ NNStorage getStorage() {
+ return image.getStorage();
+ }
+
+ synchronized void verifyVersion(JournalService service, NamespaceInfo info
+ ) throws IOException {
+ if (!isFormatted) {
+ return;
+ }
+
+ final StorageInfo stored = image.getStorage();
+ if (!stored.getClusterID().equals(info.getClusterID())) {
+ throw new IOException("Cluster IDs not matched: stored = "
+ + stored.getClusterID() + " != passed = " + info.getClusterID());
+ }
+ if (stored.getNamespaceID() != info.getNamespaceID()) {
+ throw new IOException("Namespace IDs not matched: stored = "
+ + stored.getNamespaceID() + " != passed = " + info.getNamespaceID());
+ }
+ if (stored.getLayoutVersion() != info.getLayoutVersion()) {
+ throw new IOException("Layout versions not matched: stored = "
+ + stored.getLayoutVersion() + " != passed = " + info.getLayoutVersion());
+ }
+ if (stored.getCTime() != info.getCTime()) {
+ throw new IOException("CTimes not matched: stored = "
+ + stored.getCTime() + " != passed = " + info.getCTime());
+ }
+ }
+
+ void close() throws IOException {
+ image.close();
+ }
+
+ FSEditLog getEditLog() {
+ return image.getEditLog();
+ }
+
+ RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
+ return image.getEditLog().getEditLogManifest(sinceTxId);
+ }
+
+ static List<URI> getEditDirs(Configuration conf) throws IOException {
+ final Collection<String> dirs = conf.getTrimmedStringCollection(
+ DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY);
+ LOG.info(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY + " = " + dirs);
+ return Util.stringCollectionAsURIs(dirs);
+ }
+}
\ No newline at end of file
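
A same-package sketch of the Journal lifecycle (the class is package-private); the edits directory is hypothetical and the namespace info would come from the namenode handshake:

    // In package org.apache.hadoop.hdfs.server.journalservice:
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    class JournalInitSketch {
      static Journal initJournal(Configuration conf, NamespaceInfo nsInfo)
          throws IOException {
        conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY,
            "/data/hdfs/journal"); // hypothetical directory
        Journal journal = new Journal(conf); // recovers existing storage state
        if (!journal.isFormatted()) {
          // First start: stamp the ids obtained from the namenode handshake.
          journal.format(nsInfo.getNamespaceID(), nsInfo.getClusterID());
        }
        return journal;
      }
    }
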
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
new file mode 100644
index 0000000..3b3c412
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/** A JournalListener for writing journal to an edit log. */
+class JournalDiskWriter implements JournalListener {
+ static final Log LOG = LogFactory.getLog(JournalDiskWriter.class);
+
+ private final Journal journal;
+
+ /**
+ * Constructor. The journal directory may not be formatted yet; in that
+ * case, the storage is formatted later.
+ */
+ JournalDiskWriter(Journal journal) throws IOException {
+ this.journal = journal;
+ }
+
+ Journal getJournal() {
+ return journal;
+ }
+
+ @Override
+ public synchronized void verifyVersion(JournalService service,
+ NamespaceInfo info) throws IOException {
+ journal.verifyVersion(service, info);
+ }
+
+ @Override
+ public synchronized void journal(JournalService service, long firstTxId,
+ int numTxns, byte[] records) throws IOException {
+ journal.getEditLog().journal(firstTxId, numTxns, records);
+ }
+
+ @Override
+ public synchronized void startLogSegment(JournalService service, long txid
+ ) throws IOException {
+ journal.getEditLog().startLogSegment(txid, false);
+ }
+}
\ No newline at end of file
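
The class above is the JournalListener implementation that persists the callbacks; a minimal same-package wiring sketch:

    // Every journal(...) / startLogSegment(...) callback from the service is
    // delegated straight to the Journal's FSEditLog.
    Journal journal = new Journal(conf);
    JournalListener listener = new JournalDiskWriter(journal);
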
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
new file mode 100644
index 0000000..dc11012
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Encapsulates the HTTP server started by the Journal Service.
+ */
+@InterfaceAudience.Private
+public class JournalHttpServer {
+ public static final Log LOG = LogFactory.getLog(JournalHttpServer.class);
+
+ public static final String JOURNAL_ATTRIBUTE_KEY = "localjournal";
+
+ private HttpServer httpServer;
+ private InetSocketAddress httpAddress;
+ private String infoBindAddress;
+ private int infoPort;
+ private int httpsPort;
+ private Journal localJournal;
+
+ private final Configuration conf;
+
+ JournalHttpServer(Configuration conf, Journal journal,
+ InetSocketAddress bindAddress) {
+ this.conf = conf;
+ this.localJournal = journal;
+ this.httpAddress = bindAddress;
+ }
+
+ void start() throws IOException {
+ infoBindAddress = httpAddress.getHostName();
+
+ // initialize the webserver for uploading/downloading files.
+ // Kerberized SSL servers must be run from the host principal...
+ UserGroupInformation httpUGI = UserGroupInformation
+ .loginUserFromKeytabAndReturnUGI(SecurityUtil.getServerPrincipal(
+ conf.get(DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY),
+ infoBindAddress), conf.get(DFS_JOURNAL_KEYTAB_FILE_KEY));
+ try {
+ httpServer = httpUGI.doAs(new PrivilegedExceptionAction<HttpServer>() {
+ @Override
+ public HttpServer run() throws IOException, InterruptedException {
+ LOG.info("Starting web server as: "
+ + UserGroupInformation.getCurrentUser().getUserName());
+
+ int tmpInfoPort = httpAddress.getPort();
+ httpServer = new HttpServer("journal", infoBindAddress,
+ tmpInfoPort, tmpInfoPort == 0, conf, new AccessControlList(conf
+ .get(DFS_ADMIN, " ")));
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // TODO: implementation
+ }
+ httpServer.setAttribute(JOURNAL_ATTRIBUTE_KEY, localJournal);
+ httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
+ // use "/getimage" because GetJournalEditServlet uses some
+ // GetImageServlet methods.
+ // TODO: change getimage to getedit
+ httpServer.addInternalServlet("getimage", "/getimage",
+ GetJournalEditServlet.class, true);
+ httpServer.start();
+ return httpServer;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ // The web-server port can be ephemeral... ensure we have the correct info
+ infoPort = httpServer.getPort();
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ httpsPort = infoPort;
+ }
+
+ LOG.info("Journal Web-server up at: " + infoBindAddress + ":" + infoPort
+ + " and https port is: " + httpsPort);
+ }
+
+ void stop() throws IOException {
+ if (httpServer != null) {
+ try {
+ httpServer.stop();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ InetSocketAddress getHttpAddress() {
+ return httpAddress;
+ }
+
+ public static Journal getJournalFromContext(ServletContext context) {
+ return (Journal) context.getAttribute(JOURNAL_ATTRIBUTE_KEY);
+ }
+
+ public static Configuration getConfFromContext(ServletContext context) {
+ return (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ }
+
+ /**
+ * Download <code>edits</code> files from another journal service.
+ *
+ * @return true if the edit files have been downloaded
+ * @throws IOException
+ */
+ boolean downloadEditFiles(final String jnHostPort,
+ final RemoteEditLogManifest manifest) throws IOException {
+
+ // Sanity check manifest
+ if (manifest.getLogs().isEmpty()) {
+ throw new IOException("Found no edit logs to download");
+ }
+
+ try {
+ Boolean b = UserGroupInformation.getCurrentUser().doAs(
+ new PrivilegedExceptionAction<Boolean>() {
+
+ @Override
+ public Boolean run() throws Exception {
+ // get edits file
+ for (RemoteEditLog log : manifest.getLogs()) {
+ TransferFsImage.downloadEditsToStorage(jnHostPort, log,
+ localJournal.getStorage());
+ }
+ return true;
+ }
+ });
+ return b.booleanValue();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
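
A same-package lifecycle sketch for the HTTP server above; the bind address and port are hypothetical, and start() performs the keytab login when security is enabled:

    JournalHttpServer http = new JournalHttpServer(conf, journal,
        new InetSocketAddress("0.0.0.0", 50501)); // hypothetical http address
    http.start(); // registers GetJournalEditServlet at /getimage
    // ... serve getedit requests; then, during shutdown:
    http.stop();
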
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
index 2d5ec9e..e2c6f68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
@@ -35,8 +35,10 @@
*
* The application using {@link JournalService} can stop the service if
* {@code info} validation fails.
+ * @throws IOException
*/
- public void verifyVersion(JournalService service, NamespaceInfo info);
+ public void verifyVersion(JournalService service, NamespaceInfo info
+ ) throws IOException;
/**
* Process the received Journal record
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
index e8d7073..88b918c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
@@ -17,31 +17,43 @@
*/
package org.apache.hadoop.hdfs.server.journalservice;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.JournalSyncProtocolProtos.JournalSyncProtocolService;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.JournalServiceProtocols;
+import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@@ -61,18 +73,26 @@
* listener implementation can handle the callbacks based on the application
* requirement.
*/
-public class JournalService implements JournalProtocol {
+public class JournalService implements JournalServiceProtocols {
public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
-
- private final JournalListener listener;
private final InetSocketAddress nnAddress;
- private final NamenodeRegistration registration;
+ private NamenodeRegistration registration;
private final NamenodeProtocol namenode;
private final StateHandler stateHandler = new StateHandler();
private final RPC.Server rpcServer;
+ private final JournalHttpServer httpServer;
private long epoch = 0;
private String fencerInfo;
+ private Daemon syncThread = null;
+ private Configuration conf;
+ // Flags to indicate whether to start sync
+ private boolean toStartSync = false;
+ private long syncSinceTxid = -1;
+
+ private final Journal journal;
+ private final JournalListener listener;
+
enum State {
/** The service is initialized and ready to start. */
INIT(false, false),
@@ -121,12 +141,35 @@
current = State.WAITING_FOR_ROLL;
}
- synchronized void startLogSegment() {
+ synchronized State startLogSegment() {
+ State prevState = current;
if (current == State.WAITING_FOR_ROLL) {
current = State.SYNCING;
}
+ return prevState;
}
+
+ /**
+ * Try to transition to the IN_SYNC state.
+ * @return the current state; if it is not IN_SYNC, the caller should
+ * treat the transition as failed
+ */
+ synchronized State inSync() {
+ if (current == State.IN_SYNC) {
+ throw new IllegalStateException("Service cannot be in " + current
+ + " state.");
+ }
+ if (current == State.SYNCING) {
+ current = State.IN_SYNC;
+ }
+ return current;
+ }
+
+ synchronized void fence() {
+ current = State.WAITING_FOR_ROLL;
+ }
+
synchronized void isStartLogSegmentAllowed() throws IOException {
if (!current.isStartLogSegmentAllowed) {
throw new IOException("Cannot start log segment in " + current
@@ -161,36 +204,56 @@
* {@code server} is a valid server that is managed out side this
* service.
* @param listener call-back interface to listen to journal activities
+ * @param journal the journal used by both Listener and JournalService
* @throws IOException on error
*/
JournalService(Configuration conf, InetSocketAddress nnAddr,
- InetSocketAddress serverAddress, JournalListener listener)
- throws IOException {
+ InetSocketAddress serverAddress, InetSocketAddress httpAddress,
+ JournalListener listener, Journal journal) throws IOException {
this.nnAddress = nnAddr;
this.listener = listener;
+ this.journal = journal;
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
.getProxy();
this.rpcServer = createRpcServer(conf, serverAddress, this);
-
- String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
- StorageInfo storage = new StorageInfo(
- LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
- registration = new NamenodeRegistration(addr, "", storage,
- NamenodeRole.BACKUP);
+ this.httpServer = new JournalHttpServer(conf, journal, httpAddress);
+ this.conf = conf;
}
+ Journal getJournal() {
+ return journal;
+ }
+
+ synchronized NamenodeRegistration getRegistration() {
+ if (!journal.isFormatted()) {
+ throw new IllegalStateException("Journal is not formatted.");
+ }
+
+ if (registration == null) {
+ registration = new NamenodeRegistration(
+ NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
+ journal.getStorage(), NamenodeRole.BACKUP);
+ }
+ return registration;
+ }
+
/**
* Start the service.
+ * @throws IOException on error
*/
- public void start() {
+ public void start() throws IOException {
stateHandler.start();
// Start the RPC server
- LOG.info("Starting rpc server");
+ LOG.info("Starting journal service rpc server");
rpcServer.start();
+
+ // Start the HTTP server
+ LOG.info("Starting journal service http server");
+ httpServer.start();
- for(boolean registered = false, handshakeComplete = false; ; ) {
+ for (boolean registered = false, handshakeComplete = false;;) {
try {
// Perform handshake
if (!handshakeComplete) {
@@ -198,7 +261,7 @@
handshakeComplete = true;
LOG.info("handshake completed");
}
-
+
// Register with the namenode
if (!registered) {
registerWithNamenode();
@@ -211,7 +274,7 @@
} catch (Exception e) {
LOG.warn("Encountered exception ", e);
}
-
+
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
@@ -220,6 +283,14 @@
}
stateHandler.waitForRoll();
+
+ // Create a never-ending daemon to sync journal segments
+ // TODO: remove the assumption that logs "won't be deleted"
+ // Use 3 because the NN rolls with txid=3 when the first journal service
+ // joins; fix this once the NN is modified to ignore its local storage dir.
+ syncThread = new Daemon(new JournalSync(this));
+ syncThread.start();
+
try {
namenode.rollEditLog();
} catch (IOException e) {
@@ -230,10 +301,14 @@
/**
* Stop the service. For an application with the RPC server managed outside
* this service, the RPC server must be stopped by the application.
+ * @throws IOException on error
*/
- public void stop() {
+ public void stop() throws IOException {
if (!stateHandler.isStopped()) {
+ syncThread.interrupt();
+ httpServer.stop();
rpcServer.stop();
+ journal.close();
}
}
@@ -257,34 +332,70 @@
stateHandler.isStartLogSegmentAllowed();
verify(epoch, journalInfo);
listener.startLogSegment(this, txid);
- stateHandler.startLogSegment();
+
+ if (stateHandler.startLogSegment() == State.WAITING_FOR_ROLL) {
+ LOG.info("Notify syncThread to re-sync with txid:" + syncSinceTxid);
+ startSync(syncSinceTxid);
+ }
}
@Override
public FenceResponse fence(JournalInfo journalInfo, long epoch,
String fencerInfo) throws IOException {
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+
+ // This is the first fence if the journal is not yet formatted.
+ if (!journal.isFormatted()) {
+ journal.format(journalInfo.getNamespaceId(), journalInfo.getClusterId());
+ }
verifyFence(epoch, fencerInfo);
- verify(journalInfo);
+ verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
long previousEpoch = epoch;
this.epoch = epoch;
this.fencerInfo = fencerInfo;
-
+ stateHandler.fence();
+
// TODO:HDFS-3092 set lastTransId and inSync
return new FenceResponse(previousEpoch, 0, false);
}
+ @Override
+ public RemoteEditLogManifest getEditLogManifest(JournalInfo journalInfo,
+ long sinceTxId) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received getEditLogManifest " + sinceTxId);
+ }
+ if (!journal.isFormatted()) {
+ throw new IOException("This journal service is not formatted.");
+ }
+ verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
+
+ // Journal has only one storage directory
+ return journal.getEditLogManifest(sinceTxId);
+ }
+
/** Create an RPC server. */
private static RPC.Server createRpcServer(Configuration conf,
- InetSocketAddress address, JournalProtocol impl) throws IOException {
+ InetSocketAddress address, JournalServiceProtocols impl) throws IOException {
+
RPC.setProtocolEngine(conf, JournalProtocolPB.class,
ProtobufRpcEngine.class);
JournalProtocolServerSideTranslatorPB xlator =
new JournalProtocolServerSideTranslatorPB(impl);
BlockingService service =
JournalProtocolService.newReflectiveBlockingService(xlator);
- return RPC.getServer(JournalProtocolPB.class, service,
+
+ JournalSyncProtocolServerSideTranslatorPB syncXlator =
+ new JournalSyncProtocolServerSideTranslatorPB(impl);
+ BlockingService syncService =
+ JournalSyncProtocolService.newReflectiveBlockingService(syncXlator);
+
+ RPC.Server rpcServer = RPC.getServer(JournalProtocolPB.class, service,
address.getHostName(), address.getPort(), 1, false, conf, null);
+ DFSUtil.addPBProtocol(conf, JournalSyncProtocolPB.class, syncService,
+ rpcServer);
+
+ return rpcServer;
}
private void verifyEpoch(long e) throws FencedException {
@@ -310,20 +421,22 @@
/**
* Verifies a journal request
*/
- private void verify(JournalInfo journalInfo) throws IOException {
+ private void verify(int nsid, String clusid) throws IOException {
String errorMsg = null;
- int expectedNamespaceID = registration.getNamespaceID();
- if (journalInfo.getNamespaceId() != expectedNamespaceID) {
- errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
- + " actual " + journalInfo.getNamespaceId();
+ final NamenodeRegistration reg = getRegistration();
+
+ if (nsid != reg.getNamespaceID()) {
+ errorMsg = "Invalid namespaceID in journal request - expected "
+ + reg.getNamespaceID() + " actual " + nsid;
LOG.warn(errorMsg);
- throw new UnregisteredNodeException(journalInfo);
- }
- if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
- errorMsg = "Invalid clusterId in journal request - expected "
- + journalInfo.getClusterId() + " actual " + registration.getClusterID();
+ throw new UnregisteredNodeException(errorMsg);
+ }
+ if ((clusid == null)
+ || (!clusid.equals(reg.getClusterID()))) {
+ errorMsg = "Invalid clusterId in journal request - incoming "
+ + clusid + " expected " + reg.getClusterID();
LOG.warn(errorMsg);
- throw new UnregisteredNodeException(journalInfo);
+ throw new UnregisteredNodeException(errorMsg);
}
}
@@ -332,14 +445,14 @@
*/
private void verify(long e, JournalInfo journalInfo) throws IOException {
verifyEpoch(e);
- verify(journalInfo);
+ verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
}
/**
* Register this service with the active namenode.
*/
private void registerWithNamenode() throws IOException {
- NamenodeRegistration nnReg = namenode.register(registration);
+ NamenodeRegistration nnReg = namenode.register(getRegistration());
String msg = null;
if(nnReg == null) { // consider as a rejection
msg = "Registration rejected by " + nnAddress;
@@ -355,11 +468,177 @@
private void handshake() throws IOException {
NamespaceInfo nsInfo = namenode.versionRequest();
listener.verifyVersion(this, nsInfo);
- registration.setStorageInfo(nsInfo);
+
+ // If this is the first initialization of the journal service, the storage
+ // directory will be set up. Otherwise, the nsid and clusterid have to match
+ // the info saved in the edits dir.
+ if (!journal.isFormatted()) {
+ journal.format(nsInfo.getNamespaceID(), nsInfo.getClusterID());
+ } else {
+ verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
+ }
}
@VisibleForTesting
long getEpoch() {
return epoch;
}
+
+ public static JournalSyncProtocol createProxyWithJournalSyncProtocol(
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+ throws IOException {
+ RPC.setProtocolEngine(conf, JournalSyncProtocolPB.class,
+ ProtobufRpcEngine.class);
+ Object proxy = RPC.getProxy(JournalSyncProtocolPB.class,
+ RPC.getProtocolVersion(JournalSyncProtocolPB.class), address, ugi,
+ conf, NetUtils.getDefaultSocketFactory(conf), 30000);
+
+ return new JournalSyncProtocolTranslatorPB((JournalSyncProtocolPB) proxy);
+ }
+
+ /**
+ * Only invoked by the sync thread; waits for {@link #syncSinceTxid} to be
+ * set before syncing starts.
+ *
+ * @return txid to start syncing from
+ * @throws InterruptedException
+ */
+ synchronized long waitForStartSync() throws InterruptedException {
+ while (!toStartSync) {
+ wait();
+ }
+ // Sync starting - Unset toStartSync so main thread can set it again
+ toStartSync = false;
+ return syncSinceTxid;
+ }
+
+ /**
+ * Only invoked by the main thread to notify the sync thread that another
+ * round of sync is needed.
+ */
+ synchronized void startSync(long sinceTxid) {
+ if (toStartSync) {
+ LOG.trace("toStartSync is already set.");
+ return;
+ }
+ toStartSync = true;
+ syncSinceTxid = sinceTxid;
+ notify();
+ }
+
+ /**
+ * JournalSync downloads journal segments from other journal services.
+ */
+ class JournalSync implements Runnable {
+ private final JournalInfo journalInfo;
+ private final JournalService journalService;
+ private long sinceTxid;
+
+ /**
+ * Constructor
+ * @param journalService Local journal service
+ */
+ JournalSync(JournalService journalService) {
+ NNStorage storage = journalService.getJournal().getStorage();
+ this.journalInfo = new JournalInfo(storage.layoutVersion,
+ storage.clusterID, storage.namespaceID);
+ this.sinceTxid = 0;
+ this.journalService = journalService;
+ }
+
+ public void run() {
+ while (true) {
+ try {
+ sinceTxid = journalService.waitForStartSync();
+ syncAllJournalSegments(conf, journalInfo, sinceTxid);
+ } catch (IOException e) {
+ LOG.error("Unable to sync for "
+ + journalService.getRegistration().getHttpAddress()
+ + " with exception: " + e);
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e1) {
+ break;
+ }
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+ LOG.info("Stopping the JouranlSync thread");
+ }
+
+ public String toString() {
+ return "JournalSync for "
+ + journalService.getRegistration().getHttpAddress();
+ }
+
+ /**
+ * Contact the journal services one by one to fetch all missing journal segments
+ *
+ * @param conf Configuration
+ * @param journalInfo the JournalInfo of the local journal service
+ * @param sinceTxid the transaction id to start with
+ * @throws IOException
+ */
+ void syncAllJournalSegments(Configuration conf, JournalInfo journalInfo,
+ long sinceTxid) throws IOException {
+
+ // Get a list of configured journal services
+ Collection<InetSocketAddress> addrList = DFSUtil
+ .getJournalNodeHttpAddresses(conf);
+ FSEditLog editLog = journal.getEditLog();
+ File currentDir = new File(
+ conf.get(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY));
+
+ boolean needSync = !editLog.hasCompleteJournalSegments(sinceTxid,
+ currentDir);
+ if (!needSync) {
+ LOG.trace("Nothing to sync.");
+ return;
+ }
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ InetSocketAddress selfAddr = journalService.httpServer.getHttpAddress();
+ for (InetSocketAddress addr : addrList) {
+ try {
+ // Skip itself
+ if (addr.getHostName().equals(selfAddr.getHostName())
+ && addr.getPort() == selfAddr.getPort()) {
+ continue;
+ }
+ // Download journal segments
+ InetSocketAddress rpcAddr = DFSUtil.getJournalRpcAddrFromHostName(
+ conf, addr.getHostName());
+ JournalSyncProtocol syncProxy = createProxyWithJournalSyncProtocol(
+ rpcAddr, conf, ugi);
+ RemoteEditLogManifest manifest = syncProxy.getEditLogManifest(
+ journalInfo, sinceTxid);
+ httpServer.downloadEditFiles(NetUtils.getHostPortString(addr),
+ manifest);
+ } catch (IOException e) {
+ LOG.debug("Sync failed for " + selfAddr + "with exception ", e);
+ // Ignore error and try the next journal service
+ }
+
+ if (editLog.hasCompleteJournalSegments(sinceTxid, currentDir)) {
+ needSync = false;
+ break;
+ }
+ }
+
+ if (needSync) {
+ throw new IOException("Journal sync failed.");
+ }
+
+ // Journal service may not be in SYNCING state
+ State jState = stateHandler.inSync();
+ if (jState != State.IN_SYNC) {
+ LOG.debug("Journal service state changed during syncing : " + jState);
+ } else {
+ LOG.debug("Journal sync is done.");
+ // TODO: report IN_SYNC state to NN. Note that it's OK if the state
+ // changes afterwards, because the NN could reject the IN_SYNC report.
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index ff38c95..5ebe5b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -19,6 +19,7 @@
import static org.apache.hadoop.hdfs.server.common.Util.now;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
@@ -64,6 +65,7 @@
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -443,6 +445,59 @@
}
/**
+ * Check if the current node has a complete set of journal segments from
+ * sinceTxid to (curSegmentTxId - 1).
+ * @param sinceTxid the transaction to start with
+ * @param currentDir the storage directory to find journal segments
+ * @return true if journal segments are complete, otherwise false
+ * @throws IOException
+ */
+ public synchronized boolean hasCompleteJournalSegments(long sinceTxid,
+ File currentDir) throws IOException {
+ List<RemoteEditLog> segments = getFinalizedSegments(sinceTxid, currentDir);
+ long curSegTxid = getCurSegmentTxId();
+ return hasCompleteJournalSegments(segments, sinceTxid, curSegTxid);
+ }
+
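+ /**
+ * Check that {@code segments} covers [sinceTxid, curSegTxid - 1] without
+ * gaps. A worked example mirroring TestJournalHttpServer: segments [3,6],
+ * [7,10] and [11,12] with sinceTxid = 3 and curSegTxid = 13 are complete;
+ * with [7,10] removed there is a gap, so this method returns false.
+ */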
+ public static boolean hasCompleteJournalSegments(
+ List<RemoteEditLog> segments, long sinceTxid, long curSegTxid)
+ throws IOException {
+ long minTxid = -1, maxTxid = -1;
+
+ if (sinceTxid > curSegTxid) {
+ throw new RuntimeException("illegal input: sinceTxid=" + sinceTxid
+ + " curSegTxid= " + curSegTxid);
+ }
+ if (segments.size() == 0) {
+ return false;
+ }
+
+ for (RemoteEditLog log : segments) {
+ if (log.getStartTxId() > curSegTxid || log.getEndTxId() > curSegTxid) {
+ throw new RuntimeException("Invalid log: [" + log.getStartTxId() + ","
+ + log.getEndTxId() + "] and curSegTxid=" + curSegTxid);
+ }
+
+ if (minTxid == -1 || maxTxid == -1) {
+ minTxid = log.getStartTxId();
+ maxTxid = log.getEndTxId();
+ assert (minTxid > 0 && maxTxid > 0 && maxTxid > minTxid);
+ } else {
+ // check gap in the middle
+ if (maxTxid != log.getStartTxId() - 1) {
+ return false;
+ }
+ maxTxid = log.getEndTxId();
+ }
+ }
+ // check gap at ends
+ if (minTxid > sinceTxid || maxTxid < curSegTxid - 1) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Set the transaction ID to use for the next transaction written.
*/
synchronized void setNextTxId(long nextTxId) {
@@ -866,7 +921,15 @@
throws IOException {
return journalSet.getEditLogManifest(fromTxId);
}
-
+
+ /**
+ * Return a list of the finalized edit logs that are available.
+ */
+ public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+ File currentDir) throws IOException {
+ return journalSet.getFinalizedSegments(fromTxId, currentDir);
+ }
+
/**
* Finalizes the current edit log and opens a new log segment.
* @return the transaction id of the BEGIN_LOG_SEGMENT transaction
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index c76a16f..7a8a137 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -113,7 +113,7 @@
* @param editsDirs Directories the editlog can be stored in.
* @throws IOException if directories are invalid.
*/
- protected FSImage(Configuration conf,
+ public FSImage(Configuration conf,
Collection<URI> imageDirs,
List<URI> editsDirs)
throws IOException {
@@ -253,7 +253,7 @@
* @param dataDirStates output of storage directory states
* @return true if there is at least one valid formatted storage directory
*/
- private boolean recoverStorageDirs(StartupOption startOpt,
+ public boolean recoverStorageDirs(StartupOption startOpt,
Map<StorageDirectory, StorageState> dataDirStates) throws IOException {
boolean isFormatted = false;
for (Iterator<StorageDirectory> it =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
index 958cb14..d04c06b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
@@ -192,7 +192,7 @@
}
}
- private static void setFileNameHeaders(HttpServletResponse response,
+ public static void setFileNameHeaders(HttpServletResponse response,
File file) {
response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
file.getName());
@@ -204,7 +204,7 @@
* @param conf configuration
* @return a data transfer throttler
*/
- private final DataTransferThrottler getThrottler(Configuration conf) {
+ public static DataTransferThrottler getThrottler(Configuration conf) {
long transferBandwidth =
conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
@@ -262,7 +262,7 @@
* Set headers for content length, and, if available, md5.
* @throws IOException
*/
- private void setVerificationHeaders(HttpServletResponse response, File file)
+ public static void setVerificationHeaders(HttpServletResponse response, File file)
throws IOException {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
@@ -302,7 +302,7 @@
}
- static class GetImageParams {
+ public static class GetImageParams {
private boolean isGetImage;
private boolean isGetEdit;
private boolean isPutImage;
@@ -382,7 +382,7 @@
return endTxId;
}
- boolean isGetEdit() {
+ public boolean isGetEdit() {
return isGetEdit;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index 3d2e23bf..d0ef373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -616,6 +617,39 @@
}
/**
+ * Returns a list of finalized edit logs that are available in the given
+ * storage directory {@code currentDir}. All available edit logs starting
+ * from transaction id {@code fromTxId} are returned. Note that the
+ * returned list may have gaps between finalized edit logs.
+ *
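+ * For example (an illustrative sketch): with finalized segments [3,6] and
+ * [11,12] on disk and {@code fromTxId} = 3, both segments are returned even
+ * though the range [7,10] is missing in between.
+ *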
+ * @param fromTxId Starting transaction id to read the logs.
+ * @param currentDir The storage directory to find the logs.
+ * @return a list of finalized segments.
+ */
+ public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+ File currentDir) {
+ // Use an empty list rather than null so that callers such as
+ // FSEditLog.hasCompleteJournalSegments do not hit a NullPointerException.
+ List<RemoteEditLog> allLogs = new ArrayList<RemoteEditLog>();
+ for (JournalAndStream j : journals) {
+ if (j.getManager() instanceof FileJournalManager) {
+ FileJournalManager fjm = (FileJournalManager) j.getManager();
+ if (fjm.getStorageDirectory().getRoot().equals(currentDir)) {
+ try {
+ allLogs = fjm.getRemoteEditLogs(fromTxId);
+ break;
+ } catch (IOException t) {
+ LOG.warn("Cannot list edit logs in " + fjm, t);
+ }
+ }
+ }
+ }
+ // sort collected segments
+ if (!allLogs.isEmpty()) {
+ Collections.sort(allLogs);
+ }
+ return allLogs;
+ }
+
+ /**
* Add sync times to the buffer.
*/
String getSyncTimes() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index 01c951a..ccd942c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -95,7 +95,7 @@
* or of type EDITS which stores edits or of type IMAGE_AND_EDITS which
* stores both fsimage and edits.
*/
- static enum NameNodeDirType implements StorageDirType {
+ public static enum NameNodeDirType implements StorageDirType {
UNDEFINED,
IMAGE,
EDITS,
@@ -736,7 +736,7 @@
/**
* Return the first readable finalized edits file for the given txid.
*/
- File findFinalizedEditsFile(long startTxId, long endTxId)
+ public File findFinalizedEditsFile(long startTxId, long endTxId)
throws IOException {
File ret = findFile(NameNodeDirType.EDITS,
getFinalizedEditsFileName(startTxId, endTxId));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 97088c5..d64d283 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -78,7 +78,7 @@
return hash;
}
- static void downloadEditsToStorage(String fsName, RemoteEditLog log,
+ public static void downloadEditsToStorage(String fsName, RemoteEditLog log,
NNStorage dstStorage) throws IOException {
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
"bad log: " + log;
@@ -145,7 +145,7 @@
* A server-side method to respond to a getfile http request
* Copies the contents of the local file into the output stream.
*/
- static void getFileServer(OutputStream outstream, File localfile,
+ public static void getFileServer(OutputStream outstream, File localfile,
DataTransferThrottler throttler)
throws IOException {
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalServiceProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalServiceProtocols.java
new file mode 100644
index 0000000..72baba9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalServiceProtocols.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/** The full set of RPC methods implemented by the Journal Service. */
+@InterfaceAudience.Private
+public interface JournalServiceProtocols
+ extends JournalProtocol, JournalSyncProtocol {
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalSyncProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalSyncProtocol.java
new file mode 100644
index 0000000..da29d55
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalSyncProtocol.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to sync journal edits.
+ *
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_JOURNAL_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface JournalSyncProtocol {
+ /**
+ *
+ * This interface is used by both the Namenode and the Journal Service to
+ * insulate them from the details of protocol serialization.
+ *
+ * If you are adding or changing the journal sync protocol, you need to
+ * change both this interface and ALSO the related protocol buffer wire
+ * protocol definition in JournalSyncProtocol.proto.
+ *
+ * For more details on protocol buffer wire protocol, please see
+ * .../org/apache/hadoop/hdfs/protocolPB/overview.html
+ */
+
+ /**
+ * Return a structure containing details about all edit logs available to be
+ * fetched from the JournalNode.
+ *
+ * @param journalInfo journal information
+ * @param sinceTxId return only logs containing transactions >= sinceTxId
+ * @return a manifest of the finalized edit log segments available on this
+ *         node
+ * @throws IOException
+ */
+ public RemoteEditLogManifest getEditLogManifest(JournalInfo journalInfo, long sinceTxId)
+ throws IOException;
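+
+  // A typical client-side flow (an illustrative sketch, not part of this
+  // interface): create an RPC proxy for JournalSyncProtocol, call
+  // getEditLogManifest(journalInfo, sinceTxId), then download each
+  // RemoteEditLog in the returned manifest over the journal http server.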
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalSyncProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalSyncProtocol.proto
new file mode 100644
index 0000000..c5e234a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalSyncProtocol.proto
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers used by the journal sync protocol,
+// i.e. for synchronizing edit logs between journal services.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "JournalSyncProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "hdfs.proto"; // for RemoteEditLogManifestProto
+import "JournalProtocol.proto"; // for JournalInfoProto
+
+/**
+ * sinceTxId - return the editlog information for transactions >= sinceTxId
+ */
+message GetEditLogManifestRequestProto {
+ required JournalInfoProto journalInfo = 1; // Info about the journal
+ required uint64 sinceTxId = 2; // Transaction ID
+}
+
+/**
+ * manifest - Enumeration of editlogs from journal service
+ * for logs >= sinceTxId in the request
+ */
+message GetEditLogManifestResponseProto {
+ required RemoteEditLogManifestProto manifest = 1;
+}
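+
+// Illustrative exchange (a sketch): a lagging journal service sends
+// {journalInfo, sinceTxId = 3} and receives a manifest such as
+// [(3,6), (7,10), (11,12)], from which it downloads any missing segments.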
+
+/**
+ * Protocol used to sync journal edits to a remote node.
+ *
+ * See the request and response for details of rpc call.
+ */
+service JournalSyncProtocolService {
+ /**
+ * Get editlog manifests from the active journalnode for all the editlogs
+ */
+ rpc getEditLogManifest(GetEditLogManifestRequestProto)
+ returns(GetEditLogManifestResponseProto);
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
new file mode 100644
index 0000000..bc9ea42
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
@@ -0,0 +1,29 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=journalstatus.jsp"/>
+<html>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head><title>Hadoop Administration</title></head>
+
+<body>
+<h1>Hadoop Administration</h1>
+
+<ul>
+ <li><a href="journalstatus.jsp">Status</a></li>
+</ul>
+
+</body>
+</html>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
new file mode 100644
index 0000000..2c6e5a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
@@ -0,0 +1,42 @@
+<%
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page
+ contentType="text/html; charset=UTF-8"
+ import="org.apache.hadoop.hdfs.server.common.JspHelper"
+ import="org.apache.hadoop.util.ServletUtil"
+%>
+<%!
+ //for java.io.Serializable
+ private static final long serialVersionUID = 1L;
+%>
+
+<!DOCTYPE html>
+<html>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+<title>Hadoop JournalNode</title>
+
+<body>
+<h1>JournalNode</h1>
+<%= JspHelper.getVersionTable() %>
+<hr />
+
+<br />
+<b><a href="/logs/">Logs</a></b>
+<%= ServletUtil.htmlFooter() %>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
new file mode 100644
index 0000000..b27f82b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee">
+@journal.servlet.definitions@
+</web-app>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index d57f792..c116ed1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -715,6 +715,15 @@
minNN + "-through-" + maxNN));
}
+ public File getJournalEditsDir() throws IOException {
+ return formatJournalEditsDir(base_dir);
+ }
+
+ public static File formatJournalEditsDir(File baseDir)
+ throws IOException {
+ return (new File(baseDir, "journal-edits"));
+ }
+
public NameNodeInfo[] getNameNodeInfos() {
return this.nameNodes;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index 6a0f4e7..acd3dda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -608,4 +608,45 @@
assertEquals(1, uris.size());
assertTrue(uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
}
+
+ @Test
+ public void testJournalNodeConf() throws Exception {
+ HdfsConfiguration conf = new HdfsConfiguration(false);
+ String str1 = "localhost:50200";
+ String str2 = "localhost:50210";
+ InetSocketAddress isa1 = NetUtils.createSocketAddr(str1);
+ InetSocketAddress isa2 = NetUtils.createSocketAddr(str2);
+
+ // If nothing is added, zero length collection is returned.
+ Collection<InetSocketAddress> ret = DFSUtil.getJournalNodeAddresses(conf);
+ assertEquals(0, ret.size());
+
+ // Setup single journal node
+ conf.set(DFS_JOURNAL_ADDRESS_KEY, str1);
+ ret = DFSUtil.getJournalNodeAddresses(conf);
+ assertEquals(1, ret.size());
+ for (InetSocketAddress addr : ret) {
+ assertTrue(addr.equals(isa1));
+ }
+
+ // Add two entries now.
+ conf.set(DFS_JOURNAL_ADDRESS_KEY, "localhost:50200, localhost:50210");
+ ret = DFSUtil.getJournalNodeAddresses(conf);
+ assertEquals(2, ret.size());
+
+ for (InetSocketAddress addr: ret) {
+ assertTrue(addr.equals(isa1) || addr.equals(isa2));
+ }
+
+ // Test Http addresses
+ InetSocketAddress isa3 = NetUtils.createSocketAddr("localhost:50100");
+ InetSocketAddress isa4 = NetUtils.createSocketAddr("localhost:50110");
+ conf.set(DFS_JOURNAL_HTTP_ADDRESS_KEY, "localhost:50100, localhost:50110");
+ ret = DFSUtil.getJournalNodeHttpAddresses(conf);
+ assertEquals(2, ret.size());
+
+ for (InetSocketAddress addr: ret) {
+ assertTrue(addr.equals(isa3) || addr.equals(isa4));
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
new file mode 100644
index 0000000..f8cd370
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJournal {
+ static final Log LOG = LogFactory.getLog(TestJournal.class);
+
+ static Configuration newConf(String name) {
+ Configuration conf = new HdfsConfiguration();
+ File dir = new File(MiniDFSCluster.getBaseDirectory(), name + "-edits");
+ if (dir.exists()) {
+ Assert.assertTrue(FileUtil.fullyDelete(dir));
+ }
+ Assert.assertTrue(dir.mkdirs());
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, dir.toURI().toString());
+ return conf;
+ }
+
+ @Test
+ public void testFormat() throws Exception {
+ final Configuration conf = newConf("testFormat");
+ final Journal j = new Journal(conf);
+ LOG.info("Initial : " + j.getStorage());
+ Assert.assertFalse(j.isFormatted());
+
+ //format
+ final int namespaceId = 123;
+ final String clusterId = "my-cluster-id";
+ j.format(namespaceId, clusterId);
+ Assert.assertTrue(j.isFormatted());
+
+ final StorageInfo info = j.getStorage();
+ LOG.info("Formatted: " + info);
+
+ Assert.assertEquals(namespaceId, info.getNamespaceID());
+ Assert.assertEquals(clusterId, info.getClusterID());
+ j.close();
+
+ //create another Journal object
+ final StorageInfo another = new Journal(conf).getStorage();
+ Assert.assertEquals(info.toString(), another.toString());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
new file mode 100644
index 0000000..8cb19b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.journalservice.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestJournalHttpServer {
+ public static final Log LOG = LogFactory.getLog(TestJournalHttpServer.class);
+
+ static {
+ ((Log4JLogger) JournalHttpServer.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ private Configuration conf;
+ private File hdfsDir = null;
+ private File path1, path2;
+
+ @Before
+ public void setUp() throws Exception {
+ HdfsConfiguration.init();
+ conf = new HdfsConfiguration();
+
+ hdfsDir = new File(MiniDFSCluster.getBaseDirectory()).getCanonicalFile();
+ if (hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir)) {
+ throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
+ }
+
+ hdfsDir.mkdirs();
+
+ path1 = new File(hdfsDir, "j1dir");
+ path2 = new File(hdfsDir, "j2dir");
+ path1.mkdir();
+ path2.mkdir();
+ if (!path1.exists() || !path2.exists()) {
+ throw new IOException("Couldn't create path in "
+ + hdfsDir.getAbsolutePath());
+ }
+
+ System.out.println("configuring hdfsdir is " + hdfsDir.getAbsolutePath()
+ + "; j1Dir = " + path1.getPath() + "; j2Dir = " + path2.getPath());
+ }
+
+ /**
+ * Test Journal service Http Server by verifying the html page is accessible
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHttpServer() throws Exception {
+ MiniDFSCluster cluster = null;
+ JournalHttpServer jh1 = null;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
+ jh1 = new JournalHttpServer(conf, new Journal(conf),
+ NetUtils.createSocketAddr("localhost:50200"));
+ jh1.start();
+
+ String pageContents = DFSTestUtil.urlGet(new URL(
+ "http://localhost:50200/journalstatus.jsp"));
+ assertTrue(pageContents.contains("JournalNode"));
+
+ } catch (IOException e) {
+ LOG.error("Error in TestHttpServer:", e);
+ fail(e.getLocalizedMessage());
+ } finally {
+ if (jh1 != null)
+ jh1.stop();
+ if (cluster != null)
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test hasCompleteJournalSegments with different log list combinations
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHasCompleteJournalSegments() throws Exception {
+ List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
+ logs.add(new RemoteEditLog(3,6));
+ logs.add(new RemoteEditLog(7,10));
+ logs.add(new RemoteEditLog(11,12));
+
+ assertTrue(FSEditLog.hasCompleteJournalSegments(logs, 3, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 1, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 13, 19));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 11, 19));
+
+ logs.remove(1);
+ assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 3, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 1, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 3, 19));
+ }
+
+ private void copyNNFiles(MiniDFSCluster cluster, File dstDir)
+ throws IOException {
+ Collection<URI> editURIs = cluster.getNameEditsDirs(0);
+ String firstURI = editURIs.iterator().next().getPath();
+ File nnDir = new File(firstURI + "/current");
+
+ File allFiles[] = FileUtil.listFiles(nnDir);
+ for (File f : allFiles) {
+ IOUtils.copyBytes(new FileInputStream(f),
+ new FileOutputStream(dstDir + "/" + f.getName()), 4096, true);
+ }
+ }
+
+ /**
+ * Test lagging Journal service copies edit segments from another Journal
+ * service:
+ * 1. start one journal service
+ * 2. reboot namenode so more segments are created
+ * 3. add another journal service and this new journal service should sync
+ * with the first journal service
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCopyEdits() throws Exception {
+ MiniDFSCluster cluster = null;
+ JournalService service1 = null, service2 = null;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+ // start journal service
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
+ InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+ InetSocketAddress serverAddr = NetUtils
+ .createSocketAddr("localhost:50900");
+ Journal j1 = new Journal(conf);
+ JournalListener listener1 = new JournalDiskWriter(j1);
+ service1 = new JournalService(conf, nnAddr, serverAddr,
+ new InetSocketAddress(50901), listener1, j1);
+ service1.start();
+
+ // get namenode clusterID/layoutVersion/namespaceID
+ StorageInfo si = service1.getJournal().getStorage();
+ JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
+ si.namespaceID);
+
+ // restart namenode, so there will be one more journal segments
+ cluster.restartNameNode();
+
+ // TODO: remove file copy when NN can work with the journal state machine
+ copyNNFiles(cluster, new File(path1 + "/current"));
+
+ // start another journal service that will do the sync
+ conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
+ conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
+ "localhost:50902, localhost:50901");
+ Journal j2 = new Journal(conf);
+ JournalListener listener2 = new JournalDiskWriter(j2);
+ service2 = new JournalService(conf, nnAddr, new InetSocketAddress(50800),
+ NetUtils.createSocketAddr("localhost:50902"), listener2, j2);
+ service2.start();
+
+ // give service2 some time to sync
+ Thread.sleep(5000);
+
+ // TODO: change sinceTxid to 1 after NN is modified to start using the
+ // journal service
+ RemoteEditLogManifest manifest2 = service2.getEditLogManifest(
+ journalInfo, 3);
+ assertTrue(manifest2.getLogs().size() > 0);
+
+ } catch (IOException e) {
+ LOG.error("Error in TestCopyEdits:", e);
+ fail(e.getLocalizedMessage());
+ } finally {
+ if (cluster != null)
+ cluster.shutdown();
+ if (service1 != null)
+ service1.stop();
+ if (service2 != null)
+ service2.stop();
+ }
+ }
+
+ /**
+ * Test lagging Journal service fails to copy edit segments from another
+ * Journal service with injected error:
+ * 1. start one journal service
+ * 2. reboot namenode so more segments are created
+ * 3. add another journal service and this new journal service should sync
+ * with the first journal service. Use injected error to fail the sync.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCopyEditsFailure() throws Exception {
+ MiniDFSCluster cluster = null;
+ JournalService service1 = null, service2 = null;
+ GetJournalEditServlet.FaultInjector faultInjector;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+ // start journal service
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
+ InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+ InetSocketAddress serverAddr = NetUtils
+ .createSocketAddr("localhost:50900");
+ Journal j1 = new Journal(conf);
+ JournalListener listener1 = new JournalDiskWriter(j1);
+ service1 = new JournalService(conf, nnAddr, serverAddr,
+ new InetSocketAddress(50901), listener1, j1);
+ service1.start();
+
+ // get namenode clusterID/layoutVersion/namespaceID
+ StorageInfo si = service1.getJournal().getStorage();
+ JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
+ si.namespaceID);
+
+ // restart namenode, so there will be one more journal segments
+ cluster.restartNameNode();
+
+ // TODO: remove file copy when NN can work with the journal state machine
+ copyNNFiles(cluster, new File(path1 + "/current"));
+
+ faultInjector = Mockito.mock(GetJournalEditServlet.FaultInjector.class);
+ GetJournalEditServlet.FaultInjector.instance = faultInjector;
+ Mockito
+ .doThrow(new IOException("Injecting failure before sending edits"))
+ .when(faultInjector).beforeSendEdits();
+ // start another journal service that will do the sync
+ conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
+ conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
+ "localhost:50902, localhost:50901");
+ Journal j2 = new Journal(conf);
+ JournalListener listener2 = new JournalDiskWriter(j2);
+ service2 = new JournalService(conf, nnAddr, new InetSocketAddress(50800),
+ NetUtils.createSocketAddr("localhost:50902"), listener2, j2);
+ service2.start();
+
+ // give service2 some time to sync
+ Thread.sleep(5000);
+
+ // TODO: change sinceTxid to 1 after NN is modified to start using the
+ // journal service
+ RemoteEditLogManifest manifest2 = service2.getEditLogManifest(
+ journalInfo, 3);
+ assertTrue(manifest2.getLogs().size() == 0);
+
+ } catch (IOException e) {
+ LOG.error("Error in TestCopyEdits:", e);
+ fail(e.getLocalizedMessage());
+ } finally {
+ if (cluster != null)
+ cluster.shutdown();
+ if (service1 != null)
+ service1.stop();
+ if (service2 != null)
+ service2.stop();
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
index b2cb080..b0ad97a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
@@ -20,18 +20,20 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import junit.framework.Assert;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -39,9 +41,9 @@
* Tests for {@link JournalService}
*/
public class TestJournalService {
- private MiniDFSCluster cluster;
- private Configuration conf = new HdfsConfiguration();
-
+ static final Log LOG = LogFactory.getLog(TestJournalService.class);
+ static final InetSocketAddress RPC_ADDR = new InetSocketAddress(0);
+
/**
* Test calls backs {@link JournalListener#startLogSegment(JournalService, long)} and
* {@link JournalListener#journal(JournalService, long, int, byte[])} are
@@ -49,35 +51,85 @@
*/
@Test
public void testCallBacks() throws Exception {
+ Configuration conf = TestJournal.newConf("testCallBacks");
+ Journal journal = new Journal(conf);
JournalListener listener = Mockito.mock(JournalListener.class);
JournalService service = null;
+ MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(0);
- service = startJournalService(listener);
+ InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+ service = newJournalService(nnAddr, listener, journal, conf);
+ service.start();
verifyRollLogsCallback(service, listener);
- verifyJournalCallback(service, listener);
- verifyFence(service, cluster.getNameNode(0));
+ verifyJournalCallback(cluster.getFileSystem(), service, listener);
} finally {
- if (service != null) {
- service.stop();
- }
if (cluster != null) {
cluster.shutdown();
}
+ if (service != null) {
+ service.stop();
+ }
}
}
- private JournalService startJournalService(JournalListener listener)
- throws IOException {
- InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
- InetSocketAddress serverAddr = new InetSocketAddress(0);
- JournalService service = new JournalService(conf, nnAddr, serverAddr,
- listener);
- service.start();
- return service;
+ /**
+ * Test journal service fence with a combination of epoch, nsid and clusterid
+ * @throws Exception
+ */
+ @Test
+ public void testFence() throws Exception {
+ final Configuration conf = TestJournal.newConf("testFence");
+ final JournalListener listener = Mockito.mock(JournalListener.class);
+ final InetSocketAddress nnAddress;
+
+ JournalService service = null;
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive(0);
+ NameNode nn = cluster.getNameNode(0);
+ nnAddress = nn.getNameNodeAddress();
+ Journal j1 = new Journal(conf);
+ service = newJournalService(nnAddress, listener, j1, conf);
+ service.start();
+ String cid = nn.getNamesystem().getClusterId();
+ int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
+ int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+
+ verifyFence(service, listener, cid, nsId, lv);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ // test restarting the journal service
+ StorageInfo before = service.getJournal().getStorage();
+ LOG.info("before: " + before);
+ service.stop();
+ Journal j2 = new Journal(conf);
+ service = newJournalService(nnAddress, listener, j2, conf);
+ StorageInfo after = service.getJournal().getStorage();
+ LOG.info("after : " + after);
+ Assert.assertEquals(before.toString(), after.toString());
}
-
+
+ /**
+ * Create an additional journal service.
+ * @param nnAddr namenode rpc address
+ * @param listener the listener serving journal requests from the namenode
+ * @param journal local journal
+ * @param conf local configuration
+ * @return journal service
+ * @throws Exception
+ */
+ private JournalService newJournalService(InetSocketAddress nnAddr,
+ JournalListener listener, Journal journal, Configuration conf)
+ throws Exception {
+ return new JournalService(conf, nnAddr, RPC_ADDR, new InetSocketAddress(0),
+ listener, journal);
+ }
+
/**
* Starting {@link JournalService} should result in Namenode calling
* {@link JournalService#startLogSegment}, resulting in callback
@@ -92,36 +144,74 @@
* File system write operations should result in JournalListener call
* backs.
*/
- private void verifyJournalCallback(JournalService s, JournalListener l) throws IOException {
+ private void verifyJournalCallback(FileSystem fs, JournalService s,
+ JournalListener l) throws IOException {
Path fileName = new Path("/tmp/verifyJournalCallback");
- FileSystem fs = cluster.getFileSystem();
FileSystemTestHelper.createFile(fs, fileName);
fs.delete(fileName, true);
Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
}
- public void verifyFence(JournalService s, NameNode nn) throws Exception {
- String cid = nn.getNamesystem().getClusterId();
- int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
- int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+ /**
+ * Verify the fence with different epoch, clusterid and nsid combinations
+ *
+ * @param s The Journal Service to write to
+ * @param listener the listener to serve journal request
+ * @param cid cluster id
+ * @param nsId namespace id
+ * @param lv layoutVersion
+ * @throws Exception
+ */
+ void verifyFence(JournalService s, JournalListener listener,
+ String cid, int nsId, int lv) throws Exception {
// Fence the journal service
JournalInfo info = new JournalInfo(lv, cid, nsId);
long currentEpoch = s.getEpoch();
-
+
// New epoch lower than the current epoch is rejected
try {
s.fence(info, (currentEpoch - 1), "fencer");
- } catch (FencedException ignore) { /* Ignored */ }
+ Assert.fail();
+ } catch (FencedException ignore) { /* Ignored */ }
// New epoch equal to the current epoch is rejected
try {
s.fence(info, currentEpoch, "fencer");
- } catch (FencedException ignore) { /* Ignored */ }
+ Assert.fail();
+ } catch (FencedException ignore) { /* Ignored */ }
// New epoch higher than the current epoch is successful
FenceResponse resp = s.fence(info, currentEpoch+1, "fencer");
Assert.assertNotNull(resp);
+
+ JournalInfo badInfo = new JournalInfo(lv, "fake", nsId);
+ currentEpoch = s.getEpoch();
+
+ // Send in the wrong cluster id. fence should fail
+ try {
+ s.fence(badInfo, currentEpoch+1, "fencer");
+ Assert.fail();
+
+ } catch (UnregisteredNodeException ignore) {
+ LOG.info(ignore.getMessage());
+ }
+
+ badInfo = new JournalInfo(lv, cid, nsId+1);
+ currentEpoch = s.getEpoch();
+
+ // Send in the wrong nsid. fence should fail
+ try {
+ s.fence(badInfo, currentEpoch+1, "fencer");
+ Assert.fail();
+ } catch (UnregisteredNodeException ignore) {
+ LOG.info(ignore.getMessage());
+ }
+
+ // New epoch higher than the current epoch is successful
+ resp = s.fence(info, currentEpoch+1, "fencer");
+ Assert.assertNotNull(resp);
+
}
}
\ No newline at end of file