blob: 7127d112df30c287691ff792aa6ed1f8e83b68d8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType;
import java.io.EOFException;
import java.io.IOException;
/**
* Abstract class for File System based store.
*
* @param <M> manager filesystem store.Currently nodelabel will use
* CommonNodeLabelManager.
*/
public abstract class AbstractFSNodeStore<M> {
protected static final Log LOG = LogFactory.getLog(AbstractFSNodeStore.class);
private StoreType storeType;
private FSDataOutputStream editlogOs;
private Path editLogPath;
private StoreSchema schema;
protected M manager;
protected Path fsWorkingPath;
protected FileSystem fs;
public AbstractFSNodeStore(StoreType storeType) {
this.storeType = storeType;
}
protected void initStore(Configuration conf, Path fsStorePath,
StoreSchema schma, M mgr) throws IOException {
this.schema = schma;
this.fsWorkingPath = fsStorePath;
this.manager = mgr;
initFileSystem(conf);
// mkdir of root dir path
fs.mkdirs(fsWorkingPath);
LOG.info("Created store directory :" + fsWorkingPath);
}
/**
* Filesystem store schema define the log name and mirror name.
*/
public static class StoreSchema {
private String editLogName;
private String mirrorName;
public StoreSchema(String editLogName, String mirrorName) {
this.editLogName = editLogName;
this.mirrorName = mirrorName;
}
}
public void initFileSystem(Configuration conf) throws IOException {
Configuration confCopy = new Configuration(conf);
fs = fsWorkingPath.getFileSystem(confCopy);
// if it's local file system, use RawLocalFileSystem instead of
// LocalFileSystem, the latter one doesn't support append.
if (fs.getScheme().equals("file")) {
fs = ((LocalFileSystem) fs).getRaw();
}
}
protected void writeToLog(FSNodeStoreLogOp op) throws IOException {
try {
ensureAppendEditLogFile();
editlogOs.writeInt(op.getOpCode());
op.write(editlogOs, manager);
} finally {
ensureCloseEditlogFile();
}
}
protected void ensureAppendEditLogFile() throws IOException {
editlogOs = fs.append(editLogPath);
}
protected void ensureCloseEditlogFile() throws IOException {
editlogOs.close();
}
protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath)
throws IOException {
// If mirror.new exists, read from mirror.new
Path mirrorToRead = fs.exists(newMirrorPath) ?
newMirrorPath :
fs.exists(oldMirrorPath) ? oldMirrorPath : null;
if (mirrorToRead != null) {
try (FSDataInputStream is = fs.open(mirrorToRead)) {
StoreOp op = FSStoreOpHandler.getMirrorOp(storeType);
op.recover(is, manager);
}
}
}
protected StoreType getStoreType() {
return storeType;
}
public Path getFsWorkingPath() {
return fsWorkingPath;
}
protected void recoverFromStore() throws IOException {
/*
* Steps of recover
* 1) Read from last mirror (from mirror or mirror.old)
* 2) Read from last edit log, and apply such edit log
* 3) Write new mirror to mirror.writing
* 4) Rename mirror to mirror.old
* 5) Move mirror.writing to mirror
* 6) Remove mirror.old
* 7) Remove edit log and create a new empty edit log
*/
// Open mirror from serialized file
Path mirrorPath = new Path(fsWorkingPath, schema.mirrorName);
Path oldMirrorPath = new Path(fsWorkingPath, schema.mirrorName + ".old");
loadFromMirror(mirrorPath, oldMirrorPath);
// Open and process editlog
editLogPath = new Path(fsWorkingPath, schema.editLogName);
loadManagerFromEditLog(editLogPath);
// Serialize current mirror to mirror.writing
Path writingMirrorPath =
new Path(fsWorkingPath, schema.mirrorName + ".writing");
try(FSDataOutputStream os = fs.create(writingMirrorPath, true)){
StoreOp op = FSStoreOpHandler.getMirrorOp(storeType);
op.write(os, manager);
}
// Move mirror to mirror.old
if (fs.exists(mirrorPath)) {
fs.delete(oldMirrorPath, false);
fs.rename(mirrorPath, oldMirrorPath);
}
// move mirror.writing to mirror
fs.rename(writingMirrorPath, mirrorPath);
fs.delete(writingMirrorPath, false);
// remove mirror.old
fs.delete(oldMirrorPath, false);
// create a new editlog file
editlogOs = fs.create(editLogPath, true);
editlogOs.close();
LOG.info("Finished write mirror at:" + mirrorPath.toString());
LOG.info("Finished create editlog file at:" + editLogPath.toString());
}
protected void loadManagerFromEditLog(Path editPath) throws IOException {
if (!fs.exists(editPath)) {
return;
}
try (FSDataInputStream is = fs.open(editPath)) {
while (true) {
try {
StoreOp storeOp = FSStoreOpHandler.get(is.readInt(), storeType);
storeOp.recover(is, manager);
} catch (EOFException e) {
// EOF hit, break
break;
}
}
}
}
public FileSystem getFs() {
return fs;
}
public void setFs(FileSystem fs) {
this.fs = fs;
}
protected void closeFSStore() {
IOUtils.closeStreams(fs, editlogOs);
}
}