blob: 618a3725963a607626db86c52b53ab4898b33da4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Checkpointer is responsible for supporting periodic checkpoints
* of the HDFS metadata.
*
* The Checkpointer is a daemon that periodically wakes up
* up (determined by the schedule specified in the configuration),
* triggers a periodic checkpoint and then goes back to sleep.
*
* The start of a checkpoint is triggered by one of the two factors:
* (1) time or (2) the size of the edits file.
*/
class Checkpointer extends Daemon {
public static final Logger LOG =
LoggerFactory.getLogger(Checkpointer.class.getName());
private final BackupNode backupNode;
volatile boolean shouldRun;
private String infoBindAddress;
private CheckpointConf checkpointConf;
private final Configuration conf;
private BackupImage getFSImage() {
return (BackupImage)backupNode.getFSImage();
}
private NamenodeProtocol getRemoteNamenodeProxy(){
return backupNode.namenode;
}
/**
* Create a connection to the primary namenode.
*/
Checkpointer(Configuration conf, BackupNode bnNode) throws IOException {
this.conf = conf;
this.backupNode = bnNode;
try {
initialize(conf);
} catch(IOException e) {
LOG.warn("Checkpointer got exception", e);
shutdown();
throw e;
}
}
/**
* Initialize checkpoint.
*/
private void initialize(Configuration conf) throws IOException {
// Create connection to the namenode.
shouldRun = true;
// Initialize other scheduling parameters from the configuration
checkpointConf = new CheckpointConf(conf);
// Pull out exact http address for posting url to avoid ip aliasing issues
String fullInfoAddr = conf.get(DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT);
infoBindAddress = fullInfoAddr.substring(0, fullInfoAddr.indexOf(":"));
LOG.info("Checkpoint Period : " +
checkpointConf.getPeriod() + " secs " +
"(" + checkpointConf.getPeriod()/60 + " min)");
LOG.info("Transactions count is : " +
checkpointConf.getTxnCount() +
", to trigger checkpoint");
}
/**
* Shut down the checkpointer.
*/
void shutdown() {
shouldRun = false;
backupNode.stop();
}
//
// The main work loop
//
@Override
public void run() {
// How often to check the size of the edit log (min of checkpointCheckPeriod and checkpointPeriod)
long periodMSec = checkpointConf.getCheckPeriod() * 1000;
// How often to checkpoint regardless of number of txns
long checkpointPeriodMSec = checkpointConf.getPeriod() * 1000;
long lastCheckpointTime = 0;
long lastEditLogCheckTime =0;
if (!backupNode.shouldCheckpointAtStartup()) {
lastCheckpointTime = monotonicNow();
}
while(shouldRun) {
try {
long now = monotonicNow();
boolean shouldCheckpoint = false;
if(now >= lastCheckpointTime + checkpointPeriodMSec) {
shouldCheckpoint = true;
} else if(now >= lastEditLogCheckTime + periodMSec) {
long txns = countUncheckpointedTxns();
lastEditLogCheckTime = now;
if(txns >= checkpointConf.getTxnCount())
shouldCheckpoint = true;
}
if(shouldCheckpoint) {
doCheckpoint();
lastCheckpointTime = now;
lastEditLogCheckTime = now;
}
} catch(IOException e) {
LOG.error("Exception in doCheckpoint: ", e);
} catch(Throwable e) {
LOG.error("Throwable Exception in doCheckpoint: ", e);
shutdown();
break;
}
try {
Thread.sleep(LongMath.gcd(periodMSec, checkpointPeriodMSec));
} catch(InterruptedException ie) {
// do nothing
}
}
}
private long countUncheckpointedTxns() throws IOException {
long curTxId = getRemoteNamenodeProxy().getTransactionID();
long uncheckpointedTxns = curTxId -
getFSImage().getStorage().getMostRecentCheckpointTxId();
assert uncheckpointedTxns >= 0;
return uncheckpointedTxns;
}
/**
* Create a new checkpoint
*/
void doCheckpoint() throws IOException {
BackupImage bnImage = getFSImage();
NNStorage bnStorage = bnImage.getStorage();
long startTime = monotonicNow();
bnImage.freezeNamespaceAtNextRoll();
NamenodeCommand cmd =
getRemoteNamenodeProxy().startCheckpoint(backupNode.getRegistration());
CheckpointCommand cpCmd = null;
switch(cmd.getAction()) {
case NamenodeProtocol.ACT_SHUTDOWN:
shutdown();
throw new IOException("Name-node " + backupNode.nnRpcAddress
+ " requested shutdown.");
case NamenodeProtocol.ACT_CHECKPOINT:
cpCmd = (CheckpointCommand)cmd;
break;
default:
throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
}
bnImage.waitUntilNamespaceFrozen();
CheckpointSignature sig = cpCmd.getSignature();
// Make sure we're talking to the same NN!
sig.validateStorageInfo(bnImage);
long lastApplied = bnImage.getLastAppliedTxId();
LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
RemoteEditLogManifest manifest =
getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
boolean needReloadImage = false;
if (!manifest.getLogs().isEmpty()) {
RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
// we don't have enough logs to roll forward using only logs. Need
// to download and load the image.
if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
LOG.info("Unable to roll forward using only logs. Downloading " +
"image with txid " + sig.mostRecentCheckpointTxId);
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
true, false);
bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, downloadedHash);
lastApplied = sig.mostRecentCheckpointTxId;
needReloadImage = true;
}
if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
throw new IOException("No logs to roll forward from " + lastApplied);
}
// get edits files
for (RemoteEditLog log : manifest.getLogs()) {
TransferFsImage.downloadEditsToStorage(
backupNode.nnHttpAddress, log, bnStorage);
}
if(needReloadImage) {
LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
File file = bnStorage.findImageFile(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId);
bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
}
rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
}
long txid = bnImage.getLastAppliedTxId();
backupNode.namesystem.writeLock();
try {
backupNode.namesystem.setImageLoaded();
if(backupNode.namesystem.getBlocksTotal() > 0) {
long completeBlocksTotal =
backupNode.namesystem.getCompleteBlocksTotal();
backupNode.namesystem.getBlockManager().setBlockTotal(
completeBlocksTotal);
}
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
if (!backupNode.namenode.isRollingUpgrade()) {
bnImage.updateStorageVersion();
}
} finally {
backupNode.namesystem.writeUnlock("doCheckpoint");
}
if(cpCmd.needToReturnImage()) {
TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
bnStorage, NameNodeFile.IMAGE, txid);
}
getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig);
if (backupNode.getRole() == NamenodeRole.BACKUP) {
bnImage.convergeJournalSpool();
}
backupNode.setRegistration(); // keep registration up to date
long imageSize = bnImage.getStorage().getFsImageName(txid).length();
LOG.info("Checkpoint completed in "
+ (monotonicNow() - startTime)/1000 + " seconds."
+ " New Image Size: " + imageSize);
}
private URL getImageListenAddress() {
InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
int httpPort = httpSocAddr.getPort();
try {
return new URL(DFSUtil.getHttpClientScheme(conf) + "://" + infoBindAddress + ":" + httpPort);
} catch (MalformedURLException e) {
// Unreachable
throw new RuntimeException(e);
}
}
static void rollForwardByApplyingLogs(
RemoteEditLogManifest manifest,
FSImage dstImage,
FSNamesystem dstNamesystem) throws IOException {
NNStorage dstStorage = dstImage.getStorage();
List<EditLogInputStream> editsStreams = Lists.newArrayList();
for (RemoteEditLog log : manifest.getLogs()) {
if (log.getEndTxId() > dstImage.getLastAppliedTxId()) {
File f = dstStorage.findFinalizedEditsFile(
log.getStartTxId(), log.getEndTxId());
editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(),
log.getEndTxId(), true));
}
}
LOG.info("Checkpointer about to load edits from " +
editsStreams.size() + " stream(s).");
dstImage.loadEdits(editsStreams, dstNamesystem);
}
}