/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ChunkedArrayList;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
import static org.apache.hadoop.util.Time.now;
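/**
 * Static helper routines used by FSNamesystem for the write path:
 * creating files and allocating, persisting, abandoning, and completing
 * blocks.
 */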
class FSDirWriteFileOp {
private FSDirWriteFileOp() {}
static boolean unprotectedRemoveBlock(
FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
Block block) throws IOException {
// modify file-> block and blocksMap
// fileNode should be under construction
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
if (uc == null) {
return false;
}
fsd.getBlockManager().removeBlockFromMap(block);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
          + path + " with " + block
          + " block is removed from the file system");
    }
// update space consumed
fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
uc.getReplication(), true);
return true;
}
  /**
   * Persist the block list for the inode. This legacy overload is not
   * implemented in this prototype; use the transaction-based variant below.
   */
static void persistBlocks(
FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) {
throw new IllegalStateException("Unimplemented");
}
static void persistBlocks(
RWTransaction tx, String path, FlatINode inode) {
FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
Preconditions.checkArgument(f != null && f.inConstruction());
tx.logUpdateBlocks(path, f);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"persistBlocks: " + path + " with " + f.numBlocks() + " " +
"blocks is persisted to the file system");
}
}
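  /**
   * The client has abandoned a block that it had been allocated. Remove
   * the block from the file's pending block list and persist the updated
   * block list.
   */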
static void abandonBlock(
FSDirectory fsd, FSPermissionChecker pc, ExtendedBlock b, long fileId,
String src, String holder) throws IOException {
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
final INode inode;
final INodesInPath iip;
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
iip = fsd.getINodesInPath(src, true);
inode = iip.getLastINode();
} else {
inode = fsd.getInode(fileId);
iip = INodesInPath.fromINode(inode);
if (inode != null) {
src = iip.getPath();
}
}
FSNamesystem fsn = fsd.getFSNamesystem();
final INodeFile file = fsn.checkLease(src, holder, inode, fileId);
Preconditions.checkState(file.isUnderConstruction());
Block localBlock = ExtendedBlock.getLocalBlock(b);
fsd.writeLock();
try {
// Remove the block from the pending creates list
if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
return;
}
} finally {
fsd.writeUnlock();
}
persistBlocks(fsd, src, file, false);
}
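  /**
   * Verify that the block, if present, belongs to this namesystem's
   * block pool.
   */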
static void checkBlock(FSNamesystem fsn, ExtendedBlock block)
throws IOException {
String bpId = fsn.getBlockPoolId();
if (block != null && !bpId.equals(block.getBlockPoolId())) {
throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
+ " - expected " + bpId);
}
}
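  // A rough sketch of how the phases below are expected to be driven from
  // FSNamesystem#getAdditionalBlock (retry handling via onRetryBlock is
  // omitted; see the actual call site):
  //
  //   ValidateAddBlockResult r = validateAddBlock(fsn, pc, src, fileId,
  //       clientName, previous, onRetryBlock);
  //   DatanodeStorageInfo[] targets = chooseTargetForNewBlock(
  //       fsn.getBlockManager(), src, excludedNodes, favoredNodes, r);
  //   LocatedBlock lb = storeAllocatedBlock(fsn, src, fileId, clientName,
  //       previous, targets);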
  /**
   * Part I of getAdditionalBlock().
   * Analyze the state of the file under the read lock to determine whether
   * the client can add a new block, detecting potential retries, lease
   * mismatches, and whether the penultimate block is minimally replicated.
   *
   * Generate target DataNode locations for the new block,
   * but do not create the new block yet.
   */
static ValidateAddBlockResult validateAddBlock(
FSNamesystem fsn, FSPermissionChecker pc,
String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
final long blockSize;
final int replication;
final byte storagePolicyID;
String clientMachine;
FSDirectory fsd = fsn.getFSDirectory();
try (ROTransaction tx = fsd.newROTransaction().begin()) {
FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName,
previous, onRetryBlock);
final FlatINode pendingFile = fileState.inode;
FlatINodeFileFeature f = pendingFile.feature(FlatINodeFileFeature.class);
// Check if the penultimate block is minimally replicated
if (!fsn.checkFileProgress(src, f, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. No need to generate new locations.
// Use the last block if it has locations.
return null;
}
if (f.numBlocks() >= fsn.maxBlocksPerFile) {
throw new IOException("File has reached the limit on maximum number of"
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ "): " + f.numBlocks() + " >= " + fsn.maxBlocksPerFile);
}
blockSize = f.blockSize();
clientMachine = f.clientMachine();
replication = f.replication();
storagePolicyID = f.storagePolicyId();
return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
clientMachine);
}
}
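  /**
   * Build a LocatedBlock for the given block and storage locations, and
   * attach a WRITE access token to it.
   */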
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
DatanodeStorageInfo[] locs, long offset) throws IOException {
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
locs, offset, false);
fsn.getBlockManager().setBlockToken(lBlk,
BlockTokenIdentifier.AccessMode.WRITE);
return lBlk;
}
  /**
   * Part II of getAdditionalBlock().
   * Repeat the same analysis of the file state as in Part I,
   * but under the write lock.
   * If the conditions still hold, allocate a new block with
   * the new targets, and add it to the INode and to the BlocksMap.
   */
static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
long fileId, String clientName, ExtendedBlock previous,
DatanodeStorageInfo[] targets) throws IOException {
long offset;
// Run the full analysis again, since things could have changed
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FSDirectory fsd = fsn.getFSDirectory();
BlockManager bm = fsn.getBlockManager();
try (RWTransaction tx = fsd.newRWTransaction().begin()) {
FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName,
previous, onRetryBlock);
final FlatINode inode = fileState.inode;
FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
src = fileState.path;
if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
          // This is a retry. Just return the last block since it has locations.
return onRetryBlock[0];
} else {
// add new chosen targets to already allocated block and return
Block lastBlock = file.lastBlock();
BlockInfoContiguous lastBlockInFile = bm.getStoredBlock(lastBlock);
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
.setExpectedLocations(targets);
offset = file.fileSize();
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
}
}
// commit the last block and complete it if it has minimum replicas
FlatINodeFileFeature.Builder newFile = fsn.commitOrCompleteLastBlock(
file, ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
Block newBlock = fsn.createNewBlock();
saveAllocatedBlock(fsn, src, inode, newBlock, targets);
FlatINode newInode = persistNewBlock(tx, src, inode, newFile, newBlock);
offset = newInode.<FlatINodeFileFeature>feature(
FlatINodeFileFeature.class).fileSize();
// TODO: Update quota
// check quota limits and updated space consumed
// fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
// fileINode.getFileReplication(), true);
tx.commit();
// Return located block
return makeLocatedBlock(fsn, newBlock, targets, offset);
}
}
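  /**
   * Choose target datanodes for a new block, honoring the excluded and
   * favored node lists as well as the file's storage policy.
   */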
static DatanodeStorageInfo[] chooseTargetForNewBlock(
BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
favoredNodes, ValidateAddBlockResult r) throws IOException {
Node clientNode = bm.getDatanodeManager()
.getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
clientNode = getClientNode(bm, r.clientMachine);
}
Set<Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashSet<>(excludedNodes.length);
Collections.addAll(excludedNodesSet, excludedNodes);
}
List<String> favoredNodesList = (favoredNodes == null) ? null
: Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
excludedNodesSet, r.blockSize,
favoredNodesList, r.storagePolicyID);
}
  /**
   * Resolve the client machine address to get a network location path.
   */
static Node getClientNode(BlockManager bm, String clientMachine) {
List<String> hosts = new ArrayList<>(1);
hosts.add(clientMachine);
List<String> rName = bm.getDatanodeManager()
.resolveNetworkLocation(hosts);
Node clientNode = null;
if (rName != null) {
      // Able to resolve the clientMachine mapping.
      // Create a temp node to find out the rack-local nodes.
clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
+ clientMachine);
}
return clientNode;
}
  /**
   * Create a new file or overwrite an existing file<br>
   *
   * Once the file is created, the client allocates a new block with the
   * next call using {@link ClientProtocol#addBlock}.
   * <p>
   * For a description of parameters and exceptions thrown see
   * {@link ClientProtocol#create}
   */
static HdfsFileStatus startFile(
FSNamesystem fsn, FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean logRetryEntry)
throws IOException {
assert fsn.hasWriteLock();
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
CipherSuite suite = null;
CryptoProtocolVersion version = null;
KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
if (ezInfo != null) {
edek = ezInfo.edek;
suite = ezInfo.suite;
version = ezInfo.protocolVersion;
}
boolean isRawPath = FSDirectory.isReservedRawName(src);
FSDirectory fsd = fsn.getFSDirectory();
final StringMap ugid = fsd.ugid();
try (RWTransaction tx = fsd.newRWTransaction().begin()) {
Resolver.Result paths = Resolver.resolve(tx, src);
if (paths.invalidPath()) {
throw new InvalidPathException(src);
}
final FlatINodesInPath iip = paths.inodesInPath();
// Verify that the destination does not exist as a directory already.
if (paths.ok()) {
FlatINode inode = paths.inodesInPath().getLastINode();
if (inode.isDirectory()) {
throw new FileAlreadyExistsException(src +
" already exists as a directory");
}
if (fsd.isPermissionEnabled()) {
if (overwrite) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
}
}
if (fsd.isPermissionEnabled()) {
        /*
         * To overwrite an existing file, the client needs 'w' permission
         * on the parent (which equals the ancestor in this case)
         */
fsd.checkAncestorAccess(pc, paths, FsAction.WRITE);
}
if (!createParent && FlatNSUtil.hasNextLevelInPath(paths.src, paths
.offset)) {
throw new FileNotFoundException(paths.src.substring(0, paths.offset));
}
if (paths.notFound() && !create) {
throw new FileNotFoundException("Can't overwrite non-existent " +
src + " for client " + clientMachine);
}
// TODO: Handle encryption
FileEncryptionInfo feInfo = null;
// final EncryptionZone zone = fsd.getEZForPath(iip);
// if (zone != null) {
// // The path is now within an EZ, but we're missing encryption parameters
// if (suite == null || edek == null) {
// throw new RetryStartFileException();
// }
// // Path is within an EZ and we have provided encryption parameters.
// // Make sure that the generated EDEK matches the settings of the EZ.
// final String ezKeyName = zone.getKeyName();
// if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
// throw new RetryStartFileException();
// }
// feInfo = new FileEncryptionInfo(suite, version,
// edek.getEncryptedKeyVersion().getMaterial(),
// edek.getEncryptedKeyIv(),
// ezKeyName, edek.getEncryptionKeyVersionName());
// }
if (paths.ok()) {
if (overwrite) {
// TODO
List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
long ret = FSDirDeleteOp.delete(tx, paths, toRemoveBlocks,
toRemoveUCFiles, now());
if (ret >= 0) {
FSDirDeleteOp.incrDeletedFileCount(ret);
fsn.removeLeases(toRemoveUCFiles);
}
} else {
// TODO
// If lease soft limit time is expired, recover the lease
// fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
// src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
}
fsn.checkFsObjectLimit();
paths = Resolver.resolve(tx, src);
Map.Entry<FlatINodesInPath, String> parent = FSDirMkdirOp
.createAncestorDirectories(tx, fsd, paths, permissions);
long newId = tx.allocateNewInodeId();
FlatINodeFileFeature.Builder fileFeatureBuilder = new FlatINodeFileFeature
.Builder()
.replication(replication)
.blockSize(blockSize)
.inConstruction(true)
.clientName(holder)
.clientMachine(clientMachine);
setNewINodeStoragePolicy(fsn.getBlockManager(), fileFeatureBuilder,
isLazyPersist);
int userId = tx.getStringId(permissions.getUserName());
FlatINode parentINode = parent.getKey().getLastINode();
int groupId = permissions.getGroupName() == null
? parentINode.groupId()
: tx.getStringId(permissions.getGroupName());
FlatINodeFileFeature fileFeature = FlatINodeFileFeature.wrap(
fileFeatureBuilder.build());
ByteString b = new FlatINode.Builder()
.id(newId)
.type(FlatINode.Type.FILE)
.parentId(parentINode.id())
.mtime(now())
.userId(userId)
.groupId(groupId)
.permission(permissions.getPermission().toShort())
.addFeature(fileFeature)
.build();
FlatINode newNode = FlatINode.wrap(b);
// TODO: check .reserved path, quotas and ACL
byte[] localName = parent.getValue().getBytes(Charsets.UTF_8);
tx.putINode(newId, b);
tx.putChild(parentINode.id(), ByteBuffer.wrap(localName), newId);
ByteString newParent = new FlatINode.Builder().mergeFrom(parentINode)
.mtime(now()).build();
tx.putINode(parentINode.id(), newParent);
fsn.leaseManager.addLease(holder, newId);
// if (feInfo != null) {
// fsd.setFileEncryptionInfo(src, feInfo);
// newNode = fsd.getInode(newNode.getId()).asFile();
// }
tx.logOpenFile(fsd.ugid(), src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newId + " " + holder);
}
tx.commit();
return FSDirStatAndListingOp.createFileStatus(
tx, fsd, newNode, localName, fileFeature.storagePolicyId());
}
}
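  /**
   * Resolve the encryption zone (if any) for the given path and pick a
   * crypto protocol version and cipher suite compatible with the client.
   * Returns null when the path is not within an encryption zone.
   */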
static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
FSPermissionChecker pc, String src,
CryptoProtocolVersion[] supportedVersions)
throws IOException {
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSDirectory fsd = fsn.getFSDirectory();
src = fsd.resolvePath(pc, src, pathComponents);
INodesInPath iip = fsd.getINodesInPath4Write(src);
// Nothing to do if the path is not within an EZ
final EncryptionZone zone = fsd.getEZForPath(iip);
if (zone == null) {
return null;
}
CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
zone, supportedVersions);
CipherSuite suite = zone.getSuite();
String ezKeyName = zone.getKeyName();
Preconditions.checkNotNull(protocolVersion);
Preconditions.checkNotNull(suite);
Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
"Chose an UNKNOWN CipherSuite!");
Preconditions.checkNotNull(ezKeyName);
return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName);
}
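  /**
   * Add a file inode while replaying the edit log, optionally marking it
   * as under construction and restoring its ACLs and extended attributes.
   * Returns null if the inode could not be added to the namespace.
   */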
static INodeFile addFileForEditLog(
FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
PermissionStatus permissions, List<AclEntry> aclEntries,
List<XAttr> xAttrs, short replication, long modificationTime, long atime,
long preferredBlockSize, boolean underConstruction, String clientName,
String clientMachine, byte storagePolicyId) {
final INodeFile newNode;
assert fsd.hasWriteLock();
if (underConstruction) {
newNode = newINodeFile(id, permissions, modificationTime, modificationTime, replication,
preferredBlockSize,
storagePolicyId);
newNode.toUnderConstruction(clientName, clientMachine);
} else {
newNode = newINodeFile(id, permissions, modificationTime,
atime, replication,
preferredBlockSize,
storagePolicyId);
}
newNode.setLocalName(localName);
try {
INodesInPath iip = fsd.addINode(existing, newNode);
if (iip != null) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
}
if (xAttrs != null) {
XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
}
return newNode;
}
} catch (IOException e) {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"DIR* FSDirectory.unprotectedAddFile: exception when add "
+ existing.getPath() + " to the file system", e);
}
}
return null;
}
/**
* Add a block to the file. Returns a reference to the added block.
*/
private static BlockInfoContiguous addBlock(
BlockManager bm, String path, FlatINode inode, Block block,
DatanodeStorageInfo[] targets) throws IOException {
FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
Preconditions.checkState(f.inConstruction());
// associate new last block for the file
BlockInfoContiguousUnderConstruction blockInfo =
new BlockInfoContiguousUnderConstruction(
block, f.replication(),
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
targets);
bm.addBlockCollection(blockInfo, inode.id());
    if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: " + path
+ " with " + block + " block is added to the in-memory file system");
}
return blockInfo;
}
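  /**
   * Analyze the state of the file being written and the block that the
   * client reports as its last block, distinguishing genuine allocations
   * from retries of a previous getAdditionalBlock() call. On a retry the
   * previously allocated block is returned through {@code onRetryBlock}.
   */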
private static FileState analyzeFileState(
Transaction tx, FSNamesystem fsn, String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
throws IOException {
assert fsn.hasReadLock();
BlockManager bm = fsn.getBlockManager();
checkBlock(fsn, previous);
onRetryBlock[0] = null;
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
final Resolver.Result paths;
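    // NOTE: the hard-coded "true" below forces path-based resolution;
    // the inode-ID fast path is effectively disabled in this prototype.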
if (true || fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
paths = Resolver.resolve(tx, src);
} else {
// Newer clients pass the inode ID, so we can just get the inode
// directly.
paths = Resolver.resolveById(tx, fileId);
}
if (paths.invalidPath()) {
throw new InvalidPathException(src);
} else if (paths.notFound()) {
throw new FileNotFoundException(src);
}
FlatINode inode = paths.inodesInPath().getLastINode();
fsn.checkLease(src, clientName, inode);
FlatINodeFileFeature pendingFile = inode.feature(FlatINodeFileFeature.class);
BlockInfoContiguous lastBlockInFile = pendingFile.lastBlock() == null ?
null : bm.getStoredBlock(pendingFile.lastBlock());
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
// four possibilities:
// 1) This is the first block allocation of an append() pipeline
// which started appending exactly at or exceeding the block boundary.
// In this case, the client isn't passed the previous block,
// so it makes the allocateBlock() call with previous=null.
// We can distinguish this since the last block of the file
// will be exactly a full block.
// 2) This is a retry from a client that missed the response of a
// prior getAdditionalBlock() call, perhaps because of a network
// timeout, or because of an HA failover. In that case, we know
// by the fact that the client is re-issuing the RPC that it
      // never began to write to the old block. Hence it is safe
      // to return the existing block.
// 3) This is an entirely bogus request/bug -- we should error out
// rather than potentially appending a new block with an empty
// one in the middle, etc
// 4) This is a retry from a client that timed out while
// the prior getAdditionalBlock() is still being processed,
// currently working on chooseTarget().
// There are no means to distinguish between the first and
// the second attempts in Part I, because the first one hasn't
// changed the namesystem state yet.
// We run this analysis again in Part II where case 4 is impossible.
BlockInfoContiguous penultimateBlock = bm.getStoredBlock(
pendingFile.penultimateBlock());
if (previous == null &&
lastBlockInFile != null &&
lastBlockInFile.getNumBytes() >= pendingFile.blockSize() &&
lastBlockInFile.isComplete()) {
// Case 1
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
" writing to a file with a complete previous block: src=" +
src + " lastBlock=" + lastBlockInFile);
}
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
if (lastBlockInFile.getNumBytes() != 0) {
throw new IOException(
"Request looked like a retry to allocate block " +
lastBlockInFile + " but it already contains " +
lastBlockInFile.getNumBytes() + " bytes");
}
// Case 2
// Return the last block.
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
"caught retry for allocation of a new block in " +
src + ". Returning previously allocated block " + lastBlockInFile);
long offset = pendingFile.fileSize();
BlockInfoContiguousUnderConstruction lastBlockUC =
(BlockInfoContiguousUnderConstruction) lastBlockInFile;
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
lastBlockUC.getExpectedStorageLocations(), offset);
return new FileState(inode, src);
} else {
// Case 3
throw new IOException("Cannot allocate block in " + src + ": " +
"passed 'previous' block " + previous + " does not match actual " +
"last block in file " + lastBlockInFile);
}
}
return new FileState(inode, src);
}
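  /**
   * Complete a file that is being written: verify the lease and the last
   * block reported by the client, then finalize the file if its blocks
   * meet the minimum replication requirement.
   */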
static boolean completeFile(
FSNamesystem fsn, FSPermissionChecker pc, final String srcArg,
String holder, ExtendedBlock last, long fileId) throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
checkBlock(fsn, last);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsn.dir.resolvePath(pc, src, pathComponents);
boolean success = completeFileInternal(fsn, src, holder,
ExtendedBlock.getLocalBlock(last),
fileId);
if (success) {
NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
+ " is closed by " + holder);
}
return success;
}
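  /**
   * Do the actual work of completing the file inside a read-write
   * transaction. A lease mismatch on an already-closed file whose last
   * block matches the client's view is treated as a successful retry
   * (see HDFS-3031).
   */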
private static boolean completeFileInternal(
FSNamesystem fsn, String src, String holder, Block last, long fileId)
throws IOException {
assert fsn.hasWriteLock();
FSDirectory fsd = fsn.getFSDirectory();
try (RWTransaction tx = fsd.newRWTransaction().begin()) {
final Resolver.Result paths;
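      // NOTE: as in analyzeFileState, resolution by inode ID is disabled
      // here and the path is always resolved.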
if (true || fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
paths = Resolver.resolve(tx, src);
} else {
// Newer clients pass the inode ID, so we can just get the inode
// directly.
paths = Resolver.resolveById(tx, fileId);
}
if (paths.invalidPath()) {
throw new InvalidPathException(src);
} else if (paths.notFound()) {
throw new FileNotFoundException(src);
}
FlatINode inode = paths.inodesInPath().getLastINode();
FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
try {
fsn.checkLease(src, holder, inode);
} catch (LeaseExpiredException lee) {
if (file != null && !file.inConstruction()) {
// This could be a retry RPC - i.e the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
final Block realLastBlock = file.lastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* completeFile: request from "
+ holder + " to complete inode " + fileId + "(" + src + ") "
+ "which is already closed. But, it appears to be an RPC "
+ "retry. Returning success");
return true;
}
}
throw lee;
}
// Check the state of the penultimate block. It should be completed
// before attempting to complete the last one.
if (!fsn.checkFileProgress(src, file, false)) {
return false;
}
// commit the last block and complete it if it has minimum replicas
FlatINodeFileFeature.Builder newFile =
fsn.commitOrCompleteLastBlock(file, last);
if (!fsn.checkFileProgress(src, file, true)) {
return false;
}
FlatINode.Builder newINode = new FlatINode.Builder().mergeFrom(inode);
fsn.finalizeINodeFileUnderConstruction(tx, src, newINode, newFile);
tx.commit();
return true;
}
}
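  /** Construct a new INodeFile with an empty block list. */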
private static INodeFile newINodeFile(
long id, PermissionStatus permissions, long mtime, long atime,
short replication, long preferredBlockSize, byte storagePolicyId) {
return new INodeFile(id, null, permissions, mtime, atime,
BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
storagePolicyId);
}
/**
* Persist the new block (the last block of the given file).
*/
private static FlatINode persistNewBlock(
RWTransaction tx, String path, FlatINode inode,
FlatINodeFileFeature.Builder newFile, Block newBlock) {
Preconditions.checkArgument(newFile.inConstruction());
newFile.addBlock(newBlock);
FlatINodeFileFeature newFeature = FlatINodeFileFeature.wrap(newFile.build());
FlatINode.Builder builder = new FlatINode.Builder()
.mergeFrom(inode).replaceFeature(newFeature);
ByteString newFileBytes = builder.build();
FlatINode newInode = FlatINode.wrap(newFileBytes);
tx.putINode(inode.id(), newFileBytes);
tx.logAddBlock(path, newFeature);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("persistNewBlock: " + path
+ " with new block " + newBlock + ", current total block count is "
+ newFeature.numBlocks());
}
return newInode;
}
  /**
   * Save the allocated block for the given pending file.
   *
   * @param fsn FSNamesystem
   * @param src path to the file
   * @param inode the file
   * @param newBlock newly allocated block to be saved
   * @param targets target datanodes where replicas of the new block are placed
   * @throws QuotaExceededException If addition of block exceeds space quota
   */
private static void saveAllocatedBlock(
FSNamesystem fsn, String src, FlatINode inode, Block newBlock,
DatanodeStorageInfo[] targets)
throws IOException {
assert fsn.hasWriteLock();
BlockManager bm = fsn.getBlockManager();
BlockInfoContiguous b = addBlock(bm, src, inode, newBlock, targets);
NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
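  /**
   * Set the storage policy on a newly created file. Currently only the
   * LAZY_PERSIST policy is honored; inheriting the effective policy from
   * the parent directory is still a TODO.
   */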
private static void setNewINodeStoragePolicy(
BlockManager bm, FlatINodeFileFeature.Builder file, boolean isLazyPersist)
throws IOException {
if (isLazyPersist) {
BlockStoragePolicy lpPolicy =
bm.getStoragePolicy("LAZY_PERSIST");
// Set LAZY_PERSIST storage policy if the flag was passed to
// CreateFile.
if (lpPolicy == null) {
throw new HadoopIllegalArgumentException(
"The LAZY_PERSIST storage policy has been disabled " +
"by the administrator.");
}
file.storagePolicyId(lpPolicy.getId());
} else {
// TODO: handle effective storage policy id
// BlockStoragePolicy effectivePolicy =
// bm.getStoragePolicy(parent.getLastINode().storagePolicyId());
//
// if (effectivePolicy != null &&
// effectivePolicy.isCopyOnCreateFile()) {
// // Copy effective policy from ancestor directory to current file.
// file.storagePolicyId(effectivePolicy.getId());
// }
}
}
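  /** The resolved inode and path of a file examined by analyzeFileState(). */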
private static class FileState {
final FlatINode inode;
final String path;
FileState(FlatINode inode, String fullPath) {
this.inode = inode;
this.path = fullPath;
}
}
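  /**
   * File parameters gathered by validateAddBlock() that are needed to
   * choose target datanodes for the new block.
   */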
static class ValidateAddBlockResult {
final long blockSize;
final int replication;
final byte storagePolicyID;
final String clientMachine;
ValidateAddBlockResult(
long blockSize, int replication, byte storagePolicyID,
String clientMachine) {
this.blockSize = blockSize;
this.replication = replication;
this.storagePolicyID = storagePolicyID;
this.clientMachine = clientMachine;
}
}
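  /** Encryption parameters of the encryption zone containing a file. */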
static class EncryptionKeyInfo {
final CryptoProtocolVersion protocolVersion;
final CipherSuite suite;
final String ezKeyName;
KeyProviderCryptoExtension.EncryptedKeyVersion edek;
EncryptionKeyInfo(
CryptoProtocolVersion protocolVersion, CipherSuite suite,
String ezKeyName) {
this.protocolVersion = protocolVersion;
this.suite = suite;
this.ezKeyName = ezKeyName;
}
}
}