| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import java.io.BufferedInputStream; |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.DataOutput; |
| import java.io.EOFException; |
| import java.io.FilterInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Arrays; |
| import java.util.zip.CheckedInputStream; |
| import java.util.zip.Checksum; |
| |
| import org.apache.hadoop.fs.ChecksumException; |
| import org.apache.hadoop.fs.Options.Rename; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.fs.permission.PermissionStatus; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.DatanodeID; |
| import org.apache.hadoop.hdfs.protocol.FSConstants; |
| import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
| import org.apache.hadoop.hdfs.protocol.LayoutVersion; |
| import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; |
| import static org.apache.hadoop.hdfs.server.common.Util.now; |
| import org.apache.hadoop.hdfs.server.common.GenerationStamp; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableFactories; |
| import org.apache.hadoop.io.WritableFactory; |
| import org.apache.hadoop.security.token.delegation.DelegationKey; |
| |
| public class FSEditLogLoader { |
| private final FSNamesystem fsNamesys; |
| |
| public FSEditLogLoader(FSNamesystem fsNamesys) { |
| this.fsNamesys = fsNamesys; |
| } |
| |
| /** |
| * Load an edit log, and apply the changes to the in-memory structure |
| * This is where we apply edits that we've been writing to disk all |
| * along. |
| */ |
| int loadFSEdits(EditLogInputStream edits) throws IOException { |
| long startTime = now(); |
| int numEdits = loadFSEdits(edits, true); |
| FSImage.LOG.info("Edits file " + edits.getName() |
| + " of size " + edits.length() + " edits # " + numEdits |
| + " loaded in " + (now()-startTime)/1000 + " seconds."); |
| return numEdits; |
| } |
| |
| /** |
| * Read the header of fsedit log |
| * @param in fsedit stream |
| * @return the edit log version number |
| * @throws IOException if error occurs |
| */ |
| int readLogVersion(DataInputStream in) throws IOException { |
| int logVersion = 0; |
| // Read log file version. Could be missing. |
| in.mark(4); |
| // If edits log is greater than 2G, available method will return negative |
| // numbers, so we avoid having to call available |
| boolean available = true; |
| try { |
| logVersion = in.readByte(); |
| } catch (EOFException e) { |
| available = false; |
| } |
| if (available) { |
| in.reset(); |
| logVersion = in.readInt(); |
| if (logVersion < FSConstants.LAYOUT_VERSION) // future version |
| throw new IOException( |
| "Unexpected version of the file system log file: " |
| + logVersion + ". Current version = " |
| + FSConstants.LAYOUT_VERSION + "."); |
| } |
| assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION : |
| "Unsupported version " + logVersion; |
| return logVersion; |
| } |
| |
| int loadFSEdits(EditLogInputStream edits, boolean closeOnExit) throws IOException { |
| BufferedInputStream bin = new BufferedInputStream(edits); |
| DataInputStream in = new DataInputStream(bin); |
| |
| int numEdits = 0; |
| int logVersion = 0; |
| |
| try { |
| logVersion = readLogVersion(in); |
| Checksum checksum = null; |
| if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) { |
| checksum = FSEditLog.getChecksum(); |
| in = new DataInputStream(new CheckedInputStream(bin, checksum)); |
| } |
| |
| numEdits = loadEditRecords(logVersion, in, checksum, false); |
| } finally { |
| if(closeOnExit) |
| in.close(); |
| } |
| if (logVersion != FSConstants.LAYOUT_VERSION) // other version |
| numEdits++; // save this image asap |
| return numEdits; |
| } |
| |
| @SuppressWarnings("deprecation") |
| int loadEditRecords(int logVersion, DataInputStream in, |
| Checksum checksum, boolean closeOnExit) throws IOException { |
| FSDirectory fsDir = fsNamesys.dir; |
| int numEdits = 0; |
| String clientName = null; |
| String clientMachine = null; |
| String path = null; |
| int numOpAdd = 0, numOpClose = 0, numOpDelete = 0, |
| numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0, |
| numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0, |
| numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, |
| numOpSymlink = 0, numOpGetDelegationToken = 0, |
| numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, |
| numOpUpdateMasterKey = 0, numOpReassignLease = 0, numOpOther = 0; |
| |
| // Keep track of the file offsets of the last several opcodes. |
| // This is handy when manually recovering corrupted edits files. |
| PositionTrackingInputStream tracker = new PositionTrackingInputStream(in); |
| in = new DataInputStream(tracker); |
| long recentOpcodeOffsets[] = new long[4]; |
| Arrays.fill(recentOpcodeOffsets, -1); |
| |
| try { |
| try { |
| while (true) { |
| long timestamp = 0; |
| long mtime = 0; |
| long atime = 0; |
| long blockSize = 0; |
| FSEditLogOpCodes opCode; |
| try { |
| if (checksum != null) { |
| checksum.reset(); |
| } |
| in.mark(1); |
| byte opCodeByte = in.readByte(); |
| opCode = FSEditLogOpCodes.fromByte(opCodeByte); |
| if (opCode == FSEditLogOpCodes.OP_INVALID) { |
| in.reset(); // reset back to end of file if somebody reads it again |
| break; // no more transactions |
| } |
| } catch (EOFException e) { |
| break; // no more transactions |
| } |
| recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] = |
| tracker.getPos(); |
| numEdits++; |
| switch (opCode) { |
| case OP_ADD: |
| case OP_CLOSE: { |
| // versions > 0 support per file replication |
| // get name and replication |
| int length = in.readInt(); |
| if (-7 == logVersion && length != 3|| |
| -17 < logVersion && logVersion < -7 && length != 4 || |
| logVersion <= -17 && length != 5) { |
| throw new IOException("Incorrect data format." + |
| " logVersion is " + logVersion + |
| " but writables.length is " + |
| length + ". "); |
| } |
| path = FSImageSerialization.readString(in); |
| short replication = fsNamesys.adjustReplication(readShort(in)); |
| mtime = readLong(in); |
| if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) { |
| atime = readLong(in); |
| } |
| if (logVersion < -7) { |
| blockSize = readLong(in); |
| } |
| // get blocks |
| boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD); |
| BlockInfo blocks[] = |
| readBlocks(in, logVersion, isFileUnderConstruction, replication); |
| |
| // Older versions of HDFS does not store the block size in inode. |
| // If the file has more than one block, use the size of the |
| // first block as the blocksize. Otherwise use the default |
| // block size. |
| if (-8 <= logVersion && blockSize == 0) { |
| if (blocks.length > 1) { |
| blockSize = blocks[0].getNumBytes(); |
| } else { |
| long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0); |
| blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first); |
| } |
| } |
| |
| PermissionStatus permissions = fsNamesys.getUpgradePermission(); |
| if (logVersion <= -11) { |
| permissions = PermissionStatus.read(in); |
| } |
| |
| // clientname, clientMachine and block locations of last block. |
| if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) { |
| clientName = FSImageSerialization.readString(in); |
| clientMachine = FSImageSerialization.readString(in); |
| if (-13 <= logVersion) { |
| readDatanodeDescriptorArray(in); |
| } |
| } else { |
| clientName = ""; |
| clientMachine = ""; |
| } |
| |
| // The open lease transaction re-creates a file if necessary. |
| // Delete the file if it already exists. |
| if (FSNamesystem.LOG.isDebugEnabled()) { |
| FSNamesystem.LOG.debug(opCode + ": " + path + |
| " numblocks : " + blocks.length + |
| " clientHolder " + clientName + |
| " clientMachine " + clientMachine); |
| } |
| |
| fsDir.unprotectedDelete(path, mtime); |
| |
| // add to the file tree |
| INodeFile node = (INodeFile)fsDir.unprotectedAddFile( |
| path, permissions, |
| blocks, replication, |
| mtime, atime, blockSize); |
| if (isFileUnderConstruction) { |
| numOpAdd++; |
| // |
| // Replace current node with a INodeUnderConstruction. |
| // Recreate in-memory lease record. |
| // |
| INodeFileUnderConstruction cons = new INodeFileUnderConstruction( |
| node.getLocalNameBytes(), |
| node.getReplication(), |
| node.getModificationTime(), |
| node.getPreferredBlockSize(), |
| node.getBlocks(), |
| node.getPermissionStatus(), |
| clientName, |
| clientMachine, |
| null); |
| fsDir.replaceNode(path, node, cons); |
| fsNamesys.leaseManager.addLease(cons.getClientName(), path); |
| } |
| break; |
| } |
| case OP_SET_REPLICATION: { |
| numOpSetRepl++; |
| path = FSImageSerialization.readString(in); |
| short replication = fsNamesys.adjustReplication(readShort(in)); |
| fsDir.unprotectedSetReplication(path, replication, null); |
| break; |
| } |
| case OP_CONCAT_DELETE: { |
| numOpConcatDelete++; |
| int length = in.readInt(); |
| if (length < 3) { // trg, srcs.., timestam |
| throw new IOException("Incorrect data format. " |
| + "Mkdir operation."); |
| } |
| String trg = FSImageSerialization.readString(in); |
| int srcSize = length - 1 - 1; //trg and timestamp |
| String [] srcs = new String [srcSize]; |
| for(int i=0; i<srcSize;i++) { |
| srcs[i]= FSImageSerialization.readString(in); |
| } |
| timestamp = readLong(in); |
| fsDir.unprotectedConcat(trg, srcs); |
| break; |
| } |
| case OP_RENAME_OLD: { |
| numOpRenameOld++; |
| int length = in.readInt(); |
| if (length != 3) { |
| throw new IOException("Incorrect data format. " |
| + "Mkdir operation."); |
| } |
| String s = FSImageSerialization.readString(in); |
| String d = FSImageSerialization.readString(in); |
| timestamp = readLong(in); |
| HdfsFileStatus dinfo = fsDir.getFileInfo(d, false); |
| fsDir.unprotectedRenameTo(s, d, timestamp); |
| fsNamesys.changeLease(s, d, dinfo); |
| break; |
| } |
| case OP_DELETE: { |
| numOpDelete++; |
| int length = in.readInt(); |
| if (length != 2) { |
| throw new IOException("Incorrect data format. " |
| + "delete operation."); |
| } |
| path = FSImageSerialization.readString(in); |
| timestamp = readLong(in); |
| fsDir.unprotectedDelete(path, timestamp); |
| break; |
| } |
| case OP_MKDIR: { |
| numOpMkDir++; |
| PermissionStatus permissions = fsNamesys.getUpgradePermission(); |
| int length = in.readInt(); |
| if (-17 < logVersion && length != 2 || |
| logVersion <= -17 && length != 3) { |
| throw new IOException("Incorrect data format. " |
| + "Mkdir operation."); |
| } |
| path = FSImageSerialization.readString(in); |
| timestamp = readLong(in); |
| |
| // The disk format stores atimes for directories as well. |
| // However, currently this is not being updated/used because of |
| // performance reasons. |
| if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) { |
| atime = readLong(in); |
| } |
| |
| if (logVersion <= -11) { |
| permissions = PermissionStatus.read(in); |
| } |
| fsDir.unprotectedMkdir(path, permissions, timestamp); |
| break; |
| } |
| case OP_SET_GENSTAMP: { |
| numOpSetGenStamp++; |
| long lw = in.readLong(); |
| fsNamesys.setGenerationStamp(lw); |
| break; |
| } |
| case OP_DATANODE_ADD: { |
| numOpOther++; |
| //Datanodes are not persistent any more. |
| FSImageSerialization.DatanodeImage.skipOne(in); |
| break; |
| } |
| case OP_DATANODE_REMOVE: { |
| numOpOther++; |
| DatanodeID nodeID = new DatanodeID(); |
| nodeID.readFields(in); |
| //Datanodes are not persistent any more. |
| break; |
| } |
| case OP_SET_PERMISSIONS: { |
| numOpSetPerm++; |
| fsDir.unprotectedSetPermission( |
| FSImageSerialization.readString(in), FsPermission.read(in)); |
| break; |
| } |
| case OP_SET_OWNER: { |
| numOpSetOwner++; |
| if (logVersion > -11) |
| throw new IOException("Unexpected opCode " + opCode |
| + " for version " + logVersion); |
| fsDir.unprotectedSetOwner(FSImageSerialization.readString(in), |
| FSImageSerialization.readString_EmptyAsNull(in), |
| FSImageSerialization.readString_EmptyAsNull(in)); |
| break; |
| } |
| case OP_SET_NS_QUOTA: { |
| fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), |
| readLongWritable(in), |
| FSConstants.QUOTA_DONT_SET); |
| break; |
| } |
| case OP_CLEAR_NS_QUOTA: { |
| fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), |
| FSConstants.QUOTA_RESET, |
| FSConstants.QUOTA_DONT_SET); |
| break; |
| } |
| |
| case OP_SET_QUOTA: |
| fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), |
| readLongWritable(in), |
| readLongWritable(in)); |
| |
| break; |
| |
| case OP_TIMES: { |
| numOpTimes++; |
| int length = in.readInt(); |
| if (length != 3) { |
| throw new IOException("Incorrect data format. " |
| + "times operation."); |
| } |
| path = FSImageSerialization.readString(in); |
| mtime = readLong(in); |
| atime = readLong(in); |
| fsDir.unprotectedSetTimes(path, mtime, atime, true); |
| break; |
| } |
| case OP_SYMLINK: { |
| numOpSymlink++; |
| int length = in.readInt(); |
| if (length != 4) { |
| throw new IOException("Incorrect data format. " |
| + "symlink operation."); |
| } |
| path = FSImageSerialization.readString(in); |
| String value = FSImageSerialization.readString(in); |
| mtime = readLong(in); |
| atime = readLong(in); |
| PermissionStatus perm = PermissionStatus.read(in); |
| fsDir.unprotectedSymlink(path, value, mtime, atime, perm); |
| break; |
| } |
| case OP_RENAME: { |
| numOpRename++; |
| int length = in.readInt(); |
| if (length != 3) { |
| throw new IOException("Incorrect data format. " |
| + "Mkdir operation."); |
| } |
| String s = FSImageSerialization.readString(in); |
| String d = FSImageSerialization.readString(in); |
| timestamp = readLong(in); |
| Rename[] options = readRenameOptions(in); |
| HdfsFileStatus dinfo = fsDir.getFileInfo(d, false); |
| fsDir.unprotectedRenameTo(s, d, timestamp, options); |
| fsNamesys.changeLease(s, d, dinfo); |
| break; |
| } |
| case OP_GET_DELEGATION_TOKEN: { |
| numOpGetDelegationToken++; |
| DelegationTokenIdentifier delegationTokenId = |
| new DelegationTokenIdentifier(); |
| delegationTokenId.readFields(in); |
| long expiryTime = readLong(in); |
| fsNamesys.getDelegationTokenSecretManager() |
| .addPersistedDelegationToken(delegationTokenId, expiryTime); |
| break; |
| } |
| case OP_RENEW_DELEGATION_TOKEN: { |
| numOpRenewDelegationToken++; |
| DelegationTokenIdentifier delegationTokenId = |
| new DelegationTokenIdentifier(); |
| delegationTokenId.readFields(in); |
| long expiryTime = readLong(in); |
| fsNamesys.getDelegationTokenSecretManager() |
| .updatePersistedTokenRenewal(delegationTokenId, expiryTime); |
| break; |
| } |
| case OP_CANCEL_DELEGATION_TOKEN: { |
| numOpCancelDelegationToken++; |
| DelegationTokenIdentifier delegationTokenId = |
| new DelegationTokenIdentifier(); |
| delegationTokenId.readFields(in); |
| fsNamesys.getDelegationTokenSecretManager() |
| .updatePersistedTokenCancellation(delegationTokenId); |
| break; |
| } |
| case OP_UPDATE_MASTER_KEY: { |
| numOpUpdateMasterKey++; |
| DelegationKey delegationKey = new DelegationKey(); |
| delegationKey.readFields(in); |
| fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey( |
| delegationKey); |
| break; |
| } |
| case OP_REASSIGN_LEASE: { |
| numOpReassignLease++; |
| String leaseHolder = FSImageSerialization.readString(in); |
| path = FSImageSerialization.readString(in); |
| String newHolder = FSImageSerialization.readString(in); |
| Lease lease = fsNamesys.leaseManager.getLease(leaseHolder); |
| INodeFileUnderConstruction pendingFile = |
| (INodeFileUnderConstruction) fsDir.getFileINode(path); |
| fsNamesys.reassignLeaseInternal(lease, path, newHolder, pendingFile); |
| break; |
| } |
| default: { |
| throw new IOException("Never seen opCode " + opCode); |
| } |
| } |
| validateChecksum(in, checksum, numEdits); |
| } |
| } catch (IOException ex) { |
| check203UpgradeFailure(logVersion, ex); |
| } finally { |
| if(closeOnExit) |
| in.close(); |
| } |
| } catch (Throwable t) { |
| // Catch Throwable because in the case of a truly corrupt edits log, any |
| // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.) |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Error replaying edit log at offset " + tracker.getPos()); |
| if (recentOpcodeOffsets[0] != -1) { |
| Arrays.sort(recentOpcodeOffsets); |
| sb.append("\nRecent opcode offsets:"); |
| for (long offset : recentOpcodeOffsets) { |
| if (offset != -1) { |
| sb.append(' ').append(offset); |
| } |
| } |
| } |
| String errorMessage = sb.toString(); |
| FSImage.LOG.error(errorMessage); |
| throw new IOException(errorMessage, t); |
| } |
| if (FSImage.LOG.isDebugEnabled()) { |
| FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose |
| + " numOpDelete = " + numOpDelete |
| + " numOpRenameOld = " + numOpRenameOld |
| + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir |
| + " numOpSetPerm = " + numOpSetPerm |
| + " numOpSetOwner = " + numOpSetOwner |
| + " numOpSetGenStamp = " + numOpSetGenStamp |
| + " numOpTimes = " + numOpTimes |
| + " numOpConcatDelete = " + numOpConcatDelete |
| + " numOpRename = " + numOpRename |
| + " numOpGetDelegationToken = " + numOpGetDelegationToken |
| + " numOpRenewDelegationToken = " + numOpRenewDelegationToken |
| + " numOpCancelDelegationToken = " + numOpCancelDelegationToken |
| + " numOpUpdateMasterKey = " + numOpUpdateMasterKey |
| + " numOpReassignLease = " + numOpReassignLease |
| + " numOpOther = " + numOpOther); |
| } |
| return numEdits; |
| } |
| |
| /** |
| * Validate a transaction's checksum |
| */ |
| private static void validateChecksum( |
| DataInputStream in, Checksum checksum, int tid) |
| throws IOException { |
| if (checksum != null) { |
| int calculatedChecksum = (int)checksum.getValue(); |
| int readChecksum = in.readInt(); // read in checksum |
| if (readChecksum != calculatedChecksum) { |
| throw new ChecksumException( |
| "Transaction " + tid + " is corrupt. Calculated checksum is " + |
| calculatedChecksum + " but read checksum " + readChecksum, tid); |
| } |
| } |
| } |
| |
| /** |
| * A class to read in blocks stored in the old format. The only two |
| * fields in the block were blockid and length. |
| */ |
| static class BlockTwo implements Writable { |
| long blkid; |
| long len; |
| |
| static { // register a ctor |
| WritableFactories.setFactory |
| (BlockTwo.class, |
| new WritableFactory() { |
| public Writable newInstance() { return new BlockTwo(); } |
| }); |
| } |
| |
| |
| BlockTwo() { |
| blkid = 0; |
| len = 0; |
| } |
| ///////////////////////////////////// |
| // Writable |
| ///////////////////////////////////// |
| public void write(DataOutput out) throws IOException { |
| out.writeLong(blkid); |
| out.writeLong(len); |
| } |
| |
| public void readFields(DataInput in) throws IOException { |
| this.blkid = in.readLong(); |
| this.len = in.readLong(); |
| } |
| } |
| |
| /** This method is defined for compatibility reason. */ |
| static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in |
| ) throws IOException { |
| DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()]; |
| for (int i = 0; i < locations.length; i++) { |
| locations[i] = new DatanodeDescriptor(); |
| locations[i].readFieldsFromFSEditLog(in); |
| } |
| return locations; |
| } |
| |
| static private short readShort(DataInputStream in) throws IOException { |
| return Short.parseShort(FSImageSerialization.readString(in)); |
| } |
| |
| static private long readLong(DataInputStream in) throws IOException { |
| return Long.parseLong(FSImageSerialization.readString(in)); |
| } |
| |
| // a place holder for reading a long |
| private static final LongWritable longWritable = new LongWritable(); |
| |
| /** Read an integer from an input stream */ |
| private static long readLongWritable(DataInputStream in) throws IOException { |
| synchronized (longWritable) { |
| longWritable.readFields(in); |
| return longWritable.get(); |
| } |
| } |
| |
| static Rename[] readRenameOptions(DataInputStream in) throws IOException { |
| BytesWritable writable = new BytesWritable(); |
| writable.readFields(in); |
| |
| byte[] bytes = writable.getBytes(); |
| Rename[] options = new Rename[bytes.length]; |
| |
| for (int i = 0; i < bytes.length; i++) { |
| options[i] = Rename.valueOf(bytes[i]); |
| } |
| return options; |
| } |
| |
| static private BlockInfo[] readBlocks( |
| DataInputStream in, |
| int logVersion, |
| boolean isFileUnderConstruction, |
| short replication) throws IOException { |
| int numBlocks = in.readInt(); |
| BlockInfo[] blocks = new BlockInfo[numBlocks]; |
| Block blk = new Block(); |
| BlockTwo oldblk = new BlockTwo(); |
| for (int i = 0; i < numBlocks; i++) { |
| if (logVersion <= -14) { |
| blk.readFields(in); |
| } else { |
| oldblk.readFields(in); |
| blk.set(oldblk.blkid, oldblk.len, |
| GenerationStamp.GRANDFATHER_GENERATION_STAMP); |
| } |
| if(isFileUnderConstruction && i == numBlocks-1) |
| blocks[i] = new BlockInfoUnderConstruction(blk, replication); |
| else |
| blocks[i] = new BlockInfo(blk, replication); |
| } |
| return blocks; |
| } |
| |
| /** |
| * Throw appropriate exception during upgrade from 203, when editlog loading |
| * could fail due to opcode conflicts. |
| */ |
| private void check203UpgradeFailure(int logVersion, IOException ex) |
| throws IOException { |
| // 0.20.203 version version has conflicting opcodes with the later releases. |
| // The editlog must be emptied by restarting the namenode, before proceeding |
| // with the upgrade. |
| if (Storage.is203LayoutVersion(logVersion) |
| && logVersion != FSConstants.LAYOUT_VERSION) { |
| String msg = "During upgrade failed to load the editlog version " |
| + logVersion + " from release 0.20.203. Please go back to the old " |
| + " release and restart the namenode. This empties the editlog " |
| + " and saves the namespace. Resume the upgrade after this step."; |
| throw new IOException(msg, ex); |
| } else { |
| throw ex; |
| } |
| } |
| |
| /** |
| * Stream wrapper that keeps track of the current file position. |
| */ |
| private static class PositionTrackingInputStream extends FilterInputStream { |
| private long curPos = 0; |
| private long markPos = -1; |
| |
| public PositionTrackingInputStream(InputStream is) { |
| super(is); |
| } |
| |
| public int read() throws IOException { |
| int ret = super.read(); |
| if (ret != -1) curPos++; |
| return ret; |
| } |
| |
| public int read(byte[] data) throws IOException { |
| int ret = super.read(data); |
| if (ret > 0) curPos += ret; |
| return ret; |
| } |
| |
| public int read(byte[] data, int offset, int length) throws IOException { |
| int ret = super.read(data, offset, length); |
| if (ret > 0) curPos += ret; |
| return ret; |
| } |
| |
| public void mark(int limit) { |
| super.mark(limit); |
| markPos = curPos; |
| } |
| |
| public void reset() throws IOException { |
| if (markPos == -1) { |
| throw new IOException("Not marked!"); |
| } |
| super.reset(); |
| curPos = markPos; |
| markPos = -1; |
| } |
| |
| public long getPos() { |
| return curPos; |
| } |
| } |
| } |