/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
/****************************************************************
* DFSOutputStream creates files from a stream of bytes.
*
* The client application writes data that is cached internally by
* this stream. Data is broken up into packets, each packet is
* typically 64K in size. A packet is made up of chunks. Each chunk
* is typically 512 bytes and has an associated checksum.
*
* When a client application fills up the currentPacket, it is
* enqueued into the dataQueue. The DataStreamer thread picks up
* packets from the dataQueue, sends each one to the first datanode in
* the pipeline and moves it from the dataQueue to the ackQueue.
* The ResponseProcessor receives acks from the datanodes. When a
* successful ack for a packet is received from all datanodes, the
* ResponseProcessor removes the corresponding packet from the
* ackQueue.
*
* In case of error, all outstanding packets are moved from the
* ackQueue back to the dataQueue. A new pipeline is set up by eliminating
* the bad datanode from the original pipeline. The DataStreamer then
* resumes sending packets from the dataQueue.
****************************************************************/
class DFSOutputStream extends FSOutputSummer implements Syncable {
private final DFSClient dfsClient;
private Configuration conf;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s;
// closed is accessed by different threads under different locks.
private volatile boolean closed = false;
private String src;
private final long blockSize;
private final DataChecksum checksum;
// both dataQueue and ackQueue are protected by dataQueue lock
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private Packet currentPacket = null;
private DataStreamer streamer;
private long currentSeqno = 0;
private long bytesCurBlock = 0; // bytes written in current block
private int packetSize = 0; // write packet size, including the header.
private int chunksPerPacket = 0;
private volatile IOException lastException = null;
private long artificialSlowdown = 0;
private long lastFlushOffset = -1; // offset when flush was invoked
//persist blocks on namenode
private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
private volatile boolean appendChunk = false; // appending to existing partial block
private long initialFileSize = 0; // at time of file open
private Progressable progress;
private short blockReplication; // replication factor of file
private class Packet {
ByteBuffer buffer; // only one of buf and buffer is non-null
byte[] buf;
long seqno; // sequence number of buffer in block
long offsetInBlock; // offset in block
boolean lastPacketInBlock; // is this the last packet in block?
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
int dataStart;
int dataPos;
int checksumStart;
int checksumPos;
private static final long HEART_BEAT_SEQNO = -1L;
/**
* create a heartbeat packet
*/
Packet() {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO;
buffer = null;
int packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
buf = new byte[packetSize];
checksumStart = dataStart = packetSize;
checksumPos = checksumStart;
dataPos = dataStart;
maxChunks = 0;
}
// create a new packet
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = offsetInBlock;
this.seqno = currentSeqno;
currentSeqno++;
buffer = null;
buf = new byte[pktSize];
checksumStart = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
checksumPos = checksumStart;
dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
dataPos = dataStart;
maxChunks = chunksPerPkt;
}
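// Append 'len' bytes of user data from 'inarray' to this packet's data region.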
void writeData(byte[] inarray, int off, int len) {
if ( dataPos + len > buf.length) {
throw new BufferOverflowException();
}
System.arraycopy(inarray, off, buf, dataPos, len);
dataPos += len;
}
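// Append 'len' bytes of checksum data from 'inarray' to this packet's checksum region.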
void writeChecksum(byte[] inarray, int off, int len) {
if (checksumPos + len > dataStart) {
throw new BufferOverflowException();
}
System.arraycopy(inarray, off, buf, checksumPos, len);
checksumPos += len;
}
/**
* Returns ByteBuffer that contains one full packet, including header.
*/
ByteBuffer getBuffer() {
/* Once this is called, no more data can be added to the packet.
* setting 'buf' to null ensures that.
* This is called only when the packet is ready to be sent.
*/
if (buffer != null) {
return buffer;
}
//prepare the header and close any gap between checksum and data.
int dataLen = dataPos - dataStart;
int checksumLen = checksumPos - checksumStart;
if (checksumPos != dataStart) {
/* move the checksum to cover the gap.
* This can happen for the last packet.
*/
System.arraycopy(buf, checksumStart, buf,
dataStart - checksumLen , checksumLen);
}
int pktLen = DFSClient.SIZE_OF_INTEGER + dataLen + checksumLen;
//normally dataStart == checksumPos, i.e., offset is zero.
buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
DataNode.PKT_HEADER_LEN + pktLen);
buf = null;
buffer.mark();
/* write the header and data length.
* The format is described in comment before DataNode.BlockSender
*/
buffer.putInt(pktLen); // pktSize
buffer.putLong(offsetInBlock);
buffer.putLong(seqno);
buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
//end of pkt header
buffer.putInt(dataLen); // actual data length, excluding checksum.
buffer.reset();
return buffer;
}
// get the packet's last byte's offset in the block
long getLastByteOffsetBlock() {
return offsetInBlock + dataPos - dataStart;
}
/**
* Check if this packet is a heart beat packet
* @return true if the sequence number is HEART_BEAT_SEQNO
*/
private boolean isHeartbeatPacket() {
return seqno == HEART_BEAT_SEQNO;
}
public String toString() {
return "packet seqno:" + this.seqno +
" offsetInBlock:" + this.offsetInBlock +
" lastPacketInBlock:" + this.lastPacketInBlock +
" lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
}
}
//
// The DataStreamer class is responsible for sending data packets to the
// datanodes in the pipeline. It retrieves a new blockid and block locations
// from the namenode, and starts streaming packets to the pipeline of
// Datanodes. Every packet has a sequence number associated with
// it. When all the packets for a block are sent out and acks for each
// of them are received, the DataStreamer closes the current block.
//
class DataStreamer extends Daemon {
private volatile boolean streamerClosed = false;
private Block block; // its length is number of bytes acked
private BlockAccessToken accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
volatile boolean hasError = false;
volatile int errorIndex = -1;
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
/**
* Default constructor for file create
*/
private DataStreamer() {
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
/**
* Construct a data streamer for append
* @param lastBlock last block of the file to be appended
* @param stat status of the file to be appended
* @param bytesPerChecksum number of bytes per checksum
* @throws IOException if error occurs
*/
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum) throws IOException {
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getAccessToken();
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
// calculate the amount of free space in the pre-existing
// last crc chunk
int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
int freeInCksum = bytesPerChecksum - usedInCksum;
// if there is space in the last block, then we have to
// append to that block
if (freeInLastBlock == blockSize) {
throw new IOException("The last block for file " +
src + " is full.");
}
if (usedInCksum > 0 && freeInCksum > 0) {
// if there is space in the last partial chunk, then
// setup in such a way that the next packet will have only
// one chunk that fills up the partial chunk.
//
computePacketChunkSize(0, freeInCksum);
resetChecksumChunk(freeInCksum);
appendChunk = true;
} else {
// if the remaining space in the block is smaller than
// the expected size of a packet, then create a
// smaller packet.
//
computePacketChunkSize(Math.min(dfsClient.writePacketSize, freeInLastBlock),
bytesPerChecksum);
}
// setup pipeline to append to the last block XXX retries??
nodes = lastBlock.getLocations();
errorIndex = -1; // no errors yet.
if (nodes.length < 1) {
throw new IOException("Unable to retrieve blocks locations " +
" for last block " + block +
"of file " + src);
}
}
/**
* Initialize for data streaming
*/
private void initDataStreaming() {
this.setName("DataStreamer for file " + src +
" block " + block);
response = new ResponseProcessor(nodes);
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
}
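// Clean up after the last packet of a block has been acked: stop the
// responder, close the streams to the pipeline, and reset the stage so
// that the next packet triggers allocation of a new block.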
private void endBlock() {
DFSClient.LOG.debug("Closing old block " + block);
this.setName("DataStreamer for file " + src);
closeResponder();
closeStream();
nodes = null;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
/*
* The streamer thread is the only thread that opens streams to the datanode,
* and closes them. Any error recovery is also done by this thread.
*/
public void run() {
long lastPacket = System.currentTimeMillis();
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
response.close();
response.join();
response = null;
} catch (InterruptedException e) {
}
}
Packet one = null;
try {
// process datanode IO errors if any
boolean doSleep = false;
if (hasError && errorIndex>=0) {
doSleep = processDatanodeError();
}
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = System.currentTimeMillis();
while ((!streamerClosed && !hasError && dfsClient.clientRunning
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
now - lastPacket < dfsClient.socketTimeout/2)) || doSleep ) {
long timeout = dfsClient.socketTimeout/2 - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
}
doSleep = false;
now = System.currentTimeMillis();
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = new Packet(); // heartbeat packet
} else {
one = dataQueue.getFirst(); // regular data packet
}
}
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
DFSClient.LOG.debug("Allocating new block");
nodes = nextBlockOutputStream(src);
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
DFSClient.LOG.debug("Append to block " + block);
setupPipelineForAppendOrRecovery();
initDataStreaming();
}
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
lastByteOffsetInBlock +
" Aborting file " + src);
}
if (one.lastPacketInBlock) {
// wait until all data packets have been successfully acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
}
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
// send the packet
ByteBuffer buf = one.getBuffer();
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DataStreamer block " + block +
" sending packet " + one);
}
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
blockStream.flush();
lastPacket = System.currentTimeMillis();
if (one.isHeartbeatPacket()) { //heartbeat packet
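// nothing more to do for a heartbeat packet; it was never added to the ackQueue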
}
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// Is this block full?
if (one.lastPacketInBlock) {
// wait until the close packet has been acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
endBlock();
}
if (progress != null) { progress.progress(); }
// This is used by unit tests to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
DFSClient.LOG.warn("DataStreamer Exception: " +
StringUtils.stringifyException(e));
if (e instanceof IOException) {
setLastException((IOException)e);
}
hasError = true;
if (errorIndex == -1) { // not a datanode error
streamerClosed = true;
}
}
}
closeInternal();
}
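// Shut down the responder and the streams to the pipeline, mark both the
// streamer and the enclosing DFSOutputStream as closed, and wake up any
// threads waiting on dataQueue.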
private void closeInternal() {
closeResponder(); // close and join
closeStream();
streamerClosed = true;
closed = true;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
}
/*
* close both the streamer and the DFSOutputStream; should be called only
* by an external thread and only after all data to be sent has
* been flushed to the datanodes.
*
* Interrupt this data streamer if force is true
*
* @param force if this data stream is forced to be closed
*/
void close(boolean force) {
streamerClosed = true;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
if (force) {
this.interrupt();
}
}
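// Close the response processor and wait for it to exit; interruption
// while joining is ignored.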
private void closeResponder() {
if (response != null) {
try {
response.close();
response.join();
} catch (InterruptedException e) {
} finally {
response = null;
}
}
}
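// Close the data and reply streams to the first datanode, ignoring
// IOExceptions raised on close.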
private void closeStream() {
if (blockStream != null) {
try {
blockStream.close();
} catch (IOException e) {
} finally {
blockStream = null;
}
}
if (blockReplyStream != null) {
try {
blockReplyStream.close();
} catch (IOException e) {
} finally {
blockReplyStream = null;
}
}
}
//
// Processes responses from the datanodes. A packet is removed
// from the ackQueue when its response arrives.
//
private class ResponseProcessor extends Daemon {
private volatile boolean responderClosed = false;
private DatanodeInfo[] targets = null;
private boolean isLastPacketInBlock = false;
ResponseProcessor (DatanodeInfo[] targets) {
this.targets = targets;
}
public void run() {
setName("ResponseProcessor for block " + block);
PipelineAck ack = new PipelineAck();
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
// read an ack from the pipeline
ack.readFields(blockReplyStream);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient " + ack);
}
long seqno = ack.getSeqno();
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final DataTransferProtocol.Status reply = ack.getReply(i);
if (reply != SUCCESS) {
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
" from datanode " +
targets[i].getName());
}
}
assert seqno != PipelineAck.UNKOWN_SEQNO :
"Ack for unkown seqno should be a failed ack: " + ack;
if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}
// a success ack for a data packet
Packet one = null;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
if (one.seqno != seqno) {
throw new IOException("Responseprocessor: Expecting seqno " +
" for block " + block +
one.seqno + " but received " + seqno);
}
isLastPacketInBlock = one.lastPacketInBlock;
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
synchronized (dataQueue) {
ackQueue.removeFirst();
dataQueue.notifyAll();
}
} catch (Exception e) {
if (!responderClosed) {
if (e instanceof IOException) {
setLastException((IOException)e);
}
hasError = true;
errorIndex = errorIndex==-1 ? 0 : errorIndex;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception " +
" for block " + block +
StringUtils.stringifyException(e));
responderClosed = true;
}
}
}
}
void close() {
responderClosed = true;
this.interrupt();
}
}
// If this stream has encountered any errors, shut down the response
// thread, close the stream to the current pipeline, and move any
// unacked packets back to the front of the dataQueue so that they can
// be resent. Returns true if we should sleep for a while after
// returning from this call.
//
private boolean processDatanodeError() throws IOException {
if (response != null) {
DFSClient.LOG.info("Error Recovery for block " + block +
" waiting for responder to exit. ");
return true;
}
closeStream();
// move packets from ack queue to front of the data queue
synchronized (dataQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}
boolean doSleep = setupPipelineForAppendOrRecovery();
if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
synchronized (dataQueue) {
dataQueue.remove(); // remove the end of block packet
dataQueue.notifyAll();
}
endBlock();
} else {
initDataStreaming();
}
}
return doSleep;
}
/**
* Open a DataOutputStream to a DataNode pipeline so that
* it can be written to.
* This happens when a file is appended to or data streaming fails.
* It keeps on trying until a pipeline is set up.
*/
private boolean setupPipelineForAppendOrRecovery() throws IOException {
// check number of datanodes
if (nodes == null || nodes.length == 0) {
String msg = "Could not get block locations. " + "Source file \""
+ src + "\" - Aborting...";
DFSClient.LOG.warn(msg);
setLastException(new IOException(msg));
streamerClosed = true;
return false;
}
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
if (errorIndex >= 0) {
StringBuilder pipelineMsg = new StringBuilder();
for (int j = 0; j < nodes.length; j++) {
pipelineMsg.append(nodes[j].getName());
if (j < nodes.length - 1) {
pipelineMsg.append(", ");
}
}
if (nodes.length <= 1) {
lastException = new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting...");
streamerClosed = true;
return false;
}
DFSClient.LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
nodes = newnodes;
hasError = false;
lastException = null;
errorIndex = -1;
}
// get a new generation stamp and an access token
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getAccessToken();
// set up the pipeline again with the remaining nodes
success = createBlockOutputStream(nodes, newGS, isRecovery);
}
if (success) {
// update pipeline at the namenode
Block newBlock = new Block(
block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, nodes);
// update client side generation stamp
block = newBlock;
}
return false; // do not sleep, continue processing
}
/**
* Open a DataOutputStream to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
int count = conf.getInt("dfs.client.block.write.retries", 3);
boolean success = false;
do {
hasError = false;
lastException = null;
errorIndex = -1;
success = false;
long startTime = System.currentTimeMillis();
DatanodeInfo[] w = excludedNodes.toArray(
new DatanodeInfo[excludedNodes.size()]);
lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
block = lb.getBlock();
block.setNumBytes(0);
accessToken = lb.getAccessToken();
nodes = lb.getLocations();
//
// Connect to first DataNode in the list.
//
success = createBlockOutputStream(nodes, 0L, false);
if (!success) {
DFSClient.LOG.info("Abandoning block " + block);
dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
block = null;
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.add(nodes[errorIndex]);
}
} while (!success && --count >= 0);
if (!success) {
throw new IOException("Unable to create new block.");
}
return nodes;
}
// Connects to the first datanode in the pipeline.
// Returns true on success; false otherwise.
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
DataTransferProtocol.Status pipelineStatus = SUCCESS;
String firstBadLink = "";
if (DFSClient.LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
DFSClient.LOG.debug("pipeline = " + nodes[i].getName());
}
}
// persist blocks on namenode on next flush
persistBlocks.set(true);
try {
DFSClient.LOG.debug("Connecting to " + nodes[0].getName());
InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
s = dfsClient.socketFactory.createSocket();
int timeoutValue = dfsClient.getDatanodeReadTimeout(nodes.length);
NetUtils.connect(s, target, timeoutValue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
DFSClient.LOG.debug("Send buf size " + s.getSendBufferSize());
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
//
// Xmit header info to datanode
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(s, writeTimeout),
DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
DataTransferProtocol.Sender.opWriteBlock(out,
block.getBlockId(), block.getGenerationStamp(),
nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes, accessToken);
checksum.writeHeader(out);
out.flush();
// receive ack for connect
pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
firstBadLink = Text.readString(blockReplyStream);
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == ERROR_ACCESS_TOKEN) {
throw new InvalidAccessTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
throw new IOException("Bad connect ack with firstBadLink as "
+ firstBadLink);
}
}
blockStream = out;
return true; // success
} catch (IOException ie) {
DFSClient.LOG.info("Exception in createBlockOutputStream " + ie);
// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
if (nodes[i].getName().equals(firstBadLink)) {
errorIndex = i;
break;
}
}
} else {
errorIndex = 0;
}
hasError = true;
setLastException(ie);
blockReplyStream = null;
return false; // error
}
}
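// Ask the namenode to allocate the next block of this file, retrying with
// exponential backoff while the namenode reports that the previous block
// is not yet replicated.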
private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes)
throws IOException, UnresolvedLinkException {
int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
try {
return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
UnresolvedPathException.class);
if (ue != e) {
throw ue; // no need to retry these exceptions
}
if (NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
if (retries == 0) {
throw e;
} else {
--retries;
DFSClient.LOG.info(StringUtils.stringifyException(e));
if (System.currentTimeMillis() - localstart > 5000) {
DFSClient.LOG.info("Waiting for replication for "
+ (System.currentTimeMillis() - localstart) / 1000
+ " seconds");
}
try {
DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
+ " retries left " + retries);
Thread.sleep(sleeptime);
sleeptime *= 2;
} catch (InterruptedException ie) {
}
}
} else {
throw e;
}
}
}
}
}
Block getBlock() {
return block;
}
DatanodeInfo[] getNodes() {
return nodes;
}
BlockAccessToken getAccessToken() {
return accessToken;
}
private void setLastException(IOException e) {
if (lastException == null) {
lastException = e;
}
}
}
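// Throw the last recorded exception, or a generic IOException, if this
// stream has already been closed.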
private void isClosed() throws IOException {
if (closed) {
IOException e = lastException;
throw e != null ? e : new IOException("DFSOutputStream is closed");
}
}
//
// Returns the list of targets, if any, that is currently being used.
//
synchronized DatanodeInfo[] getPipeline() {
if (streamer == null) {
return null;
}
DatanodeInfo[] currentNodes = streamer.getNodes();
if (currentNodes == null) {
return null;
}
DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
for (int i = 0; i < currentNodes.length; i++) {
value[i] = currentNodes[i];
}
return value;
}
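/**
* Common constructor used by both the create and append paths: records the
* stream parameters, verifies that blockSize is a multiple of
* bytesPerChecksum, and sets up the CRC32 checksum.
*/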
private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
int bytesPerChecksum, short replication) throws IOException {
super(new PureJavaCrc32(), bytesPerChecksum, 4);
this.dfsClient = dfsClient;
this.conf = dfsClient.conf;
this.src = src;
this.blockSize = blockSize;
this.blockReplication = replication;
this.progress = progress;
if (progress != null) {
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
}
if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
") and blockSize(" + blockSize +
") do not match. " + "blockSize should be a " +
"multiple of io.bytes.per.checksum");
}
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
bytesPerChecksum);
}
/**
* Create a new output stream for writing to the given file.
* @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
*/
DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum)
throws IOException, UnresolvedLinkException {
this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
try {
dfsClient.namenode.create(
src, masked, dfsClient.clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
UnresolvedPathException.class);
}
streamer = new DataStreamer();
streamer.start();
}
/**
* Create a new output stream for appending to the given file.
* @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
*/
DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum) throws IOException {
this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
initialFileSize = stat.getLen(); // length of file when opened
//
// The last partial block of the file has to be filled.
//
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else {
computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
streamer = new DataStreamer();
}
streamer.start();
}
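// Given a target packet size (psize) and the number of data bytes per
// checksum chunk (csize), compute how many chunks fit in a packet and the
// resulting packetSize, header included.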
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize();
int n = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
packetSize = n + chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
", chunkSize=" + chunkSize +
", chunksPerPacket=" + chunksPerPacket +
", packetSize=" + packetSize);
}
}
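// Append a packet to the data queue and wake up the DataStreamer.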
private void queuePacket(Packet packet) {
synchronized (dataQueue) {
dataQueue.addLast(packet);
dataQueue.notifyAll();
}
}
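// Block until the combined data and ack queues have room (at most
// MAX_PACKETS packets outstanding), then enqueue the packet.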
private void waitAndQueuePacket(Packet packet) throws IOException {
synchronized (dataQueue) {
// If queue is full, then wait till we have enough space
while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
try {
dataQueue.wait();
} catch (InterruptedException e) {
}
}
isClosed();
queuePacket(packet);
}
}
// @see FSOutputSummer#writeChunk()
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException {
dfsClient.checkOpen();
isClosed();
int cklen = checksum.length;
int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
if (checksum.length != this.checksum.getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
this.checksum.getChecksumSize() +
" but found to be " + checksum.length);
}
if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + bytesCurBlock);
}
}
currentPacket.writeChecksum(checksum, 0, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
// If packet is full, enqueue it for transmission
//
if (currentPacket.numChunks == currentPacket.maxChunks ||
bytesCurBlock == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.seqno +
", src=" + src +
", bytesCurBlock=" + bytesCurBlock +
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
waitAndQueuePacket(currentPacket);
currentPacket = null;
// If the reopened file did not end at a chunk boundary and the above
// write filled up its partial chunk, tell the summer to generate full
// crc chunks from now on.
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
resetChecksumChunk(bytesPerChecksum);
}
if (!appendChunk) {
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
waitAndQueuePacket(currentPacket);
currentPacket = null;
bytesCurBlock = 0;
lastFlushOffset = -1;
}
}
}
@Override
@Deprecated
public synchronized void sync() throws IOException {
hflush();
}
/**
* Flushes out to all replicas of the block.
* The data is in the buffers of the DNs
* but not necessarily in the DNs' OS buffers.
*
* It is a synchronous operation. When it returns,
* it guarantees that flushed data becomes visible to new readers.
* It is not guaranteed that data has been flushed to
* persistent store on the datanode.
* Block allocations are persisted on the namenode.
*/
@Override
public synchronized void hflush() throws IOException {
dfsClient.checkOpen();
isClosed();
try {
/* Record current blockOffset. This might be changed inside
* flushBuffer() where a partial checksum chunk might be flushed.
* After the flush, reset bytesCurBlock back to its previous value;
* any partial checksum chunk will be sent now and again in the next packet.
*/
long saveOffset = bytesCurBlock;
// flush checksum buffer, but keep checksum buffer intact
flushBuffer(true);
DFSClient.LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
" bytesCurBlock " + bytesCurBlock +
" lastFlushOffset " + lastFlushOffset);
// Flush only if we haven't already flushed till this offset.
if (lastFlushOffset != bytesCurBlock) {
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
// wait for all packets to be sent and acknowledged
flushInternal();
} else {
// just discard the current packet since it has already been sent.
currentPacket = null;
}
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
//
bytesCurBlock = saveOffset;
// If any new blocks were allocated since the last flush,
// then persist block locations on namenode.
//
if (persistBlocks.getAndSet(false)) {
dfsClient.namenode.fsync(src, dfsClient.clientName);
}
} catch (IOException e) {
lastException = new IOException("IOException flush:" + e);
closeThreads(true);
throw e;
}
}
/**
* The expected semantics are that all data has been flushed out to all replicas
* and all replicas have done the equivalent of a POSIX fsync, i.e. the OS has
* flushed it to the disk device (but the disk may still have it in its cache).
*
* Right now it is implemented as hflush by default.
*/
@Override
public synchronized void hsync() throws IOException {
hflush();
}
/**
* Returns the number of replicas of current block. This can be different
* from the designated replication factor of the file because the NameNode
* does not replicate the block to which a client is currently writing.
* The client continues to write to a block even if a few datanodes in the
* write pipeline have failed.
* @return the number of valid replicas of the current block
*/
public synchronized int getNumCurrentReplicas() throws IOException {
dfsClient.checkOpen();
isClosed();
if (streamer == null) {
return blockReplication; // no pipeline, return repl factor of file
}
DatanodeInfo[] currentNodes = streamer.getNodes();
if (currentNodes == null) {
return blockReplication; // no pipeline, return repl factor of file
}
return currentNodes.length;
}
/**
* Waits till all existing data is flushed and confirmations
* received from datanodes.
*/
private synchronized void flushInternal() throws IOException {
dfsClient.checkOpen();
isClosed();
//
// If there is data in the current buffer, send it across
//
if (currentPacket != null) {
queuePacket(currentPacket);
currentPacket = null;
}
synchronized (dataQueue) {
while (!closed && dataQueue.size() + ackQueue.size() > 0) {
try {
dataQueue.wait();
} catch (InterruptedException e) {
}
}
isClosed();
}
}
/**
* Aborts this output stream and releases any system
* resources associated with this stream.
*/
synchronized void abort() throws IOException {
if (closed) {
return;
}
streamer.setLastException(new IOException("Lease timeout of " +
(dfsClient.hdfsTimeout/1000) + " seconds expired."));
closeThreads(true);
}
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
private void closeThreads(boolean force) throws IOException {
try {
streamer.close(force);
streamer.join();
if (s != null) {
s.close();
}
} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
streamer = null;
s = null;
closed = true;
}
}
/**
* Closes this output stream and releases any system
* resources associated with this stream.
*/
@Override
public synchronized void close() throws IOException {
if (closed) {
IOException e = lastException;
if (e == null)
return;
else
throw e;
}
try {
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
waitAndQueuePacket(currentPacket);
}
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
}
flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer
Block lastBlock = streamer.getBlock();
closeThreads(false);
completeFile(lastBlock);
dfsClient.leasechecker.remove(src);
} finally {
closed = true;
}
}
// should be called holding (this) lock since setTestFilename() may
// be called during unit tests
private void completeFile(Block last) throws IOException {
long localstart = System.currentTimeMillis();
boolean fileComplete = false;
while (!fileComplete) {
fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
if (!fileComplete) {
if (!dfsClient.clientRunning ||
(dfsClient.hdfsTimeout > 0 &&
localstart + dfsClient.hdfsTimeout < System.currentTimeMillis())) {
String msg = "Unable to close file because dfsclient " +
" was unable to contact the HDFS servers." +
" clientRunning " + dfsClient.clientRunning +
" hdfsTimeout " + dfsClient.hdfsTimeout;
DFSClient.LOG.info(msg);
throw new IOException(msg);
}
try {
Thread.sleep(400);
if (System.currentTimeMillis() - localstart > 5000) {
DFSClient.LOG.info("Could not complete file " + src + " retrying...");
}
} catch (InterruptedException ie) {
}
}
}
}
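// Test hook: sleep for the given period after sending each packet, to make
// race conditions easier to trigger.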
void setArtificialSlowdown(long period) {
artificialSlowdown = period;
}
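// Test hook: cap the number of chunks per packet and recompute packetSize
// accordingly.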
synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER +
(checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket;
}
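// Test hook: change the file name used by this stream in namenode calls.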
synchronized void setTestFilename(String newname) {
src = newname;
}
/**
* Returns the size of a file as it was when this stream was opened
*/
long getInitialLen() {
return initialFileSize;
}
/**
* Returns the access token currently used by streamer, for testing only
*/
BlockAccessToken getAccessToken() {
return streamer.getAccessToken();
}
}