/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Charsets;
import com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection.DirEntry;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hdfs.server.namenode.FSImageUtil.MAGIC_HEADER;
/**
 * Utility that crawls an existing hierarchical FileSystem and emits a
 * valid FSImage/NN storage directory.
 */
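//
// Example (sketch): feeding the writer from a traversal that yields
// TreePath entries parent-first. The "tree" iterable below is
// hypothetical; any source of TreePath entries in parent-before-child
// order works.
//
//   ImageWriter.Options opts = ImageWriter.defaults();
//   opts.setConf(new Configuration());
//   try (ImageWriter w = new ImageWriter(opts)) {
//     for (TreePath e : tree) {
//       w.accept(e);
//     }
//   }
//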
// TODO: generalize to types beyond FileRegion
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ImageWriter implements Closeable {
private static final int ONDISK_VERSION = 1;
private static final int LAYOUT_VERSION = -64; // see NameNodeLayoutVersion
private final Path outdir;
private final FileSystem outfs;
private final File dirsTmp;
private final OutputStream dirs;
private final File inodesTmp;
private final OutputStream inodes;
private final MessageDigest digest;
private final FSImageCompression compress;
private final long startBlock;
private final long startInode;
private final UGIResolver ugis;
private final BlockAliasMap.Writer<FileRegion> blocks;
private final BlockResolver blockIds;
private final Map<Long, DirEntry.Builder> dircache;
private final TrackedOutputStream<DigestOutputStream> raw;
private boolean closed = false;
private long curSec;
private long curBlock;
private final AtomicLong curInode;
private final FileSummary.Builder summary = FileSummary.newBuilder()
.setOndiskVersion(ONDISK_VERSION)
.setLayoutVersion(LAYOUT_VERSION);
private final String blockPoolID;
public static Options defaults() {
return new Options();
}
@SuppressWarnings("unchecked")
public ImageWriter(Options opts) throws IOException {
final OutputStream out;
if (null == opts.outStream) {
FileSystem fs = opts.outdir.getFileSystem(opts.getConf());
outfs = (fs instanceof LocalFileSystem)
? ((LocalFileSystem)fs).getRaw()
: fs;
Path tmp = opts.outdir;
if (!outfs.mkdirs(tmp)) {
throw new IOException("Failed to create output dir: " + tmp);
}
try (NNStorage stor = new NNStorage(opts.getConf(),
Arrays.asList(tmp.toUri()), Arrays.asList(tmp.toUri()))) {
NamespaceInfo info = NNStorage.newNamespaceInfo();
        if (info.getLayoutVersion() != LAYOUT_VERSION) {
          throw new IllegalStateException("Incompatible layout " +
              info.getLayoutVersion() + " (expected " + LAYOUT_VERSION + ")");
        }
// set the cluster id, if given
if (opts.clusterID.length() > 0) {
info.setClusterID(opts.clusterID);
}
        // set the block pool id, if given
if (opts.blockPoolID.length() > 0) {
info.setBlockPoolID(opts.blockPoolID);
}
stor.format(info);
blockPoolID = info.getBlockPoolID();
}
outdir = new Path(tmp, "current");
out = outfs.create(new Path(outdir, "fsimage_0000000000000000000"));
} else {
outdir = null;
outfs = null;
out = opts.outStream;
blockPoolID = "";
}
digest = MD5Hash.getDigester();
raw = new TrackedOutputStream<>(new DigestOutputStream(
new BufferedOutputStream(out), digest));
compress = opts.compress;
CompressionCodec codec = compress.getImageCodec();
if (codec != null) {
summary.setCodec(codec.getClass().getCanonicalName());
}
startBlock = opts.startBlock;
curBlock = startBlock;
startInode = opts.startInode;
curInode = new AtomicLong(startInode);
dircache = Collections.synchronizedMap(new DirEntryCache(opts.maxdircache));
ugis = null == opts.ugis
? ReflectionUtils.newInstance(opts.ugisClass, opts.getConf())
: opts.ugis;
BlockAliasMap<FileRegion> fmt = null == opts.blocks
? ReflectionUtils.newInstance(opts.aliasMap, opts.getConf())
: opts.blocks;
blocks = fmt.getWriter(null, blockPoolID);
blockIds = null == opts.blockIds
? ReflectionUtils.newInstance(opts.blockIdsClass, opts.getConf())
: opts.blockIds;
    // Create the directory and inode sections as side files; their
    // contents are spooled to disk to avoid keeping them in memory.
FileOutputStream dirsTmpStream = null;
try {
dirsTmp = File.createTempFile("fsimg_dir", null);
dirsTmp.deleteOnExit();
dirsTmpStream = new FileOutputStream(dirsTmp);
dirs = beginSection(dirsTmpStream);
} catch (IOException e) {
IOUtils.cleanupWithLogger(null, raw, dirsTmpStream);
throw e;
}
try {
inodesTmp = File.createTempFile("fsimg_inode", null);
inodesTmp.deleteOnExit();
inodes = new FileOutputStream(inodesTmp);
} catch (IOException e) {
IOUtils.cleanupWithLogger(null, raw, dirsTmpStream, dirs);
throw e;
}
raw.write(MAGIC_HEADER);
curSec = raw.pos;
assert raw.pos == MAGIC_HEADER.length;
}
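  /**
   * Assign the next inode ID to the given entry, write its INode to the
   * image, and record it as a child of its parent. Callers must visit
   * parents before children; the asserts below rely on the parent's ID
   * having already been allocated.
   */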
public void accept(TreePath e) throws IOException {
assert e.getParentId() < curInode.get();
// allocate ID
long id = curInode.getAndIncrement();
e.accept(id);
assert e.getId() < curInode.get();
INode n = e.toINode(ugis, blockIds, blocks);
writeInode(n);
if (e.getParentId() > 0) {
// add DirEntry to map, which may page out entries
DirEntry.Builder de = DirEntry.newBuilder()
.setParent(e.getParentId())
.addChildren(e.getId());
dircache.put(e.getParentId(), de);
}
}
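  /**
   * Bounded, insertion-ordered cache of directory entries. put() merges
   * children into any existing builder for the same parent; once the map
   * holds more than nEntries entries, the eldest entry is flushed to the
   * dirs side file via writeDirEntry and evicted.
   */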
@SuppressWarnings("serial")
class DirEntryCache extends LinkedHashMap<Long, DirEntry.Builder> {
    // should cache entries on the path to the root, not evict the
    // least-recently-cached entries
private final int nEntries;
DirEntryCache(int nEntries) {
this.nEntries = nEntries;
}
@Override
public DirEntry.Builder put(Long p, DirEntry.Builder b) {
DirEntry.Builder e = get(p);
if (null == e) {
return super.put(p, b);
}
// merge
e.addAllChildren(b.getChildrenList());
      // not strictly conforming to Map.put: returns the merged value,
      // not the previous mapping
return e;
}
@Override
protected boolean removeEldestEntry(Entry<Long, DirEntry.Builder> be) {
if (size() > nEntries) {
DirEntry d = be.getValue().build();
try {
writeDirEntry(d);
} catch (IOException e) {
throw new RuntimeException(e);
}
return true;
}
return false;
}
}
synchronized void writeInode(INode n) throws IOException {
n.writeDelimitedTo(inodes);
}
synchronized void writeDirEntry(DirEntry e) throws IOException {
e.writeDelimitedTo(dirs);
}
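  // Size of a message as written by writeDelimitedTo: a varint length
  // prefix followed by the serialized message bytes.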
private static int getOndiskSize(com.google.protobuf.GeneratedMessage s) {
return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
+ s.getSerializedSize();
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
for (DirEntry.Builder b : dircache.values()) {
DirEntry e = b.build();
writeDirEntry(e);
}
dircache.clear();
// close side files
IOUtils.cleanupWithLogger(null, dirs, inodes, blocks);
if (null == dirs || null == inodes) {
// init failed
if (raw != null) {
raw.close();
}
return;
}
try {
writeNameSystemSection();
writeINodeSection();
writeDirSection();
writeStringTableSection();
// write summary directly to raw
FileSummary s = summary.build();
s.writeDelimitedTo(raw);
int length = getOndiskSize(s);
byte[] lengthBytes = new byte[4];
ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
raw.write(lengthBytes);
} finally {
raw.close();
}
writeMD5("fsimage_0000000000000000000");
closed = true;
}
  /**
   * Write the checksum file for the image. Logic is pulled from
   * MD5Utils internals, which are awkward to reuse directly; the output
   * format (hex digest, " *", image file name) is what those tools expect.
   */
void writeMD5(String imagename) throws IOException {
if (null == outdir) {
return;
}
MD5Hash md5 = new MD5Hash(digest.digest());
String digestString = StringUtils.byteToHexString(md5.getDigest());
Path chk = new Path(outdir, imagename + ".md5");
try (OutputStream out = outfs.create(chk)) {
String md5Line = digestString + " *" + imagename + "\n";
out.write(md5Line.getBytes(Charsets.UTF_8));
}
}
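  /**
   * Wrap the given stream in the image compression codec, if one is
   * configured. Sections are framed by beginSection/endSection pairs;
   * endSection records each section's offset and length in the
   * FileSummary, which readers use to locate sections.
   */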
OutputStream beginSection(OutputStream out) throws IOException {
CompressionCodec codec = compress.getImageCodec();
if (null == codec) {
return out;
}
return codec.createOutputStream(out);
}
void endSection(OutputStream out, SectionName name) throws IOException {
CompressionCodec codec = compress.getImageCodec();
if (codec != null) {
((CompressorStream)out).finish();
}
out.flush();
long length = raw.pos - curSec;
summary.addSections(FileSummary.Section.newBuilder()
        .setName(name.toString()) // not strictly correct; the enum's name field is not visible here
.setOffset(curSec).setLength(length));
curSec += length;
}
void writeNameSystemSection() throws IOException {
NameSystemSection.Builder b = NameSystemSection.newBuilder()
.setGenstampV1(1000)
.setGenstampV1Limit(0)
.setGenstampV2(1001)
.setLastAllocatedBlockId(blockIds.lastId())
.setTransactionId(0);
NameSystemSection s = b.build();
OutputStream sec = beginSection(raw);
s.writeDelimitedTo(sec);
endSection(sec, SectionName.NS_INFO);
}
void writeINodeSection() throws IOException {
// could reset dict to avoid compression cost in close
INodeSection.Builder b = INodeSection.newBuilder()
.setNumInodes(curInode.get() - startInode)
.setLastInodeId(curInode.get());
INodeSection s = b.build();
OutputStream sec = beginSection(raw);
s.writeDelimitedTo(sec);
// copy inodes
try (FileInputStream in = new FileInputStream(inodesTmp)) {
IOUtils.copyBytes(in, sec, 4096, false);
}
endSection(sec, SectionName.INODE);
}
void writeDirSection() throws IOException {
    // No section header; the dirs side file was already wrapped by
    // beginSection at creation, so its bytes are copied to raw as-is
OutputStream sec = raw;
// copy dirs
try (FileInputStream in = new FileInputStream(dirsTmp)) {
IOUtils.copyBytes(in, sec, 4096, false);
}
endSection(sec, SectionName.INODE_DIR);
}
void writeFilesUCSection() throws IOException {
FilesUnderConstructionSection.Builder b =
FilesUnderConstructionSection.newBuilder();
FilesUnderConstructionSection s = b.build();
OutputStream sec = beginSection(raw);
s.writeDelimitedTo(sec);
endSection(sec, SectionName.FILES_UNDERCONSTRUCTION);
}
void writeSnapshotDiffSection() throws IOException {
SnapshotDiffSection.Builder b = SnapshotDiffSection.newBuilder();
SnapshotDiffSection s = b.build();
OutputStream sec = beginSection(raw);
s.writeDelimitedTo(sec);
endSection(sec, SectionName.SNAPSHOT_DIFF);
}
void writeSecretManagerSection() throws IOException {
SecretManagerSection.Builder b = SecretManagerSection.newBuilder()
.setCurrentId(0)
.setTokenSequenceNumber(0);
SecretManagerSection s = b.build();
OutputStream sec = beginSection(raw);
s.writeDelimitedTo(sec);
endSection(sec, SectionName.SECRET_MANAGER);
}
void writeCacheManagerSection() throws IOException {
CacheManagerSection.Builder b = CacheManagerSection.newBuilder()
.setNumPools(0)
.setNumDirectives(0)
.setNextDirectiveId(1);
CacheManagerSection s = b.build();
OutputStream sec = beginSection(raw);
s.writeDelimitedTo(sec);
endSection(sec, SectionName.CACHE_MANAGER);
}
void writeStringTableSection() throws IOException {
StringTableSection.Builder b = StringTableSection.newBuilder();
Map<Integer, String> u = ugis.ugiMap();
b.setNumEntry(u.size());
StringTableSection s = b.build();
OutputStream sec = beginSection(raw);
s.writeDelimitedTo(sec);
for (Map.Entry<Integer, String> e : u.entrySet()) {
StringTableSection.Entry.Builder x =
StringTableSection.Entry.newBuilder()
.setId(e.getKey())
.setStr(e.getValue());
x.build().writeDelimitedTo(sec);
}
endSection(sec, SectionName.STRING_TABLE);
}
@Override
public synchronized String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ codec=\"").append(compress.getImageCodec());
sb.append("\", startBlock=").append(startBlock);
sb.append(", curBlock=").append(curBlock);
sb.append(", startInode=").append(startInode);
sb.append(", curInode=").append(curInode);
sb.append(", ugi=").append(ugis);
sb.append(", blockIds=").append(blockIds);
sb.append(", offset=").append(raw.pos);
sb.append(" }");
return sb.toString();
}
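  /**
   * FilterOutputStream that tracks the count of bytes written, so that
   * section offsets and lengths can be computed for the FileSummary.
   */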
static class TrackedOutputStream<T extends OutputStream>
extends FilterOutputStream {
private long pos = 0L;
TrackedOutputStream(T out) {
super(out);
}
@SuppressWarnings("unchecked")
public T getInner() {
return (T) out;
}
@Override
public void write(int b) throws IOException {
out.write(b);
++pos;
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
pos += len;
}
@Override
public void flush() throws IOException {
super.flush();
}
@Override
public void close() throws IOException {
super.close();
}
}
  /**
   * Configurable options for image generation, mapping pluggable
   * components (UGI resolution, block resolution, alias maps) into the
   * writer.
   */
public static class Options implements Configurable {
public static final String START_INODE = "hdfs.image.writer.start.inode";
public static final String CACHE_ENTRY = "hdfs.image.writer.cache.entries";
public static final String UGI_CLASS = "hdfs.image.writer.ugi.class";
public static final String BLOCK_RESOLVER_CLASS =
"hdfs.image.writer.blockresolver.class";
private Path outdir;
private Configuration conf;
private OutputStream outStream;
private int maxdircache;
private long startBlock;
private long startInode;
private UGIResolver ugis;
private Class<? extends UGIResolver> ugisClass;
private BlockAliasMap<FileRegion> blocks;
private String clusterID;
private String blockPoolID;
@SuppressWarnings("rawtypes")
private Class<? extends BlockAliasMap> aliasMap;
private BlockResolver blockIds;
private Class<? extends BlockResolver> blockIdsClass;
private FSImageCompression compress =
FSImageCompression.createNoopCompression();
protected Options() {
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
String def = new File("hdfs/name").toURI().toString();
outdir = new Path(conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, def));
startBlock = conf.getLong(FixedBlockResolver.START_BLOCK, (1L << 30) + 1);
startInode = conf.getLong(START_INODE, (1L << 14) + 1);
maxdircache = conf.getInt(CACHE_ENTRY, 100);
ugisClass = conf.getClass(UGI_CLASS,
SingleUGIResolver.class, UGIResolver.class);
aliasMap = conf.getClass(
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
NullBlockAliasMap.class, BlockAliasMap.class);
blockIdsClass = conf.getClass(BLOCK_RESOLVER_CLASS,
FixedBlockResolver.class, BlockResolver.class);
clusterID = "";
blockPoolID = "";
}
@Override
public Configuration getConf() {
return conf;
}
public Options output(String out) {
this.outdir = new Path(out);
return this;
}
public Options outStream(OutputStream outStream) {
this.outStream = outStream;
return this;
}
public Options codec(String codec) throws IOException {
this.compress = FSImageCompression.createCompression(getConf(), codec);
return this;
}
public Options cache(int nDirEntries) {
this.maxdircache = nDirEntries;
return this;
}
public Options ugi(UGIResolver ugis) {
this.ugis = ugis;
return this;
}
public Options ugi(Class<? extends UGIResolver> ugisClass) {
this.ugisClass = ugisClass;
return this;
}
public Options blockIds(BlockResolver blockIds) {
this.blockIds = blockIds;
return this;
}
public Options blockIds(Class<? extends BlockResolver> blockIdsClass) {
this.blockIdsClass = blockIdsClass;
return this;
}
public Options blocks(BlockAliasMap<FileRegion> blocks) {
this.blocks = blocks;
return this;
}
@SuppressWarnings("rawtypes")
public Options blocks(Class<? extends BlockAliasMap> blocksClass) {
this.aliasMap = blocksClass;
return this;
}
public Options clusterID(String clusterID) {
this.clusterID = clusterID;
return this;
}
public Options blockPoolID(String blockPoolID) {
this.blockPoolID = blockPoolID;
return this;
}
}
}