blob: 484be2e0ec41d80670f4e78eb7e31b4f144433d2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
/**
* BackupNode.
* <p>
* Backup node can play two roles.
* <ol>
* <li>{@link NamenodeRole#CHECKPOINT} node periodically creates checkpoints,
* that is downloads image and edits from the active node, merges them, and
* uploads the new image back to the active.</li>
* <li>{@link NamenodeRole#BACKUP} node keeps its namespace in sync with the
* active node, and periodically creates checkpoints by simply saving the
* namespace image to local disk(s).</li>
* </ol>
*/
@InterfaceAudience.Private
public class BackupNode extends NameNode {
private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
private static final String BN_HTTP_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
private static final String BN_SERVICE_RPC_ADDRESS_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
/** Name-node proxy */
NamenodeProtocol namenode;
/** Name-node RPC address */
String nnRpcAddress;
/** Name-node HTTP address */
String nnHttpAddress;
/** Checkpoint manager */
Checkpointer checkpointManager;
/** ClusterID to which BackupNode belongs to */
String clusterId;
/** Block pool Id of the peer namenode of this BackupNode */
String blockPoolId;
BackupNode(Configuration conf, NamenodeRole role) throws IOException {
super(conf, role);
}
/////////////////////////////////////////////////////
// Common NameNode methods implementation for backup node.
/////////////////////////////////////////////////////
@Override // NameNode
protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
String addr = conf.get(BN_ADDRESS_NAME_KEY, BN_ADDRESS_DEFAULT);
return NetUtils.createSocketAddr(addr);
}
@Override
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) throws IOException {
String addr = conf.get(BN_SERVICE_RPC_ADDRESS_KEY);
if (addr == null || addr.isEmpty()) {
return null;
}
return NetUtils.createSocketAddr(addr);
}
@Override // NameNode
protected void setRpcServerAddress(Configuration conf) {
conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress));
}
@Override // Namenode
protected void setRpcServiceServerAddress(Configuration conf) {
conf.set(BN_SERVICE_RPC_ADDRESS_KEY, getHostPortString(serviceRPCAddress));
}
@Override // NameNode
protected InetSocketAddress getHttpServerAddress(Configuration conf) {
assert rpcAddress != null : "rpcAddress should be calculated first";
String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT);
return NetUtils.createSocketAddr(addr);
}
@Override // NameNode
protected void setHttpServerAddress(Configuration conf){
conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(httpAddress));
}
@Override // NameNode
protected void loadNamesystem(Configuration conf) throws IOException {
BackupImage bnImage = new BackupImage();
this.namesystem = new FSNamesystem(conf, bnImage);
bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
}
@Override // NameNode
protected void initialize(Configuration conf) throws IOException {
// Trash is disabled in BackupNameNode,
// but should be turned back on if it ever becomes active.
conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY,
CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT);
NamespaceInfo nsInfo = handshake(conf);
super.initialize(conf);
// Backup node should never do lease recovery,
// therefore lease hard limit should never expire.
namesystem.leaseManager.setLeasePeriod(
FSConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);
clusterId = nsInfo.getClusterID();
blockPoolId = nsInfo.getBlockPoolID();
// register with the active name-node
registerWith(nsInfo);
// Checkpoint daemon should start after the rpc server started
runCheckpointDaemon(conf);
}
@Override // NameNode
public void stop() {
if(checkpointManager != null) {
// Prevent from starting a new checkpoint.
// Checkpoints that has already been started may proceed until
// the error reporting to the name-node is complete.
// Checkpoint manager should not be interrupted yet because it will
// close storage file channels and the checkpoint may fail with
// ClosedByInterruptException.
checkpointManager.shouldRun = false;
}
if(namenode != null && getRegistration() != null) {
// Exclude this node from the list of backup streams on the name-node
try {
namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,
"Shutting down.");
} catch(IOException e) {
LOG.error("Failed to report to name-node.", e);
}
}
// Stop the RPC client
RPC.stopProxy(namenode);
namenode = null;
// Stop the checkpoint manager
if(checkpointManager != null) {
checkpointManager.interrupt();
checkpointManager = null;
}
// Stop name-node threads
super.stop();
}
/////////////////////////////////////////////////////
// NamenodeProtocol implementation for backup node.
/////////////////////////////////////////////////////
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
throw new UnsupportedActionException("getBlocks");
}
// Only active name-node can register other nodes.
@Override // NamenodeProtocol
public NamenodeRegistration register(NamenodeRegistration registration
) throws IOException {
throw new UnsupportedActionException("register");
}
@Override // NamenodeProtocol
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
throw new UnsupportedActionException("startCheckpoint");
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
throw new UnsupportedActionException("endCheckpoint");
}
@Override // NamenodeProtocol
public void journal(NamenodeRegistration nnReg,
int jAction,
int length,
byte[] args) throws IOException {
verifyRequest(nnReg);
if(!nnRpcAddress.equals(nnReg.getAddress()))
throw new IOException("Journal request from unexpected name-node: "
+ nnReg.getAddress() + " expecting " + nnRpcAddress);
BackupImage bnImage = (BackupImage)getFSImage();
switch(jAction) {
case (int)JA_IS_ALIVE:
return;
case (int)JA_JOURNAL:
bnImage.journal(length, args);
return;
case (int)JA_JSPOOL_START:
bnImage.startJournalSpool(nnReg);
return;
case (int)JA_CHECKPOINT_TIME:
bnImage.setCheckpointTime(length, args);
setRegistration(); // keep registration up to date
return;
default:
throw new IOException("Unexpected journal action: " + jAction);
}
}
boolean shouldCheckpointAtStartup() {
FSImage fsImage = getFSImage();
if(isRole(NamenodeRole.CHECKPOINT)) {
assert fsImage.getStorage().getNumStorageDirs() > 0;
return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
}
if(namesystem == null || namesystem.dir == null || getFSImage() == null)
return true;
return fsImage.getEditLog().getNumEditStreams() == 0;
}
private NamespaceInfo handshake(Configuration conf) throws IOException {
// connect to name node
InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
this.namenode =
(NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
NamenodeProtocol.versionID, nnAddress, conf);
this.nnRpcAddress = getHostPortString(nnAddress);
this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
// get version and id info from the name-node
NamespaceInfo nsInfo = null;
while(!isStopRequested()) {
try {
nsInfo = handshake(namenode);
break;
} catch(SocketTimeoutException e) { // name-node is busy
LOG.info("Problem connecting to server: " + nnAddress);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {}
}
}
return nsInfo;
}
/**
* Start a backup node daemon.
*/
private void runCheckpointDaemon(Configuration conf) throws IOException {
checkpointManager = new Checkpointer(conf, this);
checkpointManager.start();
}
/**
* Checkpoint.<br>
* Tests may use it to initiate a checkpoint process.
* @throws IOException
*/
void doCheckpoint() throws IOException {
checkpointManager.doCheckpoint();
}
CheckpointStates getCheckpointState() {
return getFSImage().getCheckpointState();
}
void setCheckpointState(CheckpointStates cs) {
getFSImage().setCheckpointState(cs);
}
/**
* Register this backup node with the active name-node.
* @param nsInfo
* @throws IOException
*/
private void registerWith(NamespaceInfo nsInfo) throws IOException {
BackupImage bnImage = (BackupImage)getFSImage();
// verify namespaceID
if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
bnImage.getStorage().setStorageInfo(nsInfo);
else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
throw new IOException("Incompatible namespaceIDs"
+ ": active node namespaceID = " + nsInfo.getNamespaceID()
+ "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
setRegistration();
NamenodeRegistration nnReg = null;
while(!isStopRequested()) {
try {
nnReg = namenode.register(getRegistration());
break;
} catch(SocketTimeoutException e) { // name-node is busy
LOG.info("Problem connecting to name-node: " + nnRpcAddress);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {}
}
}
String msg = null;
if(nnReg == null) // consider as a rejection
msg = "Registration rejected by " + nnRpcAddress;
else if(!nnReg.isRole(NamenodeRole.ACTIVE)) {
msg = "Name-node " + nnRpcAddress + " is not active";
}
if(msg != null) {
msg += ". Shutting down.";
LOG.error(msg);
throw new IOException(msg); // stop the node
}
nnRpcAddress = nnReg.getAddress();
}
/**
* Reset node namespace state in memory and in storage directories.
* @throws IOException
*/
void resetNamespace() throws IOException {
((BackupImage)getFSImage()).reset();
}
/**
* Get size of the local journal (edit log).
* @return size of the current journal
* @throws IOException
*/
long journalSize() throws IOException {
return namesystem.getEditLogSize();
}
// TODO: move to a common with DataNode util class
private static NamespaceInfo handshake(NamenodeProtocol namenode)
throws IOException, SocketTimeoutException {
NamespaceInfo nsInfo;
nsInfo = namenode.versionRequest(); // throws SocketTimeoutException
String errorMsg = null;
// verify build version
if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion())) {
errorMsg = "Incompatible build versions: active name-node BV = "
+ nsInfo.getBuildVersion() + "; backup node BV = "
+ Storage.getBuildVersion();
LOG.fatal(errorMsg);
throw new IOException(errorMsg);
}
assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
"Active and backup node layout versions must be the same. Expected: "
+ FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
return nsInfo;
}
String getBlockPoolId() {
return blockPoolId;
}
String getClusterId() {
return clusterId;
}
}