/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ChunkedArrayList;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
import static org.apache.hadoop.util.Time.now;
class FSDirWriteFileOp {
private FSDirWriteFileOp() {}
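  /**
   * Remove the given block from the file's block list and from the blocks
   * map, decrementing the scheduled-block counts of the expected storage
   * locations if the block was still under construction, and updating the
   * space consumed for the file. The caller is expected to hold the
   * FSDirectory write lock.
   * @return true if the block was removed; false if it did not match the
   *         last block of the file
   */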
static boolean unprotectedRemoveBlock(
FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
Block block) throws IOException {
    // modify the file->block mapping and the blocksMap;
    // fileNode should be under construction
BlockInfo uc = fileNode.removeLastBlock(block);
if (uc == null) {
return false;
}
if (uc.getUnderConstructionFeature() != null) {
DatanodeStorageInfo.decrementBlocksScheduled(uc
.getUnderConstructionFeature().getExpectedStorageLocations());
}
fsd.getBlockManager().removeBlockFromMap(uc);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
          + path + " with " + block
          + " block is removed from the file system");
    }
// update space consumed
fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
fileNode.getPreferredBlockReplication(), true);
return true;
}
/**
* Persist the block list for the inode.
*/
static void persistBlocks(
FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) {
assert fsd.getFSNamesystem().hasWriteLock();
Preconditions.checkArgument(file.isUnderConstruction());
fsd.getEditLog().logUpdateBlocks(path, file, logRetryCache);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("persistBlocks: " + path
+ " with " + file.getBlocks().length + " blocks is persisted to" +
" the file system");
}
}
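  /**
   * The client has abandoned a block it was allocated, e.g. because it
   * could not set up a write pipeline to the chosen datanodes. Remove the
   * block from the pending-creates list of the file under construction
   * and persist the updated block list.
   */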
static void abandonBlock(
FSDirectory fsd, FSPermissionChecker pc, ExtendedBlock b, long fileId,
String src, String holder) throws IOException {
final INodesInPath iip = fsd.resolvePath(pc, src, fileId);
src = iip.getPath();
FSNamesystem fsn = fsd.getFSNamesystem();
final INodeFile file = fsn.checkLease(iip, holder, fileId);
Preconditions.checkState(file.isUnderConstruction());
Block localBlock = ExtendedBlock.getLocalBlock(b);
fsd.writeLock();
try {
// Remove the block from the pending creates list
if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
return;
}
} finally {
fsd.writeUnlock();
}
persistBlocks(fsd, src, file, false);
}
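  /**
   * Verify that the given block, if any, belongs to this namenode's
   * block pool.
   */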
static void checkBlock(FSNamesystem fsn, ExtendedBlock block)
throws IOException {
String bpId = fsn.getBlockPoolId();
if (block != null && !bpId.equals(block.getBlockPoolId())) {
throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
+ " - expected " + bpId);
}
}
/**
* Part I of getAdditionalBlock().
   * Analyze the state of the file under the read lock to determine if the
   * client can add a new block, detecting potential retries, lease
   * mismatches, and whether the penultimate block is minimally replicated.
*
* Generate target DataNode locations for the new block,
* but do not create the new block yet.
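   *
   * <p>A rough sketch of how a caller such as
   * {@code FSNamesystem#getAdditionalBlock} is expected to drive Parts I
   * and II; the surrounding control flow here is illustrative, not
   * prescriptive:
   * <pre>{@code
   * LocatedBlock[] onRetryBlock = new LocatedBlock[1];
   * ValidateAddBlockResult r = FSDirWriteFileOp.validateAddBlock(
   *     fsn, pc, src, fileId, clientName, previous, onRetryBlock);
   * if (r == null) {
   *   return onRetryBlock[0]; // retry: last block already had locations
   * }
   * // Choose targets with no namesystem lock held; this may be slow.
   * DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
   *     bm, src, excludedNodes, favoredNodes, flags, r);
   * // Part II, under the write lock:
   * return FSDirWriteFileOp.storeAllocatedBlock(
   *     fsn, src, fileId, clientName, previous, targets);
   * }</pre>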
*/
static ValidateAddBlockResult validateAddBlock(
FSNamesystem fsn, FSPermissionChecker pc,
String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
final long blockSize;
final int replication;
final byte storagePolicyID;
String clientMachine;
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
previous, onRetryBlock);
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. No need to generate new locations.
// Use the last block if it has locations.
return null;
}
final INodeFile pendingFile = fileState.inode;
if (!fsn.checkFileProgress(src, pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
throw new IOException("File has reached the limit on maximum number of"
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ "): " + pendingFile.getBlocks().length + " >= "
+ fsn.maxBlocksPerFile);
}
blockSize = pendingFile.getPreferredBlockSize();
clientMachine = pendingFile.getFileUnderConstructionFeature()
.getClientMachine();
replication = pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID();
return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
clientMachine);
}
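  /**
   * Wrap the block and its target storages into a LocatedBlock and attach
   * a WRITE block token to it.
   */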
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
DatanodeStorageInfo[] locs, long offset) throws IOException {
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
locs, offset, false);
fsn.getFSDirectory().getBlockManager()
.setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE);
return lBlk;
}
/**
* Part II of getAdditionalBlock().
   * Should repeat the same analysis of the file state as in Part I,
* but under the write lock.
* If the conditions still hold, then allocate a new block with
* the new targets, add it to the INode and to the BlocksMap.
*/
static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
long fileId, String clientName, ExtendedBlock previous,
DatanodeStorageInfo[] targets) throws IOException {
long offset;
// Run the full analysis again, since things could have changed
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
src = fileState.path;
if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
        // This is a retry. Just return the last block if it has locations.
return onRetryBlock[0];
} else {
        // Add the newly chosen targets to the already allocated block
        // and return.
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
lastBlockInFile.getUnderConstructionFeature().
setExpectedLocations(lastBlockInFile, targets);
offset = pendingFile.computeFileSize();
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
}
}
// commit the last block and complete it if it has minimum replicas
fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
Block newBlock = fsn.createNewBlock();
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
persistNewBlock(fsn, src, pendingFile);
offset = pendingFile.computeFileSize();
// Return located block
return makeLocatedBlock(fsn, newBlock, targets, offset);
}
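  /**
   * Choose target datanodes for a new block using the configured block
   * placement policy. This step of getAdditionalBlock() runs between
   * Parts I and II and is called without holding the namesystem lock,
   * since placement decisions can be slow.
   */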
static DatanodeStorageInfo[] chooseTargetForNewBlock(
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
ValidateAddBlockResult r) throws IOException {
Node clientNode = bm.getDatanodeManager()
.getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
clientNode = getClientNode(bm, r.clientMachine);
}
Set<Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashSet<>(excludedNodes.length);
Collections.addAll(excludedNodesSet, excludedNodes);
}
List<String> favoredNodesList = (favoredNodes == null) ? null
: Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
excludedNodesSet, r.blockSize,
favoredNodesList, r.storagePolicyID, flags);
}
/**
   * Resolve the clientMachine address to get a network location path.
*/
static Node getClientNode(BlockManager bm, String clientMachine) {
List<String> hosts = new ArrayList<>(1);
hosts.add(clientMachine);
List<String> rName = bm.getDatanodeManager()
.resolveNetworkLocation(hosts);
Node clientNode = null;
if (rName != null) {
      // Able to resolve clientMachine mapping.
      // Create a temp node to find out the rack-local nodes.
clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
+ clientMachine);
}
return clientNode;
}
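  /**
   * Resolve and validate the target path of a create() call: check
   * ancestor access when permissions are enabled, reject paths that
   * already exist as directories, check write access for an overwrite,
   * verify the parent directory exists when createParent is false, and
   * require the CREATE flag for files that do not exist yet.
   */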
static INodesInPath resolvePathForStartFile(FSDirectory dir,
FSPermissionChecker pc, String src, EnumSet<CreateFlag> flag,
boolean createParent) throws IOException {
INodesInPath iip = dir.resolvePath(pc, src, DirOp.CREATE);
if (dir.isPermissionEnabled()) {
dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
}
INode inode = iip.getLastINode();
if (inode != null) {
// Verify that the destination does not exist as a directory already.
if (inode.isDirectory()) {
throw new FileAlreadyExistsException(iip.getPath() +
" already exists as a directory");
}
      // Verify it's indeed a file and that permissions allow overwrite.
INodeFile.valueOf(inode, src);
if (dir.isPermissionEnabled() && flag.contains(CreateFlag.OVERWRITE)) {
dir.checkPathAccess(pc, iip, FsAction.WRITE);
}
} else {
if (!createParent) {
dir.verifyParentDir(iip);
}
if (!flag.contains(CreateFlag.CREATE)) {
throw new FileNotFoundException("Can't overwrite non-existent " + src);
}
}
return iip;
}
/**
* Create a new file or overwrite an existing file<br>
*
 * Once the file is created, the client then allocates a new block with the next
* call using {@link ClientProtocol#addBlock}.
* <p>
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create}
*/
static HdfsFileStatus startFile(
FSNamesystem fsn, INodesInPath iip,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean logRetryEntry)
throws IOException {
assert fsn.hasWriteLock();
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
final String src = iip.getPath();
FSDirectory fsd = fsn.getFSDirectory();
if (iip.getLastINode() != null) {
if (overwrite) {
List<INode> toRemoveINodes = new ChunkedArrayList<>();
List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
toRemoveINodes, toRemoveUCFiles, now());
if (ret >= 0) {
iip = INodesInPath.replace(iip, iip.length() - 1, null);
FSDirDeleteOp.incrDeletedFileCount(ret);
fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
}
} else {
        // If the lease soft limit has expired, try to recover the lease.
fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
}
fsn.checkFsObjectLimit();
INodeFile newNode = null;
INodesInPath parent =
FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
if (parent != null) {
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine);
newNode = iip != null ? iip.getLastINode().asFile() : null;
}
if (newNode == null) {
throw new IOException("Unable to add " + src + " to namespace");
}
fsn.leaseManager.addLease(
newNode.getFileUnderConstructionFeature().getClientName(),
newNode.getId());
if (feInfo != null) {
FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo);
}
setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return FSDirStatAndListingOp.getFileInfo(fsd, iip);
}
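  /**
   * Variant of {@link #addFile} used when applying an ADD record from
   * the edit log: the inode id, timestamps, ACLs and XAttrs come from the
   * logged record instead of being generated. The caller is expected to
   * hold the FSDirectory write lock already.
   * @return the new INodeFile, or null if the inode could not be added
   */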
static INodeFile addFileForEditLog(
FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
PermissionStatus permissions, List<AclEntry> aclEntries,
List<XAttr> xAttrs, short replication, long modificationTime, long atime,
long preferredBlockSize, boolean underConstruction, String clientName,
String clientMachine, byte storagePolicyId) {
final INodeFile newNode;
assert fsd.hasWriteLock();
if (underConstruction) {
newNode = newINodeFile(id, permissions, modificationTime,
modificationTime, replication,
preferredBlockSize,
storagePolicyId);
newNode.toUnderConstruction(clientName, clientMachine);
} else {
newNode = newINodeFile(id, permissions, modificationTime,
atime, replication,
preferredBlockSize,
storagePolicyId);
}
newNode.setLocalName(localName);
try {
INodesInPath iip = fsd.addINode(existing, newNode);
if (iip != null) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
}
if (xAttrs != null) {
XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
}
return newNode;
}
} catch (IOException e) {
NameNode.stateChangeLog.warn(
"DIR* FSDirectory.unprotectedAddFile: exception when add " + existing
.getPath() + " to the file system", e);
if (e instanceof FSLimitException.MaxDirectoryItemsExceededException) {
NameNode.stateChangeLog.warn("Please increase "
+ "dfs.namenode.fs-limits.max-directory-items and make it "
+ "consistent across all NameNodes.");
}
}
return null;
}
/**
* Add a block to the file. Returns a reference to the added block.
*/
private static BlockInfo addBlock(
FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
DatanodeStorageInfo[] targets) throws IOException {
fsd.writeLock();
try {
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
      // check quota limits and update space consumed
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
fileINode.getFileReplication(), true);
// associate new last block for the file
BlockInfo blockInfo = new BlockInfoContiguous(block,
fileINode.getFileReplication());
blockInfo.convertToBlockUnderConstruction(
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
fileINode.addBlock(blockInfo);
      if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
+ path + " with " + block
+ " block is added to the in-memory "
+ "file system");
}
return blockInfo;
} finally {
fsd.writeUnlock();
}
}
/**
* Add the given filename to the fs.
* @return the new INodesInPath instance that contains the new INode
*/
private static INodesInPath addFile(
FSDirectory fsd, INodesInPath existing, byte[] localName,
PermissionStatus permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine)
throws IOException {
long modTime = now();
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replication, preferredBlockSize);
newNode.setLocalName(localName);
newNode.toUnderConstruction(clientName, clientMachine);
INodesInPath newiip;
fsd.writeLock();
try {
newiip = fsd.addINode(existing, newNode);
} finally {
fsd.writeUnlock();
}
if (newiip == null) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
existing.getPath() + "/" + DFSUtil.bytes2String(localName));
return null;
}
    if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addFile: " +
DFSUtil.bytes2String(localName) + " is added");
}
return newiip;
}
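  /**
   * Shared analysis for Parts I and II of getAdditionalBlock(): check
   * safe mode, the fs object limit and the client's lease, then compare
   * the client's notion of the last block with the namesystem's. When a
   * retry is detected (case 2 below), the previously allocated block is
   * handed back through {@code onRetryBlock[0]}.
   */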
private static FileState analyzeFileState(
FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
throws IOException {
assert fsn.hasReadLock();
String src = iip.getPath();
checkBlock(fsn, previous);
onRetryBlock[0] = null;
fsn.checkNameNodeSafeMode("Cannot add block to " + src);
    // Have we exceeded the configured limit of fs objects?
fsn.checkFsObjectLimit();
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
final INodeFile file = fsn.checkLease(iip, clientName, fileId);
BlockInfo lastBlockInFile = file.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
// four possibilities:
// 1) This is the first block allocation of an append() pipeline
// which started appending exactly at or exceeding the block boundary.
// In this case, the client isn't passed the previous block,
// so it makes the allocateBlock() call with previous=null.
// We can distinguish this since the last block of the file
// will be exactly a full block.
// 2) This is a retry from a client that missed the response of a
// prior getAdditionalBlock() call, perhaps because of a network
// timeout, or because of an HA failover. In that case, we know
// by the fact that the client is re-issuing the RPC that it
      // never began to write to the old block. Hence it is safe to
      // return the existing block.
// 3) This is an entirely bogus request/bug -- we should error out
// rather than potentially appending a new block with an empty
// one in the middle, etc
// 4) This is a retry from a client that timed out while
// the prior getAdditionalBlock() is still being processed,
// currently working on chooseTarget().
// There are no means to distinguish between the first and
// the second attempts in Part I, because the first one hasn't
// changed the namesystem state yet.
// We run this analysis again in Part II where case 4 is impossible.
BlockInfo penultimateBlock = file.getPenultimateBlock();
if (previous == null &&
lastBlockInFile != null &&
lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
lastBlockInFile.isComplete()) {
// Case 1
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
" writing to a file with a complete previous block: src=" +
src + " lastBlock=" + lastBlockInFile);
}
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
if (lastBlockInFile.getNumBytes() != 0) {
throw new IOException(
"Request looked like a retry to allocate block " +
lastBlockInFile + " but it already contains " +
lastBlockInFile.getNumBytes() + " bytes");
}
// Case 2
// Return the last block.
NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " +
"allocation of a new block in " + src + ". Returning previously" +
" allocated block " + lastBlockInFile);
long offset = file.computeFileSize();
BlockUnderConstructionFeature uc =
lastBlockInFile.getUnderConstructionFeature();
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
uc.getExpectedStorageLocations(), offset);
return new FileState(file, src, iip);
} else {
// Case 3
throw new IOException("Cannot allocate block in " + src + ": " +
"passed 'previous' block " + previous + " does not match actual " +
"last block in file " + lastBlockInFile);
}
}
return new FileState(file, src, iip);
}
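  /**
   * Complete the given file: commit its last block, completing it if it
   * has the minimum number of replicas, and finalize the inode.
   * @return true if the file was closed; false if the client should retry
   *         because blocks are not yet minimally replicated
   */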
static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
final String srcArg, String holder, ExtendedBlock last, long fileId)
throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
checkBlock(fsn, last);
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
boolean success = completeFileInternal(fsn, iip, holder,
ExtendedBlock.getLocalBlock(last),
fileId);
if (success) {
NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
+ " is closed by " + holder);
}
return success;
}
private static boolean completeFileInternal(
FSNamesystem fsn, INodesInPath iip,
String holder, Block last, long fileId)
throws IOException {
assert fsn.hasWriteLock();
final String src = iip.getPath();
final INodeFile pendingFile;
INode inode = null;
try {
inode = iip.getLastINode();
pendingFile = fsn.checkLease(iip, holder, fileId);
} catch (LeaseExpiredException lee) {
if (inode != null && inode.isFile() &&
!inode.asFile().isUnderConstruction()) {
        // This could be a retry RPC - i.e. the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
final Block realLastBlock = inode.asFile().getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* completeFile: " +
"request from " + holder + " to complete inode " + fileId +
"(" + src + ") which is already closed. But, it appears to be " +
"an RPC retry. Returning success");
return true;
}
}
throw lee;
}
// Check the state of the penultimate block. It should be completed
// before attempting to complete the last one.
if (!fsn.checkFileProgress(src, pendingFile, false)) {
return false;
}
// commit the last block and complete it if it has minimum replicas
fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
if (!fsn.checkFileProgress(src, pendingFile, true)) {
return false;
}
fsn.addCommittedBlocksToPending(pendingFile);
fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
Snapshot.CURRENT_STATE_ID, true);
return true;
}
private static INodeFile newINodeFile(
long id, PermissionStatus permissions, long mtime, long atime,
short replication, long preferredBlockSize, byte storagePolicyId) {
return new INodeFile(id, null, permissions, mtime, atime,
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
storagePolicyId);
}
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
long mtime, long atime, short replication, long preferredBlockSize) {
return newINodeFile(id, permissions, mtime, atime, replication,
preferredBlockSize, (byte)0);
}
/**
* Persist the new block (the last block of the given file).
*/
private static void persistNewBlock(
FSNamesystem fsn, String path, INodeFile file) {
Preconditions.checkArgument(file.isUnderConstruction());
fsn.getEditLog().logAddBlock(path, file);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("persistNewBlock: "
+ path + " with new block " + file.getLastBlock().toString()
+ ", current total block count is " + file.getBlocks().length);
}
}
/**
   * Save the allocated block at the given pending filename.
*
* @param fsn FSNamesystem
* @param src path to the file
* @param inodesInPath representing each of the components of src.
* The last INode is the INode for {@code src} file.
   * @param newBlock newly allocated block to be saved
   * @param targets target datanodes where replicas of the new block are placed
* @throws QuotaExceededException If addition of block exceeds space quota
*/
static void saveAllocatedBlock(
FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
DatanodeStorageInfo[] targets)
throws IOException {
assert fsn.hasWriteLock();
BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock,
targets);
logAllocatedBlock(src, b);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
private static void logAllocatedBlock(String src, BlockInfo b) {
if (!NameNode.stateChangeLog.isInfoEnabled()) {
return;
}
StringBuilder sb = new StringBuilder(150);
sb.append("BLOCK* allocate ");
b.appendStringTo(sb);
sb.append(", ");
BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
if (uc != null) {
uc.appendUCPartsConcise(sb);
}
sb.append(" for " + src);
NameNode.stateChangeLog.info(sb.toString());
}
private static void setNewINodeStoragePolicy(BlockManager bm,
INodesInPath iip, boolean isLazyPersist) throws IOException {
INodeFile inode = iip.getLastINode().asFile();
if (isLazyPersist) {
BlockStoragePolicy lpPolicy =
bm.getStoragePolicy("LAZY_PERSIST");
// Set LAZY_PERSIST storage policy if the flag was passed to
// CreateFile.
if (lpPolicy == null) {
throw new HadoopIllegalArgumentException(
"The LAZY_PERSIST storage policy has been disabled " +
"by the administrator.");
}
inode.setStoragePolicyID(lpPolicy.getId(),
iip.getLatestSnapshotId());
} else {
BlockStoragePolicy effectivePolicy =
bm.getStoragePolicy(inode.getStoragePolicyID());
if (effectivePolicy != null &&
effectivePolicy.isCopyOnCreateFile()) {
// Copy effective policy from ancestor directory to current file.
inode.setStoragePolicyID(effectivePolicy.getId(),
iip.getLatestSnapshotId());
}
}
}
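  /**
   * Result of {@link #analyzeFileState}: the resolved inode, full path
   * and INodesInPath of the file being written.
   */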
private static class FileState {
final INodeFile inode;
final String path;
final INodesInPath iip;
FileState(INodeFile inode, String fullPath, INodesInPath iip) {
this.inode = inode;
this.path = fullPath;
this.iip = iip;
}
}
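  /**
   * File parameters captured under the read lock in Part I
   * ({@link #validateAddBlock}) and consumed by
   * {@link #chooseTargetForNewBlock} while no lock is held.
   */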
static class ValidateAddBlockResult {
final long blockSize;
final int replication;
final byte storagePolicyID;
final String clientMachine;
ValidateAddBlockResult(
long blockSize, int replication, byte storagePolicyID,
String clientMachine) {
this.blockSize = blockSize;
this.replication = replication;
this.storagePolicyID = storagePolicyID;
this.clientMachine = clientMachine;
}
}
}