/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.LongWritable;
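/**
* Storage for the backup node. Extends {@link FSImage} to keep a local
* image of the namespace and to apply and spool journal records received
* from the active name-node (see {@link #journal(int, byte[])}).
*/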
public class BackupStorage extends FSImage {
// Names of the journal spool directory and the spool file
private static final String STORAGE_JSPOOL_DIR = "jspool";
private static final String STORAGE_JSPOOL_FILE =
NameNodeFile.EDITS_NEW.getName();
/** Backup input stream for loading edits into memory */
private EditLogBackupInputStream backupInputStream;
/** Is journal spooling in progress */
volatile JSpoolState jsState;
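/**
* Journal spool states (see {@link #journal(int, byte[])}):
* OFF - journal records are applied to memory and written to edits;
* INPROGRESS - records are only written to the spool file edits.new;
* WAIT - journaling is blocked until the spool reader finishes merging
* the spooled data.
*/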
static enum JSpoolState {
OFF,
INPROGRESS,
WAIT;
}
/**
* Create backup storage with journal spooling initially off.
*/
BackupStorage() {
super();
jsState = JSpoolState.OFF;
}
@Override
public boolean isConversionNeeded(StorageDirectory sd) {
return false;
}
/**
* Analyze backup storage directories for consistency.<br>
* Recover from incomplete checkpoints if required.<br>
* Read VERSION and fstime files if they exist.<br>
* Do not load image or edits.
*
* @param imageDirs list of image directories as URIs.
* @param editsDirs list of edits directories as URIs.
* @throws IOException if the node should shut down.
*/
void recoverCreateRead(Collection<URI> imageDirs,
Collection<URI> editsDirs) throws IOException {
setStorageDirectories(imageDirs, editsDirs);
this.checkpointTime = 0L;
for(Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState;
try {
curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
// sd is locked but not opened
switch(curState) {
case NON_EXISTENT:
// fail if any of the configured storage dirs are inaccessible
throw new InconsistentFSStateException(sd.getRoot(),
"checkpoint directory does not exist or is not accessible.");
case NOT_FORMATTED:
// for backup node all directories may be unformatted initially
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty current
break;
case NORMAL:
break;
default: // recovery is possible
sd.doRecover(curState);
}
if(curState != StorageState.NOT_FORMATTED) {
sd.read(); // read and verify consistency with other directories
}
} catch(IOException ioe) {
sd.unlock();
throw ioe;
}
}
}
/**
* Reset storage directories.
* <p>
* Unlock the storage.
* Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
* and recreate empty <code>current</code>.
* @throws IOException
*/
synchronized void reset() throws IOException {
// reset NameSpace tree
FSDirectory fsDir = getFSNamesystem().dir;
fsDir.reset();
// unlock, close and rename storage directories
unlockAll();
// recover from unsuccessful checkpoint if necessary
recoverCreateRead(getImageDirectories(), getEditsDirectories());
// rename and recreate
for(StorageDirectory sd : storageDirs) {
// rename current to lastcheckpoint.tmp
moveCurrent(sd);
}
}
/**
* Load checkpoint from local files only if the memory state is empty.<br>
* Set new checkpoint time received from the name-node.<br>
* Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
* @throws IOException
*/
void loadCheckpoint(CheckpointSignature sig) throws IOException {
// load current image and journal if it is not in memory already
if(!editLog.isOpen())
editLog.open();
FSDirectory fsDir = getFSNamesystem().dir;
if(fsDir.isEmpty()) {
Iterator<StorageDirectory> itImage = dirIterator(NameNodeDirType.IMAGE);
Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
if(!itImage.hasNext() || !itEdits.hasNext())
throw new IOException("Could not locate checkpoint directories");
StorageDirectory sdName = itImage.next();
StorageDirectory sdEdits = itEdits.next();
synchronized(getFSDirectoryRootLock()) { // load image under rootDir lock
loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
}
loadFSEdits(sdEdits);
}
// set storage fields
setStorageInfo(sig);
checkpointTime = sig.checkpointTime;
}
/**
* Save meta-data into fsimage files and create empty edits.
*/
void saveCheckpoint() throws IOException {
saveNamespace(false);
}
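/** The namespace root object, used as the lock when loading the image. */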
private Object getFSDirectoryRootLock() {
return getFSNamesystem().dir.rootDir;
}
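/** Journal spool directory "jspool" under the given storage directory. */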
static File getJSpoolDir(StorageDirectory sd) {
return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
}
static File getJSpoolFile(StorageDirectory sd) {
return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
}
/**
* Journal writer journals new meta-data state.
* <ol>
* <li> If Journal Spool state is OFF then journal records (edits)
* are applied directly to meta-data state in memory and are written
* to the edits file(s).</li>
* <li> If Journal Spool state is INPROGRESS then records are only
* written to the edits.new file; this is called spooling.</li>
* <li> Journal Spool state WAIT blocks journaling until the
* Journal Spool reader finalizes merging of the spooled data and
* switches to applying journal to memory.</li>
* </ol>
* @param length length of the serialized journal records in bytes.
* @param data serialized journal records.
* @throws IOException
* @see #convergeJournalSpool()
*/
synchronized void journal(int length, byte[] data) throws IOException {
assert backupInputStream.length() == 0 : "backup input stream is not empty";
try {
switch(jsState) {
case WAIT:
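// fall through: waitSpoolEnd() below returns only when the state is OFF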
case OFF:
// wait until spooling is off
waitSpoolEnd();
// update NameSpace in memory
backupInputStream.setBytes(data);
editLog.loadEditRecords(getLayoutVersion(),
backupInputStream.getDataInputStream(), true);
getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
break;
case INPROGRESS:
break;
}
// write to files
editLog.logEdit(length, data);
editLog.logSync();
} finally {
backupInputStream.clear();
}
}
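/**
* Block while {@link JSpoolState} is WAIT, until the spool reader sets
* it to OFF and calls notifyAll().
*/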
private synchronized void waitSpoolEnd() {
while(jsState == JSpoolState.WAIT) {
try {
wait();
} catch (InterruptedException e) {
// ignore the interrupt and re-check the spooling state
}
}
// now spooling should be off, verifying just in case
assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
}
/**
* Start journal spool.
* Switch to writing into edits.new instead of edits.
*
* edits.new for spooling is kept in the separate directory "jspool" rather
* than in "current" because the two directories should be independent:
* while spooling, a checkpoint can happen, which moves current first to
* lastcheckpoint.tmp and then to previous.checkpoint;
* jspool/edits.new remains in place throughout.
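* <p>
* Resulting layout in each edits storage directory (a sketch; assumes
* NameNodeFile.EDITS_NEW resolves to "edits.new"):
* <pre>
* current/edits - regular edits file
* jspool/edits.new - spool file receiving journal records
* </pre>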
*/
synchronized void startJournalSpool(NamenodeRegistration nnReg)
throws IOException {
switch(jsState) {
case OFF:
break;
case INPROGRESS:
return;
case WAIT:
waitSpoolEnd();
}
// create journal spool directories
for(Iterator<StorageDirectory> it =
dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
StorageDirectory sd = it.next();
File jsDir = getJSpoolDir(sd);
if (!jsDir.exists() && !jsDir.mkdirs()) {
throw new IOException("Mkdirs failed to create "
+ jsDir.getCanonicalPath());
}
// create edit file if missing
File eFile = getEditFile(sd);
if(!eFile.exists()) {
editLog.createEditLogFile(eFile);
}
}
if(!editLog.isOpen())
editLog.open();
// create streams pointing to the journal spool files
// subsequent journal records will go directly to the spool
editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
setCheckpointState(CheckpointStates.ROLLED_EDITS);
// set up spooling
if(backupInputStream == null)
backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
jsState = JSpoolState.INPROGRESS;
}
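/**
* Unpack and apply a new checkpoint time received from the name-node.
* The record is a JA_CHECKPOINT_TIME opcode followed by a serialized
* {@link LongWritable}.
*/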
synchronized void setCheckpointTime(int length, byte[] data)
throws IOException {
assert backupInputStream.length() == 0 : "backup input stream is not empty";
try {
// unpack new checkpoint time
backupInputStream.setBytes(data);
DataInputStream in = backupInputStream.getDataInputStream();
byte op = in.readByte();
assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
LongWritable lw = new LongWritable();
lw.readFields(in);
setCheckpointTime(lw.get());
} finally {
backupInputStream.clear();
}
}
/**
* Merge Journal Spool to memory.<p>
* Journal Spool reader reads journal records from edits.new.
* When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
* This blocks journaling (see {@link #journal(int, byte[])}).
* The reader
* <ul>
* <li> reads remaining journal records if any,</li>
* <li> renames edits.new to edits,</li>
* <li> sets {@link JSpoolState} to OFF,</li>
* <li> and notifies the journaling thread.</li>
* </ul>
* Journaling resumes with applying new journal records to the memory state,
* and writing them into edits file(s).
*/
void convergeJournalSpool() throws IOException {
Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
if(!itEdits.hasNext())
throw new IOException("Could not locate checkpoint directories");
StorageDirectory sdEdits = itEdits.next();
int numEdits = 0;
File jSpoolFile = getJSpoolFile(sdEdits);
long startTime = FSNamesystem.now();
if(jSpoolFile.exists()) {
// load edits.new
EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
DataInputStream in = edits.getDataInputStream();
numEdits += editLog.loadFSEdits(in, false);
// Reached the end of the spool for the first time: switch to WAIT so
// that journal() blocks while the remaining records are drained and
// edits.new is renamed back to edits.
jsState = JSpoolState.WAIT;
numEdits += editLog.loadEditRecords(getLayoutVersion(), in, true);
getFSNamesystem().dir.updateCountForINodeWithQuota();
edits.close();
}
FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
+ " of size " + jSpoolFile.length() + " edits # " + numEdits
+ " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
// rename spool edits.new to edits making it in sync with the active node
// subsequent journal records will go directly to edits
editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
// write version file
resetVersion(false);
// wake up journal writer
synchronized(this) {
jsState = JSpoolState.OFF;
notifyAll();
}
// Rename lastcheckpoint.tmp to previous.checkpoint
for(StorageDirectory sd : storageDirs) {
moveLastCheckpoint(sd);
}
}
}