| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ClosedChannelException; |
| import java.util.EnumSet; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.hadoop.HadoopIllegalArgumentException; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.crypto.CryptoProtocolVersion; |
| import org.apache.hadoop.fs.CanSetDropBehind; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FSOutputSummer; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileEncryptionInfo; |
| import org.apache.hadoop.fs.ParentNotDirectoryException; |
| import org.apache.hadoop.fs.StreamCapabilities; |
| import org.apache.hadoop.fs.Syncable; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; |
| import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; |
| import org.apache.hadoop.hdfs.client.impl.DfsClientConf; |
| import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; |
| import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; |
| import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; |
| import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; |
| import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; |
| import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; |
| import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; |
| import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; |
| import org.apache.hadoop.hdfs.server.namenode.SafeModeException; |
| import org.apache.hadoop.hdfs.util.ByteArrayManager; |
| import org.apache.hadoop.io.EnumSetWritable; |
| import org.apache.hadoop.io.MultipleIOException; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.DataChecksum; |
| import org.apache.hadoop.util.DataChecksum.Type; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.htrace.core.TraceScope; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| |
| /**************************************************************** |
| * DFSOutputStream creates files from a stream of bytes. |
| * |
| * The client application writes data that is cached internally by |
| * this stream. Data is broken up into packets, each packet is |
 * typically 64K in size. A packet consists of chunks. Each chunk
| * is typically 512 bytes and has an associated checksum with it. |
| * |
| * When a client application fills up the currentPacket, it is |
| * enqueued into the dataQueue of DataStreamer. DataStreamer is a |
| * thread that picks up packets from the dataQueue and sends it to |
| * the first datanode in the pipeline. |
| * |
| ****************************************************************/ |
| @InterfaceAudience.Private |
| public class DFSOutputStream extends FSOutputSummer |
| implements Syncable, CanSetDropBehind, StreamCapabilities { |
| static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class); |
| /** |
| * Number of times to retry creating a file when there are transient |
| * errors (typically related to encryption zones and KeyProvider operations). |
| */ |
| @VisibleForTesting |
| static final int CREATE_RETRY_COUNT = 10; |
| @VisibleForTesting |
| static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS = |
| CryptoProtocolVersion.supported(); |
| |
| protected final DFSClient dfsClient; |
| protected final ByteArrayManager byteArrayManager; |
| // closed is accessed by different threads under different locks. |
| protected volatile boolean closed = false; |
| |
| protected final String src; |
| protected final long fileId; |
| protected final long blockSize; |
| protected final int bytesPerChecksum; |
| |
| protected DFSPacket currentPacket = null; |
| protected DataStreamer streamer; |
| protected int packetSize = 0; // write packet size, not including the header. |
| protected int chunksPerPacket = 0; |
| protected long lastFlushOffset = 0; // offset when flush was invoked |
| private long initialFileSize = 0; // at time of file open |
| private final short blockReplication; // replication factor of file |
| protected boolean shouldSyncBlock = false; // force blocks to disk upon close |
| private final EnumSet<AddBlockFlag> addBlockFlags; |
| protected final AtomicReference<CachingStrategy> cachingStrategy; |
| private FileEncryptionInfo fileEncryptionInfo; |
| private int writePacketSize; |
| |
| /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ |
| protected DFSPacket createPacket(int packetSize, int chunksPerPkt, |
| long offsetInBlock, long seqno, boolean lastPacketInBlock) |
| throws InterruptedIOException { |
| final byte[] buf; |
| final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; |
| |
| try { |
| buf = byteArrayManager.newByteArray(bufferSize); |
| } catch (InterruptedException ie) { |
| final InterruptedIOException iioe = new InterruptedIOException( |
| "seqno=" + seqno); |
| iioe.initCause(ie); |
| throw iioe; |
| } |
| |
| return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno, |
| getChecksumSize(), lastPacketInBlock); |
| } |
| |
| @Override |
| protected void checkClosed() throws IOException { |
| if (isClosed()) { |
| getStreamer().getLastException().throwException4Close(); |
| } |
| } |
| |
| // |
| // returns the list of targets, if any, that is being currently used. |
| // |
| @VisibleForTesting |
| public synchronized DatanodeInfo[] getPipeline() { |
| if (getStreamer().streamerClosed()) { |
| return null; |
| } |
| DatanodeInfo[] currentNodes = getStreamer().getNodes(); |
| if (currentNodes == null) { |
| return null; |
| } |
| DatanodeInfo[] value = new DatanodeInfo[currentNodes.length]; |
| System.arraycopy(currentNodes, 0, value, 0, currentNodes.length); |
| return value; |
| } |
| |
| /** |
| * @return the object for computing checksum. |
| * The type is NULL if checksum is not computed. |
| */ |
| private static DataChecksum getChecksum4Compute(DataChecksum checksum, |
| HdfsFileStatus stat) { |
| if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) { |
| // do not compute checksum for writing to single replica to memory |
| return DataChecksum.newDataChecksum(Type.NULL, |
| checksum.getBytesPerChecksum()); |
| } |
| return checksum; |
| } |
| |
| private DFSOutputStream(DFSClient dfsClient, String src, |
| EnumSet<CreateFlag> flag, |
| Progressable progress, HdfsFileStatus stat, DataChecksum checksum) { |
| super(getChecksum4Compute(checksum, stat)); |
| this.dfsClient = dfsClient; |
| this.src = src; |
| this.fileId = stat.getFileId(); |
| this.blockSize = stat.getBlockSize(); |
| this.blockReplication = stat.getReplication(); |
| this.fileEncryptionInfo = stat.getFileEncryptionInfo(); |
| this.cachingStrategy = new AtomicReference<>( |
| dfsClient.getDefaultWriteCachingStrategy()); |
| this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class); |
| if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) { |
| this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE); |
| } |
| if (progress != null) { |
| DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream " |
| +"{}", src); |
| } |
| |
| initWritePacketSize(); |
| |
| this.bytesPerChecksum = checksum.getBytesPerChecksum(); |
| if (bytesPerChecksum <= 0) { |
| throw new HadoopIllegalArgumentException( |
| "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0"); |
| } |
| if (blockSize % bytesPerChecksum != 0) { |
| throw new HadoopIllegalArgumentException("Invalid values: " |
| + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY |
| + " (=" + bytesPerChecksum + ") must divide block size (=" + |
| blockSize + ")."); |
| } |
| this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); |
| } |
| |
| /** |
| * Ensures the configured writePacketSize never exceeds |
| * PacketReceiver.MAX_PACKET_SIZE. |
| */ |
| private void initWritePacketSize() { |
| writePacketSize = dfsClient.getConf().getWritePacketSize(); |
| if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) { |
| LOG.warn( |
| "Configured write packet exceeds {} bytes as max," |
| + " using {} bytes.", |
| PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE); |
| writePacketSize = PacketReceiver.MAX_PACKET_SIZE; |
| } |
| } |
| |
| /** Construct a new output stream for creating a file. */ |
| protected DFSOutputStream(DFSClient dfsClient, String src, |
| HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, |
| DataChecksum checksum, String[] favoredNodes, boolean createStreamer) { |
| this(dfsClient, src, flag, progress, stat, checksum); |
| this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); |
| |
| computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), |
| bytesPerChecksum); |
| |
| if (createStreamer) { |
| streamer = new DataStreamer(stat, null, dfsClient, src, progress, |
| checksum, cachingStrategy, byteArrayManager, favoredNodes, |
| addBlockFlags); |
| } |
| } |
| |
| static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, |
| FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, |
| short replication, long blockSize, Progressable progress, |
| DataChecksum checksum, String[] favoredNodes, String ecPolicyName) |
| throws IOException { |
| try (TraceScope ignored = |
| dfsClient.newPathTraceScope("newStreamForCreate", src)) { |
| HdfsFileStatus stat = null; |
| |
| // Retry the create if we get a RetryStartFileException up to a maximum |
| // number of times |
| boolean shouldRetry = true; |
| int retryCount = CREATE_RETRY_COUNT; |
| while (shouldRetry) { |
| shouldRetry = false; |
| try { |
| stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, |
| new EnumSetWritable<>(flag), createParent, replication, |
| blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName); |
| break; |
| } catch (RemoteException re) { |
| IOException e = re.unwrapRemoteException( |
| AccessControlException.class, |
| DSQuotaExceededException.class, |
| QuotaByStorageTypeExceededException.class, |
| FileAlreadyExistsException.class, |
| FileNotFoundException.class, |
| ParentNotDirectoryException.class, |
| NSQuotaExceededException.class, |
| RetryStartFileException.class, |
| SafeModeException.class, |
| UnresolvedPathException.class, |
| SnapshotAccessControlException.class, |
| UnknownCryptoProtocolVersionException.class); |
| if (e instanceof RetryStartFileException) { |
| if (retryCount > 0) { |
| shouldRetry = true; |
| retryCount--; |
| } else { |
| throw new IOException("Too many retries because of encryption" + |
| " zone operations", e); |
| } |
| } else { |
| throw e; |
| } |
| } |
| } |
| Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); |
| final DFSOutputStream out; |
| if(stat.getErasureCodingPolicy() != null) { |
| out = new DFSStripedOutputStream(dfsClient, src, stat, |
| flag, progress, checksum, favoredNodes); |
| } else { |
| out = new DFSOutputStream(dfsClient, src, stat, |
| flag, progress, checksum, favoredNodes, true); |
| } |
| out.start(); |
| return out; |
| } |
| } |
| |
| /** Construct a new output stream for append. */ |
| private DFSOutputStream(DFSClient dfsClient, String src, |
| EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, |
| HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) |
| throws IOException { |
| this(dfsClient, src, flags, progress, stat, checksum); |
| initialFileSize = stat.getLen(); // length of file when opened |
| this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); |
| |
| boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK); |
| |
| this.fileEncryptionInfo = stat.getFileEncryptionInfo(); |
| |
| // The last partial block of the file has to be filled. |
| if (!toNewBlock && lastBlock != null) { |
| // indicate that we are appending to an existing block |
| streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, |
| checksum, cachingStrategy, byteArrayManager); |
| getStreamer().setBytesCurBlock(lastBlock.getBlockSize()); |
| adjustPacketChunkSize(stat); |
| getStreamer().setPipelineInConstruction(lastBlock); |
| } else { |
| computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), |
| bytesPerChecksum); |
| streamer = new DataStreamer(stat, |
| lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src, |
| progress, checksum, cachingStrategy, byteArrayManager, favoredNodes, |
| addBlockFlags); |
| } |
| } |
| |
| private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{ |
| |
| long usedInLastBlock = stat.getLen() % blockSize; |
| int freeInLastBlock = (int)(blockSize - usedInLastBlock); |
| |
| // calculate the amount of free space in the pre-existing |
| // last crc chunk |
| int usedInCksum = (int)(stat.getLen() % bytesPerChecksum); |
| int freeInCksum = bytesPerChecksum - usedInCksum; |
| |
| // if there is space in the last block, then we have to |
| // append to that block |
| if (freeInLastBlock == blockSize) { |
| throw new IOException("The last block for file " + |
| src + " is full."); |
| } |
| |
| if (usedInCksum > 0 && freeInCksum > 0) { |
| // if there is space in the last partial chunk, then |
| // setup in such a way that the next packet will have only |
| // one chunk that fills up the partial chunk. |
| // |
| computePacketChunkSize(0, freeInCksum); |
| setChecksumBufSize(freeInCksum); |
| getStreamer().setAppendChunk(true); |
| } else { |
| // if the remaining space in the block is smaller than |
| // that expected size of of a packet, then create |
| // smaller size packet. |
| // |
| computePacketChunkSize( |
| Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock), |
| bytesPerChecksum); |
| } |
| } |
| |
| static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, |
| EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, |
| HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) |
| throws IOException { |
| if(stat.getErasureCodingPolicy() != null) { |
| throw new IOException( |
| "Not support appending to a striping layout file yet."); |
| } |
| try (TraceScope ignored = |
| dfsClient.newPathTraceScope("newStreamForAppend", src)) { |
| final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, |
| progress, lastBlock, stat, checksum, favoredNodes); |
| out.start(); |
| return out; |
| } |
| } |
| |
| protected void computePacketChunkSize(int psize, int csize) { |
| final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN; |
| final int chunkSize = csize + getChecksumSize(); |
| chunksPerPacket = Math.max(bodySize/chunkSize, 1); |
| packetSize = chunkSize*chunksPerPacket; |
| DFSClient.LOG.debug("computePacketChunkSize: src={}, chunkSize={}, " |
| + "chunksPerPacket={}, packetSize={}", |
| src, chunkSize, chunksPerPacket, packetSize); |
| } |
| |
| protected TraceScope createWriteTraceScope() { |
| return dfsClient.newPathTraceScope("DFSOutputStream#write", src); |
| } |
| |
| // @see FSOutputSummer#writeChunk() |
| @Override |
| protected synchronized void writeChunk(byte[] b, int offset, int len, |
| byte[] checksum, int ckoff, int cklen) throws IOException { |
| writeChunkPrepare(len, ckoff, cklen); |
| |
| currentPacket.writeChecksum(checksum, ckoff, cklen); |
| currentPacket.writeData(b, offset, len); |
| currentPacket.incNumChunks(); |
| getStreamer().incBytesCurBlock(len); |
| |
| // If packet is full, enqueue it for transmission |
| if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || |
| getStreamer().getBytesCurBlock() == blockSize) { |
| enqueueCurrentPacketFull(); |
| } |
| } |
| |
| /* write the data chunk in <code>buffer</code> staring at |
| * <code>buffer.position</code> with |
| * a length of <code>len > 0</code>, and its checksum |
| */ |
| protected synchronized void writeChunk(ByteBuffer buffer, int len, |
| byte[] checksum, int ckoff, int cklen) throws IOException { |
| writeChunkPrepare(len, ckoff, cklen); |
| |
| currentPacket.writeChecksum(checksum, ckoff, cklen); |
| currentPacket.writeData(buffer, len); |
| currentPacket.incNumChunks(); |
| getStreamer().incBytesCurBlock(len); |
| |
| // If packet is full, enqueue it for transmission |
| if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || |
| getStreamer().getBytesCurBlock() == blockSize) { |
| enqueueCurrentPacketFull(); |
| } |
| } |
| |
| private synchronized void writeChunkPrepare(int buflen, |
| int ckoff, int cklen) throws IOException { |
| dfsClient.checkOpen(); |
| checkClosed(); |
| |
| if (buflen > bytesPerChecksum) { |
| throw new IOException("writeChunk() buffer size is " + buflen + |
| " is larger than supported bytesPerChecksum " + |
| bytesPerChecksum); |
| } |
| if (cklen != 0 && cklen != getChecksumSize()) { |
| throw new IOException("writeChunk() checksum size is supposed to be " + |
| getChecksumSize() + " but found to be " + cklen); |
| } |
| |
| if (currentPacket == null) { |
| currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer() |
| .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false); |
| DFSClient.LOG.debug("WriteChunk allocating new packet seqno={}," |
| + " src={}, packetSize={}, chunksPerPacket={}, bytesCurBlock={}", |
| currentPacket.getSeqno(), src, packetSize, chunksPerPacket, |
| getStreamer().getBytesCurBlock() + ", " + this); |
| } |
| } |
| |
| void enqueueCurrentPacket() throws IOException { |
| getStreamer().waitAndQueuePacket(currentPacket); |
| currentPacket = null; |
| } |
| |
| synchronized void enqueueCurrentPacketFull() throws IOException { |
| LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={}," |
| + " appendChunk={}, {}", currentPacket, src, getStreamer() |
| .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(), |
| getStreamer()); |
| enqueueCurrentPacket(); |
| adjustChunkBoundary(); |
| endBlock(); |
| } |
| |
| /** create an empty packet to mark the end of the block. */ |
| void setCurrentPacketToEmpty() throws InterruptedIOException { |
| currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), |
| getStreamer().getAndIncCurrentSeqno(), true); |
| currentPacket.setSyncBlock(shouldSyncBlock); |
| } |
| |
| /** |
| * If the reopened file did not end at chunk boundary and the above |
| * write filled up its partial chunk. Tell the summer to generate full |
| * crc chunks from now on. |
| */ |
| protected void adjustChunkBoundary() { |
| if (getStreamer().getAppendChunk() && |
| getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) { |
| getStreamer().setAppendChunk(false); |
| resetChecksumBufSize(); |
| } |
| |
| if (!getStreamer().getAppendChunk()) { |
| final int psize = (int) Math |
| .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize); |
| computePacketChunkSize(psize, bytesPerChecksum); |
| } |
| } |
| |
| /** |
| * Used in test only. |
| */ |
| @VisibleForTesting |
| void setAppendChunk(final boolean appendChunk) { |
| getStreamer().setAppendChunk(appendChunk); |
| } |
| |
| /** |
| * Used in test only. |
| */ |
| @VisibleForTesting |
| void setBytesCurBlock(final long bytesCurBlock) { |
| getStreamer().setBytesCurBlock(bytesCurBlock); |
| } |
| |
| /** |
| * if encountering a block boundary, send an empty packet to |
| * indicate the end of block and reset bytesCurBlock. |
| * |
| * @throws IOException |
| */ |
| void endBlock() throws IOException { |
| if (getStreamer().getBytesCurBlock() == blockSize) { |
| setCurrentPacketToEmpty(); |
| enqueueCurrentPacket(); |
| getStreamer().setBytesCurBlock(0); |
| lastFlushOffset = 0; |
| } |
| } |
| |
| @Override |
| public boolean hasCapability(String capability) { |
| switch (StringUtils.toLowerCase(capability)) { |
| case StreamCapabilities.HSYNC: |
| case StreamCapabilities.HFLUSH: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| /** |
| * Flushes out to all replicas of the block. The data is in the buffers |
| * of the DNs but not necessarily in the DN's OS buffers. |
| * |
| * It is a synchronous operation. When it returns, |
| * it guarantees that flushed data become visible to new readers. |
| * It is not guaranteed that data has been flushed to |
| * persistent store on the datanode. |
| * Block allocations are persisted on namenode. |
| */ |
| @Override |
| public void hflush() throws IOException { |
| try (TraceScope ignored = dfsClient.newPathTraceScope("hflush", src)) { |
| flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); |
| } |
| } |
| |
| @Override |
| public void hsync() throws IOException { |
| try (TraceScope ignored = dfsClient.newPathTraceScope("hsync", src)) { |
| flushOrSync(true, EnumSet.noneOf(SyncFlag.class)); |
| } |
| } |
| |
| /** |
| * The expected semantics is all data have flushed out to all replicas |
| * and all replicas have done posix fsync equivalent - ie the OS has |
| * flushed it to the disk device (but the disk may have it in its cache). |
| * |
| * Note that only the current block is flushed to the disk device. |
| * To guarantee durable sync across block boundaries the stream should |
| * be created with {@link CreateFlag#SYNC_BLOCK}. |
| * |
| * @param syncFlags |
| * Indicate the semantic of the sync. Currently used to specify |
| * whether or not to update the block length in NameNode. |
| */ |
| public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { |
| try (TraceScope ignored = dfsClient.newPathTraceScope("hsync", src)) { |
| flushOrSync(true, syncFlags); |
| } |
| } |
| |
| /** |
| * Flush/Sync buffered data to DataNodes. |
| * |
| * @param isSync |
| * Whether or not to require all replicas to flush data to the disk |
| * device |
| * @param syncFlags |
| * Indicate extra detailed semantic of the flush/sync. Currently |
| * mainly used to specify whether or not to update the file length in |
| * the NameNode |
| * @throws IOException |
| */ |
| private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags) |
| throws IOException { |
| dfsClient.checkOpen(); |
| checkClosed(); |
| try { |
| long toWaitFor; |
| long lastBlockLength = -1L; |
| boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH); |
| boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK); |
| synchronized (this) { |
| // flush checksum buffer, but keep checksum buffer intact if we do not |
| // need to end the current block |
| int numKept = flushBuffer(!endBlock, true); |
| // bytesCurBlock potentially incremented if there was buffered data |
| |
| DFSClient.LOG.debug("DFSClient flush(): bytesCurBlock={}, " |
| + "lastFlushOffset={}, createNewBlock={}", |
| getStreamer().getBytesCurBlock(), lastFlushOffset, endBlock); |
| // Flush only if we haven't already flushed till this offset. |
| if (lastFlushOffset != getStreamer().getBytesCurBlock()) { |
| assert getStreamer().getBytesCurBlock() > lastFlushOffset; |
| // record the valid offset of this flush |
| lastFlushOffset = getStreamer().getBytesCurBlock(); |
| if (isSync && currentPacket == null && !endBlock) { |
| // Nothing to send right now, |
| // but sync was requested. |
| // Send an empty packet if we do not end the block right now |
| currentPacket = createPacket(packetSize, chunksPerPacket, |
| getStreamer().getBytesCurBlock(), getStreamer() |
| .getAndIncCurrentSeqno(), false); |
| } |
| } else { |
| if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) { |
| // Nothing to send right now, |
| // and the block was partially written, |
| // and sync was requested. |
| // So send an empty sync packet if we do not end the block right |
| // now |
| currentPacket = createPacket(packetSize, chunksPerPacket, |
| getStreamer().getBytesCurBlock(), getStreamer() |
| .getAndIncCurrentSeqno(), false); |
| } else if (currentPacket != null) { |
| // just discard the current packet since it is already been sent. |
| currentPacket.releaseBuffer(byteArrayManager); |
| currentPacket = null; |
| } |
| } |
| if (currentPacket != null) { |
| currentPacket.setSyncBlock(isSync); |
| enqueueCurrentPacket(); |
| } |
| if (endBlock && getStreamer().getBytesCurBlock() > 0) { |
| // Need to end the current block, thus send an empty packet to |
| // indicate this is the end of the block and reset bytesCurBlock |
| currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), |
| getStreamer().getAndIncCurrentSeqno(), true); |
| currentPacket.setSyncBlock(shouldSyncBlock || isSync); |
| enqueueCurrentPacket(); |
| getStreamer().setBytesCurBlock(0); |
| lastFlushOffset = 0; |
| } else { |
| // Restore state of stream. Record the last flush offset |
| // of the last full chunk that was flushed. |
| getStreamer().setBytesCurBlock( |
| getStreamer().getBytesCurBlock() - numKept); |
| } |
| |
| toWaitFor = getStreamer().getLastQueuedSeqno(); |
| } // end synchronized |
| |
| getStreamer().waitForAckedSeqno(toWaitFor); |
| |
| // update the block length first time irrespective of flag |
| if (updateLength || getStreamer().getPersistBlocks().get()) { |
| synchronized (this) { |
| if (!getStreamer().streamerClosed() |
| && getStreamer().getBlock() != null) { |
| lastBlockLength = getStreamer().getBlock().getNumBytes(); |
| } |
| } |
| } |
| // If 1) any new blocks were allocated since the last flush, or 2) to |
| // update length in NN is required, then persist block locations on |
| // namenode. |
| if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) { |
| try { |
| dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, |
| lastBlockLength); |
| } catch (IOException ioe) { |
| DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, |
| ioe); |
| // If we got an error here, it might be because some other thread |
| // called close before our hflush completed. In that case, we should |
| // throw an exception that the stream is closed. |
| checkClosed(); |
| // If we aren't closed but failed to sync, we should expose that to |
| // the caller. |
| throw ioe; |
| } |
| } |
| |
| synchronized(this) { |
| if (!getStreamer().streamerClosed()) { |
| getStreamer().setHflush(); |
| } |
| } |
| } catch (InterruptedIOException interrupt) { |
| // This kind of error doesn't mean that the stream itself is broken - just |
| // the flushing thread got interrupted. So, we shouldn't close down the |
| // writer, but instead just propagate the error |
| throw interrupt; |
| } catch (IOException e) { |
| DFSClient.LOG.warn("Error while syncing", e); |
| synchronized (this) { |
| if (!isClosed()) { |
| getStreamer().getLastException().set(e); |
| closeThreads(true); |
| } |
| } |
| throw e; |
| } |
| } |
| |
| /** |
| * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}. |
| */ |
| @Deprecated |
| public synchronized int getNumCurrentReplicas() throws IOException { |
| return getCurrentBlockReplication(); |
| } |
| |
| /** |
| * Note that this is not a public API; |
| * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead. |
| * |
| * @return the number of valid replicas of the current block |
| */ |
| public synchronized int getCurrentBlockReplication() throws IOException { |
| dfsClient.checkOpen(); |
| checkClosed(); |
| if (getStreamer().streamerClosed()) { |
| return blockReplication; // no pipeline, return repl factor of file |
| } |
| DatanodeInfo[] currentNodes = getStreamer().getNodes(); |
| if (currentNodes == null) { |
| return blockReplication; // no pipeline, return repl factor of file |
| } |
| return currentNodes.length; |
| } |
| |
| /** |
| * Waits till all existing data is flushed and confirmations |
| * received from datanodes. |
| */ |
| protected void flushInternal() throws IOException { |
| long toWaitFor = flushInternalWithoutWaitingAck(); |
| getStreamer().waitForAckedSeqno(toWaitFor); |
| } |
| |
| protected synchronized void start() { |
| getStreamer().start(); |
| } |
| |
| /** |
| * Aborts this output stream and releases any system |
| * resources associated with this stream. |
| */ |
| void abort() throws IOException { |
| final MultipleIOException.Builder b = new MultipleIOException.Builder(); |
| synchronized (this) { |
| if (isClosed()) { |
| return; |
| } |
| getStreamer().getLastException().set( |
| new IOException("Lease timeout of " |
| + (dfsClient.getConf().getHdfsTimeout() / 1000) |
| + " seconds expired.")); |
| |
| try { |
| closeThreads(true); |
| } catch (IOException e) { |
| b.add(e); |
| } |
| } |
| final IOException ioe = b.build(); |
| if (ioe != null) { |
| throw ioe; |
| } |
| } |
| |
| boolean isClosed() { |
| return closed || getStreamer().streamerClosed(); |
| } |
| |
| void setClosed() { |
| closed = true; |
| dfsClient.endFileLease(fileId); |
| getStreamer().release(); |
| } |
| |
| // shutdown datastreamer and responseprocessor threads. |
| // interrupt datastreamer if force is true |
| protected void closeThreads(boolean force) throws IOException { |
| try { |
| getStreamer().close(force); |
| getStreamer().join(); |
| getStreamer().closeSocket(); |
| } catch (InterruptedException e) { |
| throw new IOException("Failed to shutdown streamer"); |
| } finally { |
| getStreamer().setSocketToNull(); |
| setClosed(); |
| } |
| } |
| |
| /** |
| * Closes this output stream and releases any system |
| * resources associated with this stream. |
| */ |
| @Override |
| public void close() throws IOException { |
| final MultipleIOException.Builder b = new MultipleIOException.Builder(); |
| synchronized (this) { |
| try (TraceScope ignored = dfsClient.newPathTraceScope( |
| "DFSOutputStream#close", src)) { |
| closeImpl(); |
| } catch (IOException e) { |
| b.add(e); |
| } |
| } |
| final IOException ioe = b.build(); |
| if (ioe != null) { |
| throw ioe; |
| } |
| } |
| |
  /**
   * Flushes all buffered data, completes the file at the NameNode, and
   * shuts down the streamer threads. If the stream is already closed, only
   * re-checks the streamer's recorded exception and cleans up any leftover
   * threads (most importantly, ending the file lease).
   */
  protected synchronized void closeImpl() throws IOException {
    if (isClosed()) {
      LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
          closed, getStreamer().streamerClosed());
      try {
        // Surface any exception the streamer recorded before shutting down.
        getStreamer().getLastException().check(true);
      } catch (IOException ioe) {
        cleanupAndRethrowIOException(ioe);
      } finally {
        if (!closed) {
          // If stream is not closed but streamer closed, clean up the stream.
          // Most importantly, end the file lease.
          closeThreads(true);
        }
      }
      return;
    }

    try {
      flushBuffer(); // flush from all upper layers

      // Queue any partially-filled packet still held by this stream.
      if (currentPacket != null) {
        enqueueCurrentPacket();
      }

      // NOTE(review): presumably this sends an empty trailing packet to mark
      // the end of a partial block -- confirm against setCurrentPacketToEmpty.
      if (getStreamer().getBytesCurBlock() != 0) {
        setCurrentPacketToEmpty();
      }

      try {
        flushInternal(); // flush all data to Datanodes
      } catch (IOException ioe) {
        cleanupAndRethrowIOException(ioe);
      }
      completeFile();
    } catch (ClosedChannelException ignored) {
      // Channel already closed; fall through to the cleanup below.
    } finally {
      // Failures may happen when flushing data.
      // Streamers may keep waiting for the new block information.
      // Thus need to force closing these threads.
      // Don't need to call setClosed() because closeThreads(true)
      // calls setClosed() in the finally block.
      closeThreads(true);
    }
  }
| |
| private void completeFile() throws IOException { |
| // get last block before destroying the streamer |
| ExtendedBlock lastBlock = getStreamer().getBlock(); |
| try (TraceScope ignored = |
| dfsClient.getTracer().newScope("completeFile")) { |
| completeFile(lastBlock); |
| } |
| } |
| |
| /** |
| * Determines whether an IOException thrown needs extra cleanup on the stream. |
| * Space quota exceptions will be thrown when getting new blocks, so the |
| * open HDFS file need to be closed. |
| * |
| * @param ioe the IOException |
| * @return whether the stream needs cleanup for the given IOException |
| */ |
| private boolean exceptionNeedsCleanup(IOException ioe) { |
| return ioe instanceof DSQuotaExceededException |
| || ioe instanceof QuotaByStorageTypeExceededException; |
| } |
| |
| private void cleanupAndRethrowIOException(IOException ioe) |
| throws IOException { |
| if (exceptionNeedsCleanup(ioe)) { |
| final MultipleIOException.Builder b = new MultipleIOException.Builder(); |
| b.add(ioe); |
| try { |
| completeFile(); |
| } catch (IOException e) { |
| b.add(e); |
| throw b.build(); |
| } |
| } |
| throw ioe; |
| } |
| |
| // should be called holding (this) lock since setTestFilename() may |
| // be called during unit tests |
| protected void completeFile(ExtendedBlock last) throws IOException { |
| long localstart = Time.monotonicNow(); |
| final DfsClientConf conf = dfsClient.getConf(); |
| long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); |
| boolean fileComplete = false; |
| int retries = conf.getNumBlockWriteLocateFollowingRetry(); |
| while (!fileComplete) { |
| fileComplete = |
| dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId); |
| if (!fileComplete) { |
| final int hdfsTimeout = conf.getHdfsTimeout(); |
| if (!dfsClient.clientRunning |
| || (hdfsTimeout > 0 |
| && localstart + hdfsTimeout < Time.monotonicNow())) { |
| String msg = "Unable to close file because dfsclient " + |
| " was unable to contact the HDFS servers. clientRunning " + |
| dfsClient.clientRunning + " hdfsTimeout " + hdfsTimeout; |
| DFSClient.LOG.info(msg); |
| throw new IOException(msg); |
| } |
| try { |
| if (retries == 0) { |
| throw new IOException("Unable to close file because the last block" |
| + last + " does not have enough number of replicas."); |
| } |
| retries--; |
| Thread.sleep(sleeptime); |
| sleeptime *= 2; |
| if (Time.monotonicNow() - localstart > 5000) { |
| DFSClient.LOG.info("Could not complete " + src + " retrying..."); |
| } |
| } catch (InterruptedException ie) { |
| DFSClient.LOG.warn("Caught exception ", ie); |
| } |
| } |
| } |
| } |
| |
  /**
   * Injects an artificial delay into the streamer; for testing only.
   *
   * @param period the slowdown period to pass through to the streamer
   */
  @VisibleForTesting
  public void setArtificialSlowdown(long period) {
    getStreamer().setArtificialSlowdown(period);
  }
| |
  /**
   * Caps the number of chunks per packet and recomputes the packet size
   * accordingly; for testing only. The existing chunks-per-packet value is
   * never increased, only reduced toward {@code value}.
   */
  @VisibleForTesting
  public synchronized void setChunksPerPacket(int value) {
    chunksPerPacket = Math.min(chunksPerPacket, value);
    // Each chunk carries bytesPerChecksum data bytes plus its checksum.
    packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
  }
| |
| /** |
| * Returns the size of a file as it was when this stream was opened |
| */ |
| public long getInitialLen() { |
| return initialFileSize; |
| } |
| |
  /**
   * @return the set of AddBlockFlag values associated with this stream.
   */
  protected EnumSet<AddBlockFlag> getAddBlockFlags() {
    return addBlockFlags;
  }
| |
| /** |
| * @return the FileEncryptionInfo for this stream, or null if not encrypted. |
| */ |
| public FileEncryptionInfo getFileEncryptionInfo() { |
| return fileEncryptionInfo; |
| } |
| |
| /** |
| * Returns the access token currently used by streamer, for testing only |
| */ |
| synchronized Token<BlockTokenIdentifier> getBlockToken() { |
| return getStreamer().getBlockToken(); |
| } |
| |
| protected long flushInternalWithoutWaitingAck() throws IOException { |
| long toWaitFor; |
| synchronized (this) { |
| dfsClient.checkOpen(); |
| checkClosed(); |
| // |
| // If there is data in the current buffer, send it across |
| // |
| getStreamer().queuePacket(currentPacket); |
| currentPacket = null; |
| toWaitFor = getStreamer().getLastQueuedSeqno(); |
| } |
| return toWaitFor; |
| } |
| |
| @Override |
| public void setDropBehind(Boolean dropBehind) throws IOException { |
| CachingStrategy prevStrategy, nextStrategy; |
| // CachingStrategy is immutable. So build a new CachingStrategy with the |
| // modifications we want, and compare-and-swap it in. |
| do { |
| prevStrategy = this.cachingStrategy.get(); |
| nextStrategy = new CachingStrategy.Builder(prevStrategy). |
| setDropBehind(dropBehind).build(); |
| } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy)); |
| } |
| |
  /**
   * @return the streamer's current block; exposed for testing.
   */
  @VisibleForTesting
  ExtendedBlock getBlock() {
    return getStreamer().getBlock();
  }
| |
  /**
   * @return the id of the file this stream writes to; exposed for testing.
   */
  @VisibleForTesting
  public long getFileId() {
    return fileId;
  }
| |
| /** |
| * Return the source of stream. |
| */ |
| String getSrc() { |
| return src; |
| } |
| |
| /** |
| * Returns the data streamer object. |
| */ |
| protected DataStreamer getStreamer() { |
| return streamer; |
| } |
| |
| @Override |
| public String toString() { |
| return getClass().getSimpleName() + ":" + streamer; |
| } |
| |
| static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, |
| DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId, |
| String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags) |
| throws IOException { |
| final DfsClientConf conf = dfsClient.getConf(); |
| int retries = conf.getNumBlockWriteLocateFollowingRetry(); |
| long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); |
| long localstart = Time.monotonicNow(); |
| while (true) { |
| try { |
| return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, |
| excludedNodes, fileId, favoredNodes, allocFlags); |
| } catch (RemoteException e) { |
| IOException ue = e.unwrapRemoteException(FileNotFoundException.class, |
| AccessControlException.class, |
| NSQuotaExceededException.class, |
| DSQuotaExceededException.class, |
| QuotaByStorageTypeExceededException.class, |
| UnresolvedPathException.class); |
| if (ue != e) { |
| throw ue; // no need to retry these exceptions |
| } |
| if (NotReplicatedYetException.class.getName() |
| .equals(e.getClassName())) { |
| if (retries == 0) { |
| throw e; |
| } else { |
| --retries; |
| LOG.info("Exception while adding a block", e); |
| long elapsed = Time.monotonicNow() - localstart; |
| if (elapsed > 5000) { |
| LOG.info("Waiting for replication for " + (elapsed / 1000) |
| + " seconds"); |
| } |
| try { |
| LOG.warn("NotReplicatedYetException sleeping " + src |
| + " retries left " + retries); |
| Thread.sleep(sleeptime); |
| sleeptime *= 2; |
| } catch (InterruptedException ie) { |
| LOG.warn("Caught exception", ie); |
| } |
| } |
| } else { |
| throw e; |
| } |
| } |
| } |
| } |
| } |