| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.security.DigestOutputStream; |
| import java.security.MessageDigest; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.protocol.LayoutVersion; |
| import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; |
| import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; |
| import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection; |
| import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary; |
| import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection; |
| import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection; |
| import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection; |
| import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot; |
| import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; |
| import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; |
| import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; |
| import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; |
| import org.apache.hadoop.hdfs.util.MD5FileUtils; |
| import org.apache.hadoop.io.MD5Hash; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.compress.CompressorStream; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.io.LimitInputStream; |
| import com.google.protobuf.CodedOutputStream; |
| |
| /** |
| * Utility class to read / write fsimage in protobuf format. |
| */ |
| @InterfaceAudience.Private |
| public final class FSImageFormatProtobuf { |
| private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class); |
| |
| public static final class Loader implements FSImageFormat.AbstractLoader { |
| static final int MINIMUM_FILE_LENGTH = 8; |
| private final Configuration conf; |
| private final FSNamesystem fsn; |
| |
| private String[] stringTable; |
| |
| /** The MD5 sum of the loaded file */ |
| private MD5Hash imgDigest; |
| /** The transaction ID of the last edit represented by the loaded file */ |
| private long imgTxId; |
| |
| Loader(Configuration conf, FSNamesystem fsn) { |
| this.conf = conf; |
| this.fsn = fsn; |
| } |
| |
| @Override |
| public MD5Hash getLoadedImageMd5() { |
| return imgDigest; |
| } |
| |
| @Override |
| public long getLoadedImageTxId() { |
| return imgTxId; |
| } |
| |
| public String[] getStringTable() { |
| return stringTable; |
| } |
| |
| void load(File file) throws IOException { |
| long start = System.currentTimeMillis(); |
| imgDigest = MD5FileUtils.computeMd5ForFile(file); |
| RandomAccessFile raFile = new RandomAccessFile(file, "r"); |
| FileInputStream fin = new FileInputStream(file); |
| try { |
| loadInternal(raFile, fin); |
| long end = System.currentTimeMillis(); |
| LOG.info("Loaded FSImage in " + (end - start) / 1000 + " seconds."); |
| } finally { |
| fin.close(); |
| raFile.close(); |
| } |
| } |
| |
| private void loadInternal(RandomAccessFile raFile, FileInputStream fin) |
| throws IOException { |
| if (!FSImageUtil.checkFileFormat(raFile)) { |
| throw new IOException("Unrecognized file format"); |
| } |
| FileSummary summary = FSImageUtil.loadSummary(raFile); |
| |
| FileChannel channel = fin.getChannel(); |
| |
| FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader( |
| fsn, this); |
| FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader( |
| fsn, this); |
| |
| ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary |
| .getSectionsList()); |
| Collections.sort(sections, new Comparator<FileSummary.Section>() { |
| @Override |
| public int compare(FileSummary.Section s1, FileSummary.Section s2) { |
| SectionName n1 = SectionName.fromString(s1.getName()); |
| SectionName n2 = SectionName.fromString(s2.getName()); |
| if (n1 == null) { |
| return n2 == null ? 0 : -1; |
| } else if (n2 == null) { |
| return -1; |
| } else { |
| return n1.ordinal() - n2.ordinal(); |
| } |
| } |
| }); |
| |
| StartupProgress prog = NameNode.getStartupProgress(); |
| /** |
| * beginStep() and the endStep() calls do not match the boundary of the |
| * sections. This is because that the current implementation only allows |
| * a particular step to be started for once. |
| */ |
| Step currentStep = null; |
| |
| for (FileSummary.Section s : sections) { |
| channel.position(s.getOffset()); |
| InputStream in = new BufferedInputStream(new LimitInputStream(fin, |
| s.getLength())); |
| |
| in = FSImageUtil.wrapInputStreamForCompression(conf, |
| summary.getCodec(), in); |
| |
| String n = s.getName(); |
| |
| switch (SectionName.fromString(n)) { |
| case NS_INFO: |
| loadNameSystemSection(in); |
| break; |
| case STRING_TABLE: |
| loadStringTableSection(in); |
| break; |
| case INODE: { |
| currentStep = new Step(StepType.INODES); |
| prog.beginStep(Phase.LOADING_FSIMAGE, currentStep); |
| inodeLoader.loadINodeSection(in); |
| } |
| break; |
| case INODE_DIR: |
| inodeLoader.loadINodeDirectorySection(in); |
| break; |
| case FILES_UNDERCONSTRUCTION: |
| inodeLoader.loadFilesUnderConstructionSection(in); |
| break; |
| case SNAPSHOT: |
| snapshotLoader.loadSnapshotSection(in); |
| break; |
| case SNAPSHOT_DIFF: |
| snapshotLoader.loadSnapshotDiffSection(in); |
| break; |
| case SECRET_MANAGER: { |
| prog.endStep(Phase.LOADING_FSIMAGE, currentStep); |
| Step step = new Step(StepType.DELEGATION_TOKENS); |
| prog.beginStep(Phase.LOADING_FSIMAGE, step); |
| loadSecretManagerSection(in); |
| prog.endStep(Phase.LOADING_FSIMAGE, step); |
| } |
| break; |
| case CACHE_MANAGER: { |
| Step step = new Step(StepType.CACHE_POOLS); |
| prog.beginStep(Phase.LOADING_FSIMAGE, step); |
| loadCacheManagerSection(in); |
| prog.endStep(Phase.LOADING_FSIMAGE, step); |
| } |
| break; |
| default: |
| LOG.warn("Unregconized section " + n); |
| break; |
| } |
| } |
| } |
| |
| private void loadNameSystemSection(InputStream in) throws IOException { |
| NameSystemSection s = NameSystemSection.parseDelimitedFrom(in); |
| fsn.setGenerationStampV1(s.getGenstampV1()); |
| fsn.setGenerationStampV2(s.getGenstampV2()); |
| fsn.setGenerationStampV1Limit(s.getGenstampV1Limit()); |
| fsn.setLastAllocatedBlockId(s.getLastAllocatedBlockId()); |
| imgTxId = s.getTransactionId(); |
| } |
| |
| private void loadStringTableSection(InputStream in) throws IOException { |
| StringTableSection s = StringTableSection.parseDelimitedFrom(in); |
| stringTable = new String[s.getNumEntry() + 1]; |
| for (int i = 0; i < s.getNumEntry(); ++i) { |
| StringTableSection.Entry e = StringTableSection.Entry |
| .parseDelimitedFrom(in); |
| stringTable[e.getId()] = e.getStr(); |
| } |
| } |
| |
| private void loadSecretManagerSection(InputStream in) throws IOException { |
| SecretManagerSection s = SecretManagerSection.parseDelimitedFrom(in); |
| int numKeys = s.getNumKeys(), numTokens = s.getNumTokens(); |
| ArrayList<SecretManagerSection.DelegationKey> keys = Lists |
| .newArrayListWithCapacity(numKeys); |
| ArrayList<SecretManagerSection.PersistToken> tokens = Lists |
| .newArrayListWithCapacity(numTokens); |
| |
| for (int i = 0; i < numKeys; ++i) |
| keys.add(SecretManagerSection.DelegationKey.parseDelimitedFrom(in)); |
| |
| for (int i = 0; i < numTokens; ++i) |
| tokens.add(SecretManagerSection.PersistToken.parseDelimitedFrom(in)); |
| |
| fsn.loadSecretManagerState(s, keys, tokens); |
| } |
| |
| private void loadCacheManagerSection(InputStream in) throws IOException { |
| CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in); |
| ArrayList<CachePoolInfoProto> pools = Lists.newArrayListWithCapacity(s |
| .getNumPools()); |
| ArrayList<CacheDirectiveInfoProto> directives = Lists |
| .newArrayListWithCapacity(s.getNumDirectives()); |
| for (int i = 0; i < s.getNumPools(); ++i) |
| pools.add(CachePoolInfoProto.parseDelimitedFrom(in)); |
| for (int i = 0; i < s.getNumDirectives(); ++i) |
| directives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in)); |
| fsn.getCacheManager().loadState( |
| new CacheManager.PersistState(s, pools, directives)); |
| } |
| |
| } |
| |
| public static final class Saver { |
| private final SaveNamespaceContext context; |
| private long currentOffset = FSImageUtil.MAGIC_HEADER.length; |
| private MD5Hash savedDigest; |
| private StringMap stringMap = new StringMap(); |
| |
| private FileChannel fileChannel; |
| // OutputStream for the section data |
| private OutputStream sectionOutputStream; |
| private CompressionCodec codec; |
| private OutputStream underlyingOutputStream; |
| public static final int CHECK_CANCEL_INTERVAL = 4096; |
| |
| Saver(SaveNamespaceContext context) { |
| this.context = context; |
| } |
| |
| public MD5Hash getSavedDigest() { |
| return savedDigest; |
| } |
| |
| public SaveNamespaceContext getContext() { |
| return context; |
| } |
| |
| public void commitSection(FileSummary.Builder summary, SectionName name) |
| throws IOException { |
| long oldOffset = currentOffset; |
| flushSectionOutputStream(); |
| |
| if (codec != null) { |
| sectionOutputStream = codec.createOutputStream(underlyingOutputStream); |
| } else { |
| sectionOutputStream = underlyingOutputStream; |
| } |
| long length = fileChannel.position() - oldOffset; |
| summary.addSections(FileSummary.Section.newBuilder().setName(name.name) |
| .setLength(length).setOffset(currentOffset)); |
| currentOffset += length; |
| } |
| |
| private void flushSectionOutputStream() throws IOException { |
| if (codec != null) { |
| ((CompressorStream) sectionOutputStream).finish(); |
| } |
| sectionOutputStream.flush(); |
| } |
| |
| void save(File file, FSImageCompression compression) throws IOException { |
| FileOutputStream fout = new FileOutputStream(file); |
| fileChannel = fout.getChannel(); |
| try { |
| saveInternal(fout, compression, file.getAbsolutePath().toString()); |
| } finally { |
| fout.close(); |
| } |
| } |
| |
| private static void saveFileSummary(OutputStream out, FileSummary summary) |
| throws IOException { |
| summary.writeDelimitedTo(out); |
| int length = getOndiskTrunkSize(summary); |
| byte[] lengthBytes = new byte[4]; |
| ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length); |
| out.write(lengthBytes); |
| } |
| |
| private void saveInodes(FileSummary.Builder summary) throws IOException { |
| FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this, |
| summary); |
| |
| saver.serializeINodeSection(sectionOutputStream); |
| saver.serializeINodeDirectorySection(sectionOutputStream); |
| saver.serializeFilesUCSection(sectionOutputStream); |
| } |
| |
| private void saveSnapshots(FileSummary.Builder summary) throws IOException { |
| FSImageFormatPBSnapshot.Saver snapshotSaver = new FSImageFormatPBSnapshot.Saver( |
| this, summary, context, context.getSourceNamesystem()); |
| |
| snapshotSaver.serializeSnapshotSection(sectionOutputStream); |
| snapshotSaver.serializeSnapshotDiffSection(sectionOutputStream); |
| } |
| |
| private void saveInternal(FileOutputStream fout, |
| FSImageCompression compression, String filePath) throws IOException { |
| StartupProgress prog = NameNode.getStartupProgress(); |
| MessageDigest digester = MD5Hash.getDigester(); |
| |
| underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream( |
| fout), digester); |
| underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER); |
| |
| fileChannel = fout.getChannel(); |
| |
| FileSummary.Builder b = FileSummary.newBuilder() |
| .setOndiskVersion(FSImageUtil.FILE_VERSION) |
| .setLayoutVersion(LayoutVersion.getCurrentLayoutVersion()); |
| |
| codec = compression.getImageCodec(); |
| if (codec != null) { |
| b.setCodec(codec.getClass().getCanonicalName()); |
| sectionOutputStream = codec.createOutputStream(underlyingOutputStream); |
| } else { |
| sectionOutputStream = underlyingOutputStream; |
| } |
| |
| saveNameSystemSection(b); |
| // Check for cancellation right after serializing the name system section. |
| // Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace |
| // depends on this behavior. |
| context.checkCancelled(); |
| |
| Step step = new Step(StepType.INODES, filePath); |
| prog.beginStep(Phase.SAVING_CHECKPOINT, step); |
| saveInodes(b); |
| saveSnapshots(b); |
| prog.endStep(Phase.SAVING_CHECKPOINT, step); |
| |
| step = new Step(StepType.DELEGATION_TOKENS, filePath); |
| prog.beginStep(Phase.SAVING_CHECKPOINT, step); |
| saveSecretManagerSection(b); |
| prog.endStep(Phase.SAVING_CHECKPOINT, step); |
| |
| step = new Step(StepType.CACHE_POOLS, filePath); |
| prog.beginStep(Phase.SAVING_CHECKPOINT, step); |
| saveCacheManagerSection(b); |
| prog.endStep(Phase.SAVING_CHECKPOINT, step); |
| |
| saveStringTableSection(b); |
| |
| // We use the underlyingOutputStream to write the header. Therefore flush |
| // the buffered stream (which is potentially compressed) first. |
| flushSectionOutputStream(); |
| |
| FileSummary summary = b.build(); |
| saveFileSummary(underlyingOutputStream, summary); |
| underlyingOutputStream.close(); |
| savedDigest = new MD5Hash(digester.digest()); |
| } |
| |
| private void saveSecretManagerSection(FileSummary.Builder summary) |
| throws IOException { |
| final FSNamesystem fsn = context.getSourceNamesystem(); |
| DelegationTokenSecretManager.SecretManagerState state = fsn |
| .saveSecretManagerState(); |
| state.section.writeDelimitedTo(sectionOutputStream); |
| for (SecretManagerSection.DelegationKey k : state.keys) |
| k.writeDelimitedTo(sectionOutputStream); |
| |
| for (SecretManagerSection.PersistToken t : state.tokens) |
| t.writeDelimitedTo(sectionOutputStream); |
| |
| commitSection(summary, SectionName.SECRET_MANAGER); |
| } |
| |
| private void saveCacheManagerSection(FileSummary.Builder summary) |
| throws IOException { |
| final FSNamesystem fsn = context.getSourceNamesystem(); |
| CacheManager.PersistState state = fsn.getCacheManager().saveState(); |
| state.section.writeDelimitedTo(sectionOutputStream); |
| |
| for (CachePoolInfoProto p : state.pools) |
| p.writeDelimitedTo(sectionOutputStream); |
| |
| for (CacheDirectiveInfoProto p : state.directives) |
| p.writeDelimitedTo(sectionOutputStream); |
| |
| commitSection(summary, SectionName.CACHE_MANAGER); |
| } |
| |
| private void saveNameSystemSection(FileSummary.Builder summary) |
| throws IOException { |
| final FSNamesystem fsn = context.getSourceNamesystem(); |
| OutputStream out = sectionOutputStream; |
| NameSystemSection.Builder b = NameSystemSection.newBuilder() |
| .setGenstampV1(fsn.getGenerationStampV1()) |
| .setGenstampV1Limit(fsn.getGenerationStampV1Limit()) |
| .setGenstampV2(fsn.getGenerationStampV2()) |
| .setLastAllocatedBlockId(fsn.getLastAllocatedBlockId()) |
| .setTransactionId(context.getTxId()); |
| |
| // We use the non-locked version of getNamespaceInfo here since |
| // the coordinating thread of saveNamespace already has read-locked |
| // the namespace for us. If we attempt to take another readlock |
| // from the actual saver thread, there's a potential of a |
| // fairness-related deadlock. See the comments on HDFS-2223. |
| b.setNamespaceId(fsn.unprotectedGetNamespaceInfo().getNamespaceID()); |
| NameSystemSection s = b.build(); |
| s.writeDelimitedTo(out); |
| |
| commitSection(summary, SectionName.NS_INFO); |
| } |
| |
| private void saveStringTableSection(FileSummary.Builder summary) |
| throws IOException { |
| OutputStream out = sectionOutputStream; |
| StringTableSection.Builder b = StringTableSection.newBuilder() |
| .setNumEntry(stringMap.size()); |
| b.build().writeDelimitedTo(out); |
| for (Entry<String, Integer> e : stringMap.entrySet()) { |
| StringTableSection.Entry.Builder eb = StringTableSection.Entry |
| .newBuilder().setId(e.getValue()).setStr(e.getKey()); |
| eb.build().writeDelimitedTo(out); |
| } |
| commitSection(summary, SectionName.STRING_TABLE); |
| } |
| |
| public StringMap getStringMap() { |
| return stringMap; |
| } |
| } |
| |
| public static class StringMap { |
| private final Map<String, Integer> stringMap; |
| |
| public StringMap() { |
| stringMap = Maps.newHashMap(); |
| } |
| |
| int getStringId(String str) { |
| if (str == null) { |
| return 0; |
| } |
| Integer v = stringMap.get(str); |
| if (v == null) { |
| int nv = stringMap.size() + 1; |
| stringMap.put(str, nv); |
| return nv; |
| } |
| return v; |
| } |
| |
| int size() { |
| return stringMap.size(); |
| } |
| |
| Set<Entry<String, Integer>> entrySet() { |
| return stringMap.entrySet(); |
| } |
| } |
| |
| /** |
| * Supported section name. The order of the enum determines the order of |
| * loading. |
| */ |
| public enum SectionName { |
| NS_INFO("NS_INFO"), |
| STRING_TABLE("STRING_TABLE"), |
| INODE("INODE"), |
| SNAPSHOT("SNAPSHOT"), |
| INODE_DIR("INODE_DIR"), |
| FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"), |
| SNAPSHOT_DIFF("SNAPSHOT_DIFF"), |
| SECRET_MANAGER("SECRET_MANAGER"), |
| CACHE_MANAGER("CACHE_MANAGER"); |
| |
| private static final SectionName[] values = SectionName.values(); |
| |
| public static SectionName fromString(String name) { |
| for (SectionName n : values) { |
| if (n.name.equals(name)) |
| return n; |
| } |
| return null; |
| } |
| |
| private final String name; |
| |
| private SectionName(String name) { |
| this.name = name; |
| } |
| } |
| |
| private static int getOndiskTrunkSize(com.google.protobuf.GeneratedMessage s) { |
| return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize()) |
| + s.getSerializedSize(); |
| } |
| |
  /** Utility class holding only static members; never instantiated. */
  private FSImageFormatProtobuf() {
  }
| } |