/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.base.Preconditions;
/**
* This class supports writing files in striped layout and erasure coded format.
* Each stripe contains a sequence of cells.
*/
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream {
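  /*
   * Cell/stripe layout sketch (illustrative only, assuming a hypothetical
   * RS(6, 3) policy with cellSize = 64KB): user bytes are round-robined
   * across the 6 data blocks in 64KB cells, and every full stripe of 6 data
   * cells is encoded into 3 parity cells:
   *
   *   stripe 0:  d0 d1 d2 d3 d4 d5 | p0 p1 p2
   *   stripe 1:  d6 d7 d8 ...
   *
   * Each cell is written by the streamer of the corresponding internal block.
   */

  /**
   * A fixed set of bounded blocking queues, one per streamer, used to pass
   * per-block information between the coordinator and individual streamers.
   */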
static class MultipleBlockingQueue<T> {
private final List<BlockingQueue<T>> queues;
MultipleBlockingQueue(int numQueue, int queueSize) {
queues = new ArrayList<>(numQueue);
for (int i = 0; i < numQueue; i++) {
queues.add(new LinkedBlockingQueue<T>(queueSize));
}
}
boolean isEmpty() {
      for (int i = 0; i < queues.size(); i++) {
if (!queues.get(i).isEmpty()) {
return false;
}
}
return true;
}
int numQueues() {
return queues.size();
}
void offer(int i, T object) {
final boolean b = queues.get(i).offer(object);
Preconditions.checkState(b, "Failed to offer " + object
+ " to queue, i=" + i);
}
T take(int i) throws InterruptedIOException {
try {
return queues.get(i).take();
      } catch (InterruptedException ie) {
throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie);
}
}
T poll(int i) {
return queues.get(i).poll();
}
T peek(int i) {
return queues.get(i).peek();
}
}
  /**
   * Coordinates the communication between the streamers: newly allocated
   * blocks are fanned out to the individual streamers, and per-block state
   * is collected back from them.
   */
class Coordinator {
    // per-streamer internal blocks parsed from a newly allocated block group
    private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
    // the final block of each data streamer, reported when its block is ended
    private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
    // per-streamer blocks carrying a new generation stamp during recovery
    private final MultipleBlockingQueue<LocatedBlock> newBlocks;
    // per-streamer blocks acknowledged by an updatePipeline call
    private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
Coordinator(final DfsClientConf conf, final int numDataBlocks,
final int numAllBlocks) {
followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
}
MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
return followingBlocks;
}
MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
return newBlocks;
}
MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
return updateBlocks;
}
StripedDataStreamer getStripedDataStreamer(int i) {
return DFSStripedOutputStream.this.getStripedDataStreamer(i);
}
void offerEndBlock(int i, ExtendedBlock block) {
endBlocks.offer(i, block);
}
ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
return endBlocks.take(i);
}
boolean hasAllEndBlocks() {
      for (int i = 0; i < endBlocks.numQueues(); i++) {
if (endBlocks.peek(i) == null) {
return false;
}
}
return true;
}
    void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
      ExtendedBlock b = endBlocks.peek(i);
      if (b == null) {
        // the streamer has just failed; record the given block as its end
        // block and continue
        b = block;
        offerEndBlock(i, b);
      }
      b.setNumBytes(newBytes);
    }
/** @return a block representing the entire block group. */
ExtendedBlock getBlockGroup() {
final StripedDataStreamer s0 = getStripedDataStreamer(0);
final ExtendedBlock b0 = s0.getBlock();
if (b0 == null) {
return null;
}
      final boolean atBlockGroupBoundary =
          s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
      final ExtendedBlock block = new ExtendedBlock(b0);
      long numBytes = b0.getNumBytes();
      for (int i = 1; i < numDataBlocks; i++) {
        final StripedDataStreamer si = getStripedDataStreamer(i);
        final ExtendedBlock bi = si.getBlock();
        if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
          block.setGenerationStamp(bi.getGenerationStamp());
        }
        // guard against a failed streamer that never obtained its block
        numBytes += atBlockGroupBoundary && bi != null ?
            bi.getNumBytes() : si.getBytesCurBlock();
      }
block.setNumBytes(numBytes);
if (LOG.isDebugEnabled()) {
LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
}
return block;
}
}
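  /*
   * A sketch of the coordinator flow (for illustration only): the streamer
   * that allocates a new block group from the namenode splits it into
   * internal blocks and offers one per queue into followingBlocks; each
   * streamer i then learns its internal block via followingBlocks.take(i).
   * On close, every data streamer offers its final block into endBlocks so
   * that getBlockGroup() can assemble the size of the whole group.
   */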
/** Buffers for writing the data and parity cells of a stripe. */
class CellBuffers {
private final ByteBuffer[] buffers;
private final byte[][] checksumArrays;
    CellBuffers(int numParityBlocks) throws InterruptedException {
if (cellSize % bytesPerChecksum != 0) {
throw new HadoopIllegalArgumentException("Invalid values: "
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
+ bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
}
checksumArrays = new byte[numParityBlocks][];
final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
for (int i = 0; i < checksumArrays.length; i++) {
checksumArrays[i] = new byte[size];
}
buffers = new ByteBuffer[numAllBlocks];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
}
}
private ByteBuffer[] getBuffers() {
return buffers;
}
byte[] getChecksumArray(int i) {
return checksumArrays[i - numDataBlocks];
}
private int addTo(int i, byte[] b, int off, int len) {
final ByteBuffer buf = buffers[i];
final int pos = buf.position() + len;
Preconditions.checkState(pos <= cellSize);
buf.put(b, off, len);
return pos;
}
private void clear() {
      for (int i = 0; i < numAllBlocks; i++) {
buffers[i].clear();
if (i >= numDataBlocks) {
Arrays.fill(buffers[i].array(), (byte) 0);
}
}
}
private void release() {
for (int i = 0; i < numAllBlocks; i++) {
byteArrayManager.release(buffers[i].array());
}
}
private void flipDataBuffers() {
for (int i = 0; i < numDataBlocks; i++) {
buffers[i].flip();
}
}
}
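  /*
   * CellBuffers lifecycle sketch: writeChunk() fills buffers[0..numDataBlocks)
   * cell by cell; once a full stripe of data cells is buffered, the data
   * buffers are flipped, the parity cells are encoded into
   * buffers[numDataBlocks..numAllBlocks), and clear() resets all buffers for
   * the next stripe.
   */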
private final Coordinator coordinator;
private final CellBuffers cellBuffers;
private final RawErasureEncoder encoder;
private final List<StripedDataStreamer> streamers;
private final DFSPacket[] currentPackets; // current Packet of each streamer
  /** Size of each striping cell; must be a multiple of bytesPerChecksum. */
private final int cellSize;
private final int numAllBlocks;
private final int numDataBlocks;
@Override
ExtendedBlock getBlock() {
return coordinator.getBlockGroup();
}
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating DFSStripedOutputStream for " + src);
}
final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
final int numParityBlocks = ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
numDataBlocks = ecPolicy.getNumDataUnits();
numAllBlocks = numDataBlocks + numParityBlocks;
encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
numDataBlocks, numParityBlocks);
coordinator = new Coordinator(dfsClient.getConf(),
numDataBlocks, numAllBlocks);
try {
cellBuffers = new CellBuffers(numParityBlocks);
} catch (InterruptedException ie) {
throw DFSUtil.toInterruptedIOException(
"Failed to create cell buffers", ie);
}
List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator);
s.add(streamer);
}
streamers = Collections.unmodifiableList(s);
currentPackets = new DFSPacket[streamers.size()];
setCurrentStreamer(0);
}
StripedDataStreamer getStripedDataStreamer(int i) {
return streamers.get(i);
}
int getCurrentIndex() {
return getCurrentStreamer().getIndex();
}
private synchronized StripedDataStreamer getCurrentStreamer() {
return (StripedDataStreamer)streamer;
}
private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
throws IOException {
    // back up currentPacket for the current streamer
int oldIdx = streamers.indexOf(streamer);
if (oldIdx >= 0) {
currentPackets[oldIdx] = currentPacket;
}
streamer = streamers.get(newIdx);
currentPacket = currentPackets[newIdx];
adjustChunkBoundary();
return getCurrentStreamer();
}
/**
* Encode the buffers, i.e. compute parities.
*
* @param buffers data buffers + parity buffers
*/
private static void encode(RawErasureEncoder encoder, int numData,
ByteBuffer[] buffers) {
final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
encoder.encode(dataBuffers, parityBuffers);
}
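  // Contract of the raw encoder call above (illustrative, assuming an
  // RS(6, 3) codec): dataBuffers holds the 6 data cells and parityBuffers
  // the 3 parity cells; encode() reads each data buffer between its position
  // and limit and fills the parity buffers with the computed parity bytes.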
private void checkStreamers() throws IOException {
    int count = 0;
    for (StripedDataStreamer s : streamers) {
      if (!s.isFailed()) {
        if (s.getBlock() != null) {
          // a healthy streamer that already has a block must take part in
          // the error recovery triggered by the failed streamer(s)
          s.getErrorState().initExternalError();
        }
        count++;
      }
    }
if (LOG.isDebugEnabled()) {
LOG.debug("checkStreamers: " + streamers);
LOG.debug("count=" + count);
}
    if (count < numDataBlocks) {
      throw new IOException("Failed: the number of remaining healthy streamers = "
          + count + " < the number of data blocks = " + numDataBlocks);
    }
}
private void handleStreamerFailure(String err,
Exception e) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().setFailed(true);
checkStreamers();
currentPacket = null;
}
@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
final int index = getCurrentIndex();
final StripedDataStreamer current = getCurrentStreamer();
final int pos = cellBuffers.addTo(index, bytes, offset, len);
final boolean cellFull = pos == cellSize;
final long oldBytes = current.getBytesCurBlock();
if (!current.isFailed()) {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
      } catch (Exception e) {
handleStreamerFailure("offset=" + offset + ", length=" + len, e);
}
}
if (current.isFailed()) {
final long newBytes = oldBytes + len;
coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
current.setBytesCurBlock(newBytes);
}
    // Two extra steps are needed when a striping cell is full:
    // 1. Forward the current index pointer
    // 2. Generate parity packets if a full stripe of data cells is present
if (cellFull) {
int next = index + 1;
      // When all data cells in a stripe are ready, encode them to generate
      // the parity cells. These cells are then converted to packets and
      // queued to their corresponding DataStreamers.
if (next == numDataBlocks) {
cellBuffers.flipDataBuffers();
writeParityCells();
next = 0;
}
setCurrentStreamer(next);
}
}
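  // Worked example of writeChunk (illustrative, assuming cellSize = 64KB and
  // 6 data blocks): the first 64KB of user data fills the cell of streamer 0,
  // the next 64KB the cell of streamer 1, and so on; once the cell of
  // streamer 5 is full, the stripe is encoded, the parity cells are queued to
  // streamers 6..8, and writing wraps around to streamer 0.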
  /** @return the number of user-data bytes in one full stripe. */
  private int stripeDataSize() {
    return numDataBlocks * cellSize;
  }
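  // hflush() and hsync() are not supported for striped output: flush-on-sync
  // semantics would require coordinating partial stripes and their parity
  // across all streamers.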
@Override
public void hflush() {
throw new UnsupportedOperationException();
}
@Override
public void hsync() {
throw new UnsupportedOperationException();
}
@Override
protected synchronized void start() {
for (StripedDataStreamer streamer : streamers) {
streamer.start();
}
}
@Override
synchronized void abort() throws IOException {
if (isClosed()) {
return;
}
for (StripedDataStreamer streamer : streamers) {
streamer.getLastException().set(new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout()/1000) +
" seconds expired."));
}
closeThreads(true);
dfsClient.endFileLease(fileId);
}
@Override
boolean isClosed() {
if (closed) {
return true;
}
    for (StripedDataStreamer s : streamers) {
if (!s.streamerClosed()) {
return false;
}
}
return true;
}
@Override
protected void closeThreads(boolean force) throws IOException {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
try {
for (StripedDataStreamer streamer : streamers) {
try {
streamer.close(force);
streamer.join();
streamer.closeSocket();
} catch (Exception e) {
try {
handleStreamerFailure("force=" + force, e);
} catch (IOException ioe) {
b.add(ioe);
}
} finally {
streamer.setSocketToNull();
}
}
} finally {
setClosed();
}
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
}
  /**
   * Simply sum up bytesCurBlock of all the data streamers. Note that the
   * result is not necessarily the exact size of the block group.
   */
private long getCurrentSumBytes() {
long sum = 0;
for (int i = 0; i < numDataBlocks; i++) {
sum += streamers.get(i).getBytesCurBlock();
}
return sum;
}
private void writeParityCellsForLastStripe() throws IOException {
final long currentBlockGroupBytes = getCurrentSumBytes();
if (currentBlockGroupBytes % stripeDataSize() == 0) {
return;
}
    final int firstCellSize =
        (int) (getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
    final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize ?
        firstCellSize : cellSize;
    final ByteBuffer[] buffers = cellBuffers.getBuffers();
    for (int i = 0; i < numAllBlocks; i++) {
      // Pad zero bytes so that every cell is exactly parityCellSize bytes:
      // a data cell shorter than the parity cells is zero-padded, and the
      // parity cells themselves are zero-filled before encoding.
      final int position = buffers[i].position();
      assert position <= parityCellSize : "If an internal block is smaller" +
          " than a parity block, then its last cell should be smaller than" +
          " the last parity cell";
      for (int j = 0; j < parityCellSize - position; j++) {
        buffers[i].put((byte) 0);
      }
      buffers[i].flip();
    }
writeParityCells();
}
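  // Example (illustrative, assuming cellSize = 64KB and 6 data blocks): if
  // the last stripe holds 100KB of user data, the last cell of streamer 0 is
  // full (64KB) and that of streamer 1 holds 36KB, so parityCellSize = 64KB;
  // streamer 1's cell is zero-padded with 28KB before the parity cells are
  // encoded.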
void writeParityCells() throws IOException {
final ByteBuffer[] buffers = cellBuffers.getBuffers();
    // encode the data cells to compute the parity cells
encode(encoder, numDataBlocks, buffers);
for (int i = numDataBlocks; i < numAllBlocks; i++) {
writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
}
cellBuffers.clear();
}
void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
) throws IOException {
final StripedDataStreamer current = setCurrentStreamer(index);
final int len = buffer.limit();
final long oldBytes = current.getBytesCurBlock();
if (!current.isFailed()) {
try {
        DataChecksum sum = getDataChecksum();
        sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
        // write the parity cell one checksum chunk at a time
        for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
          int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
          int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
          super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
              getChecksumSize());
        }
      } catch (Exception e) {
handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
}
}
if (current.isFailed()) {
final long newBytes = oldBytes + len;
current.setBytesCurBlock(newBytes);
}
}
@Override
void setClosed() {
super.setClosed();
for (int i = 0; i < numAllBlocks; i++) {
streamers.get(i).release();
}
cellBuffers.release();
}
@Override
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
      for (int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
try {
si.getLastException().check(true);
} catch (IOException e) {
b.add(e);
}
}
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
return;
}
try {
// flush from all upper layers
try {
flushBuffer();
// if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
enqueueAllCurrentPackets();
      } catch (Exception e) {
handleStreamerFailure("closeImpl", e);
}
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (!s.isFailed()) {
try {
if (s.getBytesCurBlock() > 0) {
setCurrentPacketToEmpty();
}
          // flush all data to the DataNode pipeline
flushInternal();
          } catch (Exception e) {
handleStreamerFailure("closeImpl", e);
}
}
}
closeThreads(false);
final ExtendedBlock lastBlock = coordinator.getBlockGroup();
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
completeFile(lastBlock);
} finally {
scope.close();
}
dfsClient.endFileLease(fileId);
} catch (ClosedChannelException ignored) {
} finally {
setClosed();
}
}
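  // Close sequence sketch: flush buffered data, pad and encode the last
  // (possibly partial) stripe, enqueue every streamer's current packet, flush
  // each streamer to its datanodes, join the streamer threads, and finally
  // complete the file at the namenode using the assembled block group.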
private void enqueueAllCurrentPackets() throws IOException {
int idx = streamers.indexOf(getCurrentStreamer());
    for (int i = 0; i < streamers.size(); i++) {
setCurrentStreamer(i);
if (currentPacket != null) {
enqueueCurrentPacket();
}
}
setCurrentStreamer(idx);
}
}