/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.LongWritable;
/**
 * Extension of FSImage for the backup node.
 * This class handles the setup and convergence of the journal
 * spool on the backup name-node.
 */
@InterfaceAudience.Private
public class BackupImage extends FSImage {
// Names of the journal spool directory and the spool file
private static final String STORAGE_JSPOOL_DIR = "jspool";
private static final String STORAGE_JSPOOL_FILE =
NNStorage.NameNodeFile.EDITS_NEW.getName();
/** Backup input stream for loading edits into memory */
private EditLogBackupInputStream backupInputStream;
  /** Current state of journal spooling; see {@link JSpoolState}. */
  volatile JSpoolState jsState;
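  /**
   * Journal spool state machine (see {@link #startJournalSpool},
   * {@link #journal(int, byte[])} and {@link #convergeJournalSpool()}):
   * OFF (records applied directly to memory and written to edits)
   * -&gt; INPROGRESS (records written to the spooled edits.new only)
   * -&gt; WAIT (journaling blocked while the spool reader merges)
   * -&gt; back to OFF.
   */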
static enum JSpoolState {
OFF,
INPROGRESS,
WAIT;
}
  /**
   * Construct the backup image. Pre-upgradable layout checks are
   * disabled for the backup node's storage.
   */
BackupImage() {
super();
storage.setDisablePreUpgradableLayoutCheck(true);
jsState = JSpoolState.OFF;
}
  /**
   * Analyze backup storage directories for consistency.<br>
   * Recover from incomplete checkpoints if required.<br>
   * Read VERSION and fstime files if they exist.<br>
   * Do not load the image or edits.
   *
   * @param imageDirs list of image directories as URIs.
   * @param editsDirs list of edits directories as URIs.
   * @throws IOException if the node should shut down.
   */
void recoverCreateRead(Collection<URI> imageDirs,
Collection<URI> editsDirs) throws IOException {
storage.setStorageDirectories(imageDirs, editsDirs);
storage.setCheckpointTime(0L);
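    // A checkpoint time of 0 marks the storage as not yet checkpointed;
    // the real value is set later from the active name-node's checkpoint
    // signature (see loadCheckpoint()).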
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState;
try {
curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
// sd is locked but not opened
switch(curState) {
case NON_EXISTENT:
// fail if any of the configured storage dirs are inaccessible
throw new InconsistentFSStateException(sd.getRoot(),
"checkpoint directory does not exist or is not accessible.");
case NOT_FORMATTED:
        // on the backup node, all directories may be unformatted initially
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty current
break;
case NORMAL:
break;
default: // recovery is possible
sd.doRecover(curState);
}
if(curState != StorageState.NOT_FORMATTED) {
sd.read(); // read and verify consistency with other directories
}
} catch(IOException ioe) {
sd.unlock();
throw ioe;
}
}
}
/**
* Reset storage directories.
* <p>
* Unlock the storage.
* Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
* and recreate empty <code>current</code>.
* @throws IOException
*/
synchronized void reset() throws IOException {
// reset NameSpace tree
FSDirectory fsDir = getFSNamesystem().dir;
fsDir.reset();
// unlock, close and rename storage directories
storage.unlockAll();
// recover from unsuccessful checkpoint if necessary
recoverCreateRead(storage.getImageDirectories(),
storage.getEditsDirectories());
// rename and recreate
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
// rename current to lastcheckpoint.tmp
storage.moveCurrent(sd);
}
}
/**
* Load checkpoint from local files only if the memory state is empty.<br>
* Set new checkpoint time received from the name-node.<br>
* Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
* @throws IOException
*/
void loadCheckpoint(CheckpointSignature sig) throws IOException {
// load current image and journal if it is not in memory already
if(!editLog.isOpen())
editLog.open();
FSDirectory fsDir = getFSNamesystem().dir;
if(fsDir.isEmpty()) {
Iterator<StorageDirectory> itImage
= storage.dirIterator(NameNodeDirType.IMAGE);
Iterator<StorageDirectory> itEdits
= storage.dirIterator(NameNodeDirType.EDITS);
      if (!itImage.hasNext() || !itEdits.hasNext())
throw new IOException("Could not locate checkpoint directories");
StorageDirectory sdName = itImage.next();
StorageDirectory sdEdits = itEdits.next();
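      // Hold the FSDirectory write lock while swapping in the loaded image
      // so readers never observe a partially loaded namespace.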
getFSDirectoryRootLock().writeLock();
try { // load image under rootDir lock
loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
} finally {
getFSDirectoryRootLock().writeUnlock();
}
loadFSEdits(sdEdits);
}
// set storage fields
storage.setStorageInfo(sig);
storage.setImageDigest(sig.imageDigest);
storage.setCheckpointTime(sig.checkpointTime);
}
  /**
   * Save meta-data into fsimage files and create empty edits.
   */
void saveCheckpoint() throws IOException {
saveNamespace(false);
}
private FSDirectory getFSDirectoryRootLock() {
return getFSNamesystem().dir;
}
static File getJSpoolDir(StorageDirectory sd) {
return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
}
static File getJSpoolFile(StorageDirectory sd) {
return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
}
/**
* Journal writer journals new meta-data state.
* <ol>
* <li> If Journal Spool state is OFF then journal records (edits)
* are applied directly to meta-data state in memory and are written
* to the edits file(s).</li>
   * <li> If Journal Spool state is INPROGRESS then records are only
   * written to the edits.new file; this is called spooling.</li>
* <li> Journal Spool state WAIT blocks journaling until the
* Journal Spool reader finalizes merging of the spooled data and
* switches to applying journal to memory.</li>
* </ol>
* @param length length of data.
* @param data serialized journal records.
* @throws IOException
* @see #convergeJournalSpool()
*/
synchronized void journal(int length, byte[] data) throws IOException {
assert backupInputStream.length() == 0 : "backup input stream is not empty";
try {
switch(jsState) {
case WAIT:
case OFF:
// wait until spooling is off
waitSpoolEnd();
// update NameSpace in memory
backupInputStream.setBytes(data);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
int logVersion = storage.getLayoutVersion();
BufferedInputStream bin = new BufferedInputStream(backupInputStream);
DataInputStream in = new DataInputStream(bin);
Checksum checksum = null;
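        // Layout versions are negative and decrease as the format evolves;
        // from version -28 on (i.e. <= -28) each edit record carries a
        // checksum, so wrap the stream to verify records while loading.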
if (logVersion <= -28) { // support fsedits checksum
checksum = FSEditLog.getChecksum();
in = new DataInputStream(new CheckedInputStream(bin, checksum));
}
logLoader.loadEditRecords(logVersion, in, checksum, true);
        // recompute quota usage after replaying edits;
        // walking the whole tree on every journal call is inefficient!
        getFSNamesystem().dir.updateCountForINodeWithQuota();
break;
case INPROGRESS:
break;
}
      // persist the records: append to the edits file(s) and sync to disk
editLog.logEdit(length, data);
editLog.logSync();
} finally {
backupInputStream.clear();
}
}
  private synchronized void waitSpoolEnd() {
    while (jsState == JSpoolState.WAIT) {
      try {
        wait();
      } catch (InterruptedException e) {
        // ignore and re-check the spool state
      }
    }
    // spooling should be off now; verify just in case
    assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
  }
  /**
   * Start journal spool.
   * Switch to writing into edits.new instead of edits.
   *
   * The spooled edits.new lives in the separate "jspool" directory rather
   * than in "current" because the two directories must stay independent:
   * a checkpoint may happen while spooling, moving current first to
   * lastcheckpoint.tmp and then to previous.checkpoint, while
   * jspool/edits.new remains in place throughout.
   */
synchronized void startJournalSpool(NamenodeRegistration nnReg)
throws IOException {
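    // OFF: proceed to set up the spool.
    // INPROGRESS: spooling is already active, nothing to do.
    // WAIT: a previous spool is still being merged; wait for the state
    // to drain back to OFF before starting a new spool.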
switch(jsState) {
case OFF:
break;
case INPROGRESS:
return;
case WAIT:
waitSpoolEnd();
}
// create journal spool directories
for (Iterator<StorageDirectory> it
= storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
StorageDirectory sd = it.next();
File jsDir = getJSpoolDir(sd);
if (!jsDir.exists() && !jsDir.mkdirs()) {
throw new IOException("Mkdirs failed to create "
+ jsDir.getCanonicalPath());
}
// create edit file if missing
File eFile = storage.getEditFile(sd);
if(!eFile.exists()) {
editLog.createEditLogFile(eFile);
}
}
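    // make sure the edit log is open before diverting its streams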
if(!editLog.isOpen())
editLog.open();
// create streams pointing to the journal spool files
// subsequent journal records will go directly to the spool
editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
setCheckpointState(CheckpointStates.ROLLED_EDITS);
// set up spooling
if(backupInputStream == null)
backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
jsState = JSpoolState.INPROGRESS;
}
synchronized void setCheckpointTime(int length, byte[] data)
throws IOException {
assert backupInputStream.length() == 0 : "backup input stream is not empty";
try {
// unpack new checkpoint time
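      // Record layout, as read below: a one-byte opcode
      // (NamenodeProtocol.JA_CHECKPOINT_TIME) followed by the new
      // checkpoint time as a LongWritable. The length parameter is
      // unused; data holds the complete record.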
backupInputStream.setBytes(data);
DataInputStream in = backupInputStream.getDataInputStream();
byte op = in.readByte();
assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
LongWritable lw = new LongWritable();
lw.readFields(in);
storage.setCheckpointTimeInStorage(lw.get());
} finally {
backupInputStream.clear();
}
}
/**
* Merge Journal Spool to memory.<p>
* Journal Spool reader reads journal records from edits.new.
* When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
   * This blocks journaling (see {@link #journal(int, byte[])}).
* The reader
* <ul>
* <li> reads remaining journal records if any,</li>
* <li> renames edits.new to edits,</li>
* <li> sets {@link JSpoolState} to OFF,</li>
* <li> and notifies the journaling thread.</li>
* </ul>
* Journaling resumes with applying new journal records to the memory state,
* and writing them into edits file(s).
*/
void convergeJournalSpool() throws IOException {
Iterator<StorageDirectory> itEdits
= storage.dirIterator(NameNodeDirType.EDITS);
    if (!itEdits.hasNext())
throw new IOException("Could not locate checkpoint directories");
StorageDirectory sdEdits = itEdits.next();
int numEdits = 0;
File jSpoolFile = getJSpoolFile(sdEdits);
long startTime = now();
if(jSpoolFile.exists()) {
// load edits.new
EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
BufferedInputStream bin = new BufferedInputStream(edits);
DataInputStream in = new DataInputStream(bin);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
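      // the spool file begins with the layout version of the records that follow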
int logVersion = logLoader.readLogVersion(in);
Checksum checksum = null;
if (logVersion <= -28) { // support fsedits checksum
checksum = FSEditLog.getChecksum();
in = new DataInputStream(new CheckedInputStream(bin, checksum));
}
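      // Two-pass catch-up: first drain the spool without blocking journal
      // writers; then set WAIT (blocking journal()) and load whatever was
      // appended meanwhile, so no record is lost during the switch-over.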
      numEdits += logLoader.loadEditRecords(logVersion, in, checksum, false);
      // end of spool reached for the first time; block journal writers
      jsState = JSpoolState.WAIT;
      numEdits += logLoader.loadEditRecords(logVersion, in, checksum, true);
getFSNamesystem().dir.updateCountForINodeWithQuota();
edits.close();
}
FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
+ " of size " + jSpoolFile.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
// rename spool edits.new to edits making it in sync with the active node
// subsequent journal records will go directly to edits
editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
// write version file
resetVersion(false, storage.getImageDigest());
// wake up journal writer
synchronized(this) {
jsState = JSpoolState.OFF;
notifyAll();
}
// Rename lastcheckpoint.tmp to previous.checkpoint
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
storage.moveLastCheckpoint(sd);
}
}
}