blob: 884fa42484bde66a3d4a04ea51e3d4e5be248f50 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
import org.apache.htrace.core.TraceScope;
/**
* This class supports writing files in striped layout and erasure coded format.
* Each stripe contains a sequence of cells.
*/
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream {
static class MultipleBlockingQueue<T> {
private final List<BlockingQueue<T>> queues;
MultipleBlockingQueue(int numQueue, int queueSize) {
List<BlockingQueue<T>> list = new ArrayList<>(numQueue);
for (int i = 0; i < numQueue; i++) {
list.add(new LinkedBlockingQueue<T>(queueSize));
}
queues = Collections.synchronizedList(list);
}
void offer(int i, T object) {
final boolean b = queues.get(i).offer(object);
Preconditions.checkState(b, "Failed to offer " + object
+ " to queue, i=" + i);
}
T take(int i) throws InterruptedIOException {
try {
return queues.get(i).take();
} catch(InterruptedException ie) {
throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, ie);
}
}
T takeWithTimeout(int i) throws InterruptedIOException {
try {
return queues.get(i).poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, e);
}
}
T poll(int i) {
return queues.get(i).poll();
}
T peek(int i) {
return queues.get(i).peek();
}
void clear() {
for (BlockingQueue<T> q : queues) {
q.clear();
}
}
}
/** Coordinate the communication between the streamers. */
static class Coordinator {
/**
* The next internal block to write to for each streamers. The
* DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to
* get a new block group. The block group is split to internal blocks, which
* are then distributed into the queue for streamers to retrieve.
*/
private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
/**
* Used to sync among all the streamers before allocating a new block. The
* DFSStripedOutputStream uses this to make sure every streamer has finished
* writing the previous block.
*/
private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
/**
* The following data structures are used for syncing while handling errors
*/
private final MultipleBlockingQueue<LocatedBlock> newBlocks;
private final Map<StripedDataStreamer, Boolean> updateStreamerMap;
private final MultipleBlockingQueue<Boolean> streamerUpdateResult;
Coordinator(final int numAllBlocks) {
followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
updateStreamerMap = Collections.synchronizedMap(
new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1);
}
MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
return followingBlocks;
}
MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
return newBlocks;
}
void offerEndBlock(int i, ExtendedBlock block) {
endBlocks.offer(i, block);
}
void offerStreamerUpdateResult(int i, boolean success) {
streamerUpdateResult.offer(i, success);
}
boolean takeStreamerUpdateResult(int i) throws InterruptedIOException {
return streamerUpdateResult.take(i);
}
void updateStreamer(StripedDataStreamer streamer,
boolean success) {
assert !updateStreamerMap.containsKey(streamer);
updateStreamerMap.put(streamer, success);
}
void clearFailureStates() {
newBlocks.clear();
updateStreamerMap.clear();
streamerUpdateResult.clear();
}
}
/** Buffers for writing the data and parity cells of a stripe. */
class CellBuffers {
private final ByteBuffer[] buffers;
private final byte[][] checksumArrays;
CellBuffers(int numParityBlocks) throws InterruptedException{
if (cellSize % bytesPerChecksum != 0) {
throw new HadoopIllegalArgumentException("Invalid values: "
+ HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
+ bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
}
checksumArrays = new byte[numParityBlocks][];
final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
for (int i = 0; i < checksumArrays.length; i++) {
checksumArrays[i] = new byte[size];
}
buffers = new ByteBuffer[numAllBlocks];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
}
}
private ByteBuffer[] getBuffers() {
return buffers;
}
byte[] getChecksumArray(int i) {
return checksumArrays[i - numDataBlocks];
}
private int addTo(int i, byte[] b, int off, int len) {
final ByteBuffer buf = buffers[i];
final int pos = buf.position() + len;
Preconditions.checkState(pos <= cellSize);
buf.put(b, off, len);
return pos;
}
private void clear() {
for (int i = 0; i< numAllBlocks; i++) {
buffers[i].clear();
if (i >= numDataBlocks) {
Arrays.fill(buffers[i].array(), (byte) 0);
}
}
}
private void release() {
for (int i = 0; i < numAllBlocks; i++) {
byteArrayManager.release(buffers[i].array());
}
}
private void flipDataBuffers() {
for (int i = 0; i < numDataBlocks; i++) {
buffers[i].flip();
}
}
}
private final Coordinator coordinator;
private final CellBuffers cellBuffers;
private final RawErasureEncoder encoder;
private final List<StripedDataStreamer> streamers;
private final DFSPacket[] currentPackets; // current Packet of each streamer
/** Size of each striping cell, must be a multiple of bytesPerChecksum */
private final int cellSize;
private final int numAllBlocks;
private final int numDataBlocks;
private ExtendedBlock currentBlockGroup;
private final String[] favoredNodes;
private final List<StripedDataStreamer> failedStreamers;
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating DFSStripedOutputStream for " + src);
}
final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
final int numParityBlocks = ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
numDataBlocks = ecPolicy.getNumDataUnits();
numAllBlocks = numDataBlocks + numParityBlocks;
this.favoredNodes = favoredNodes;
failedStreamers = new ArrayList<>();
encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
numDataBlocks, numParityBlocks);
coordinator = new Coordinator(numAllBlocks);
try {
cellBuffers = new CellBuffers(numParityBlocks);
} catch (InterruptedException ie) {
throw DFSUtilClient.toInterruptedIOException(
"Failed to create cell buffers", ie);
}
streamers = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator);
streamers.add(streamer);
}
currentPackets = new DFSPacket[streamers.size()];
setCurrentStreamer(0);
}
StripedDataStreamer getStripedDataStreamer(int i) {
return streamers.get(i);
}
int getCurrentIndex() {
return getCurrentStreamer().getIndex();
}
private synchronized StripedDataStreamer getCurrentStreamer() {
return (StripedDataStreamer) streamer;
}
private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
// backup currentPacket for current streamer
if (streamer != null) {
int oldIdx = streamers.indexOf(getCurrentStreamer());
if (oldIdx >= 0) {
currentPackets[oldIdx] = currentPacket;
}
}
streamer = getStripedDataStreamer(newIdx);
currentPacket = currentPackets[newIdx];
adjustChunkBoundary();
return getCurrentStreamer();
}
/**
* Encode the buffers, i.e. compute parities.
*
* @param buffers data buffers + parity buffers
*/
private static void encode(RawErasureEncoder encoder, int numData,
ByteBuffer[] buffers) {
final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
encoder.encode(dataBuffers, parityBuffers);
}
/**
* check all the existing StripedDataStreamer and find newly failed streamers.
* @return The newly failed streamers.
* @throws IOException if less than {@link #numDataBlocks} streamers are still
* healthy.
*/
private Set<StripedDataStreamer> checkStreamers() throws IOException {
Set<StripedDataStreamer> newFailed = new HashSet<>();
for(StripedDataStreamer s : streamers) {
if (!s.isHealthy() && !failedStreamers.contains(s)) {
newFailed.add(s);
}
}
final int failCount = failedStreamers.size() + newFailed.size();
if (LOG.isDebugEnabled()) {
LOG.debug("checkStreamers: " + streamers);
LOG.debug("healthy streamer count=" + (numAllBlocks - failCount));
LOG.debug("original failed streamers: " + failedStreamers);
LOG.debug("newly failed streamers: " + newFailed);
}
if (failCount > (numAllBlocks - numDataBlocks)) {
throw new IOException("Failed: the number of failed blocks = "
+ failCount + " > the number of data blocks = "
+ (numAllBlocks - numDataBlocks));
}
return newFailed;
}
private void handleStreamerFailure(String err, Exception e)
throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().getErrorState().setInternalError();
getCurrentStreamer().close(true);
checkStreamers();
currentPacket = null;
}
private void replaceFailedStreamers() {
assert streamers.size() == numAllBlocks;
final int currentIndex = getCurrentIndex();
assert currentIndex == 0;
for (short i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer oldStreamer = getStripedDataStreamer(i);
if (!oldStreamer.isHealthy()) {
LOG.info("replacing previously failed streamer " + oldStreamer);
StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat,
dfsClient, src, oldStreamer.progress,
oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator);
streamers.set(i, streamer);
currentPackets[i] = null;
if (i == currentIndex) {
this.streamer = streamer;
this.currentPacket = null;
}
streamer.start();
}
}
}
private void waitEndBlocks(int i) throws IOException {
while (getStripedDataStreamer(i).isHealthy()) {
final ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i);
if (b != null) {
StripedBlockUtil.checkBlocks(currentBlockGroup, i, b);
return;
}
}
}
private DatanodeInfo[] getExcludedNodes() {
List<DatanodeInfo> excluded = new ArrayList<>();
for (StripedDataStreamer streamer : streamers) {
for (DatanodeInfo e : streamer.getExcludedNodes()) {
if (e != null) {
excluded.add(e);
}
}
}
return excluded.toArray(new DatanodeInfo[excluded.size()]);
}
private void allocateNewBlock() throws IOException {
if (currentBlockGroup != null) {
for (int i = 0; i < numAllBlocks; i++) {
// sync all the healthy streamers before writing to the new block
waitEndBlocks(i);
}
}
failedStreamers.clear();
DatanodeInfo[] excludedNodes = getExcludedNodes();
LOG.debug("Excluding DataNodes when allocating new block: "
+ Arrays.asList(excludedNodes));
// replace failed streamers
replaceFailedStreamers();
LOG.debug("Allocating new block group. The previous block group: "
+ currentBlockGroup);
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
currentBlockGroup, fileId, favoredNodes);
assert lb.isStriped();
if (lb.getLocations().length < numDataBlocks) {
throw new IOException("Failed to get " + numDataBlocks
+ " nodes from namenode: blockGroupSize= " + numAllBlocks
+ ", blocks.length= " + lb.getLocations().length);
}
// assign the new block to the current block group
currentBlockGroup = lb.getBlock();
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock) lb, cellSize, numDataBlocks,
numAllBlocks - numDataBlocks);
for (int i = 0; i < blocks.length; i++) {
StripedDataStreamer si = getStripedDataStreamer(i);
assert si.isHealthy();
if (blocks[i] == null) {
// Set exception and close streamer as there is no block locations
// found for the parity block.
LOG.warn("Failed to get block location for parity block, index=" + i);
si.getLastException().set(
new IOException("Failed to get following block, i=" + i));
si.getErrorState().setInternalError();
si.close(true);
} else {
coordinator.getFollowingBlocks().offer(i, blocks[i]);
}
}
}
private boolean shouldEndBlockGroup() {
return currentBlockGroup != null &&
currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
}
@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
final int index = getCurrentIndex();
final int pos = cellBuffers.addTo(index, bytes, offset, len);
final boolean cellFull = pos == cellSize;
if (currentBlockGroup == null || shouldEndBlockGroup()) {
// the incoming data should belong to a new block. Allocate a new block.
allocateNewBlock();
}
currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
// note: the current streamer can be refreshed after allocating a new block
final StripedDataStreamer current = getCurrentStreamer();
if (current.isHealthy()) {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
} catch(Exception e) {
handleStreamerFailure("offset=" + offset + ", length=" + len, e);
}
}
// Two extra steps are needed when a striping cell is full:
// 1. Forward the current index pointer
// 2. Generate parity packets if a full stripe of data cells are present
if (cellFull) {
int next = index + 1;
//When all data cells in a stripe are ready, we need to encode
//them and generate some parity cells. These cells will be
//converted to packets and put to their DataStreamer's queue.
if (next == numDataBlocks) {
cellBuffers.flipDataBuffers();
writeParityCells();
next = 0;
// if this is the end of the block group, end each internal block
if (shouldEndBlockGroup()) {
flushAllInternals();
checkStreamerFailures();
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) {
try {
endBlock();
} catch (IOException ignored) {}
}
}
} else {
// check failure state for all the streamers. Bump GS if necessary
checkStreamerFailures();
}
}
setCurrentStreamer(next);
}
}
@Override
synchronized void enqueueCurrentPacketFull() throws IOException {
LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+ " appendChunk={}, {}", currentPacket, src, getStreamer()
.getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
getStreamer());
enqueueCurrentPacket();
adjustChunkBoundary();
// no need to end block here
}
/**
* @return whether the data streamer with the given index is streaming data.
* Note the streamer may not be in STREAMING stage if the block length is less
* than a stripe.
*/
private boolean isStreamerWriting(int streamerIndex) {
final long length = currentBlockGroup == null ?
0 : currentBlockGroup.getNumBytes();
if (length == 0) {
return false;
}
if (streamerIndex >= numDataBlocks) {
return true;
}
final int numCells = (int) ((length - 1) / cellSize + 1);
return streamerIndex < numCells;
}
private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
Set<StripedDataStreamer> healthySet = new HashSet<>();
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer streamer = getStripedDataStreamer(i);
if (streamer.isHealthy() && isStreamerWriting(i)) {
Preconditions.checkState(
streamer.getStage() == BlockConstructionStage.DATA_STREAMING,
"streamer: " + streamer);
streamer.setExternalError();
healthySet.add(streamer);
}
}
return healthySet;
}
/**
* Check and handle data streamer failures. This is called only when we have
* written a full stripe (i.e., enqueue all packets for a full stripe), or
* when we're closing the outputstream.
*/
private void checkStreamerFailures() throws IOException {
Set<StripedDataStreamer> newFailed = checkStreamers();
if (newFailed.size() == 0) {
return;
}
// for healthy streamers, wait till all of them have fetched the new block
// and flushed out all the enqueued packets.
flushAllInternals();
// recheck failed streamers again after the flush
newFailed = checkStreamers();
while (newFailed.size() > 0) {
failedStreamers.addAll(newFailed);
coordinator.clearFailureStates();
// mark all the healthy streamers as external error
Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers();
// we have newly failed streamers, update block for pipeline
final ExtendedBlock newBG = updateBlockForPipeline(healthySet);
// wait till all the healthy streamers to
// 1) get the updated block info
// 2) create new block outputstream
newFailed = waitCreatingNewStreams(healthySet);
if (newFailed.size() + failedStreamers.size() >
numAllBlocks - numDataBlocks) {
throw new IOException(
"Data streamers failed while creating new block streams: "
+ newFailed + ". There are not enough healthy streamers.");
}
for (StripedDataStreamer failedStreamer : newFailed) {
assert !failedStreamer.isHealthy();
}
// TODO we can also succeed if all the failed streamers have not taken
// the updated block
if (newFailed.size() == 0) {
// reset external error state of all the streamers
for (StripedDataStreamer streamer : healthySet) {
assert streamer.isHealthy();
streamer.getErrorState().reset();
}
updatePipeline(newBG);
}
for (int i = 0; i < numAllBlocks; i++) {
coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
}
}
}
private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
Set<StripedDataStreamer> streamers) {
for (StripedDataStreamer streamer : streamers) {
if (!coordinator.updateStreamerMap.containsKey(streamer)) {
if (!streamer.isHealthy() &&
coordinator.getNewBlocks().peek(streamer.getIndex()) != null) {
// this streamer had internal error before getting updated block
failed.add(streamer);
}
}
}
return coordinator.updateStreamerMap.size() + failed.size();
}
private Set<StripedDataStreamer> waitCreatingNewStreams(
Set<StripedDataStreamer> healthyStreamers) throws IOException {
Set<StripedDataStreamer> failed = new HashSet<>();
final int expectedNum = healthyStreamers.size();
final long socketTimeout = dfsClient.getConf().getSocketTimeout();
// the total wait time should be less than the socket timeout, otherwise
// a slow streamer may cause other streamers to timeout. here we wait for
// half of the socket timeout
long remaingTime = socketTimeout > 0 ? socketTimeout/2 : Long.MAX_VALUE;
final long waitInterval = 1000;
synchronized (coordinator) {
while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum
&& remaingTime > 0) {
try {
long start = Time.monotonicNow();
coordinator.wait(waitInterval);
remaingTime -= Time.monotonicNow() - start;
} catch (InterruptedException e) {
throw DFSUtilClient.toInterruptedIOException("Interrupted when waiting" +
" for results of updating striped streamers", e);
}
}
}
synchronized (coordinator) {
for (StripedDataStreamer streamer : healthyStreamers) {
if (!coordinator.updateStreamerMap.containsKey(streamer)) {
// close the streamer if it is too slow to create new connection
LOG.info("close the slow stream " + streamer);
streamer.setStreamerAsClosed();
failed.add(streamer);
}
}
}
for (Map.Entry<StripedDataStreamer, Boolean> entry :
coordinator.updateStreamerMap.entrySet()) {
if (!entry.getValue()) {
failed.add(entry.getKey());
}
}
for (StripedDataStreamer failedStreamer : failed) {
healthyStreamers.remove(failedStreamer);
}
return failed;
}
/**
* Call {@link ClientProtocol#updateBlockForPipeline} and assign updated block
* to healthy streamers.
* @param healthyStreamers The healthy data streamers. These streamers join
* the failure handling.
*/
private ExtendedBlock updateBlockForPipeline(
Set<StripedDataStreamer> healthyStreamers) throws IOException {
final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline(
currentBlockGroup, dfsClient.clientName);
final long newGS = updated.getBlock().getGenerationStamp();
ExtendedBlock newBlock = new ExtendedBlock(currentBlockGroup);
newBlock.setGenerationStamp(newGS);
final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock) updated, cellSize, numDataBlocks,
numAllBlocks - numDataBlocks);
for (int i = 0; i < numAllBlocks; i++) {
StripedDataStreamer si = getStripedDataStreamer(i);
if (healthyStreamers.contains(si)) {
final LocatedBlock lb = new LocatedBlock(new ExtendedBlock(newBlock),
null, null, null, -1, updated.isCorrupt(), null);
lb.setBlockToken(updatedBlks[i].getBlockToken());
coordinator.getNewBlocks().offer(i, lb);
}
}
return newBlock;
}
private void updatePipeline(ExtendedBlock newBG) throws IOException {
final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks];
final String[] newStorageIDs = new String[numAllBlocks];
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer streamer = getStripedDataStreamer(i);
final DatanodeInfo[] nodes = streamer.getNodes();
final String[] storageIDs = streamer.getStorageIDs();
if (streamer.isHealthy() && nodes != null && storageIDs != null) {
newNodes[i] = nodes[0];
newStorageIDs[i] = storageIDs[0];
} else {
newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID);
newStorageIDs[i] = "";
}
}
dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
newBG, newNodes, newStorageIDs);
currentBlockGroup = newBG;
}
private int stripeDataSize() {
return numDataBlocks * cellSize;
}
@Override
public void hflush() {
throw new UnsupportedOperationException();
}
@Override
public void hsync() {
throw new UnsupportedOperationException();
}
@Override
protected synchronized void start() {
for (StripedDataStreamer streamer : streamers) {
streamer.start();
}
}
@Override
synchronized void abort() throws IOException {
if (isClosed()) {
return;
}
for (StripedDataStreamer streamer : streamers) {
streamer.getLastException().set(new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout()/1000) +
" seconds expired."));
}
closeThreads(true);
dfsClient.endFileLease(fileId);
}
@Override
boolean isClosed() {
if (closed) {
return true;
}
for(StripedDataStreamer s : streamers) {
if (!s.streamerClosed()) {
return false;
}
}
return true;
}
@Override
protected void closeThreads(boolean force) throws IOException {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
try {
for (StripedDataStreamer streamer : streamers) {
try {
streamer.close(force);
streamer.join();
streamer.closeSocket();
} catch (Exception e) {
try {
handleStreamerFailure("force=" + force, e);
} catch (IOException ioe) {
b.add(ioe);
}
} finally {
streamer.setSocketToNull();
}
}
} finally {
setClosed();
}
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
}
private boolean generateParityCellsForLastStripe() {
final long currentBlockGroupBytes = currentBlockGroup == null ?
0 : currentBlockGroup.getNumBytes();
final long lastStripeSize = currentBlockGroupBytes % stripeDataSize();
if (lastStripeSize == 0) {
return false;
}
final long parityCellSize = lastStripeSize < cellSize?
lastStripeSize : cellSize;
final ByteBuffer[] buffers = cellBuffers.getBuffers();
for (int i = 0; i < numAllBlocks; i++) {
// Pad zero bytes to make all cells exactly the size of parityCellSize
// If internal block is smaller than parity block, pad zero bytes.
// Also pad zero bytes to all parity cells
final int position = buffers[i].position();
assert position <= parityCellSize : "If an internal block is smaller" +
" than parity block, then its last cell should be small than last" +
" parity cell";
for (int j = 0; j < parityCellSize - position; j++) {
buffers[i].put((byte) 0);
}
buffers[i].flip();
}
return true;
}
void writeParityCells() throws IOException {
final ByteBuffer[] buffers = cellBuffers.getBuffers();
//encode the data cells
encode(encoder, numDataBlocks, buffers);
for (int i = numDataBlocks; i < numAllBlocks; i++) {
writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
}
cellBuffers.clear();
}
void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
throws IOException {
final StripedDataStreamer current = setCurrentStreamer(index);
final int len = buffer.limit();
final long oldBytes = current.getBytesCurBlock();
if (current.isHealthy()) {
try {
DataChecksum sum = getDataChecksum();
sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
getChecksumSize());
}
} catch(Exception e) {
handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
}
}
}
@Override
void setClosed() {
super.setClosed();
for (int i = 0; i < numAllBlocks; i++) {
getStripedDataStreamer(i).release();
}
cellBuffers.release();
}
@Override
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
try {
si.getLastException().check(true);
} catch (IOException e) {
b.add(e);
}
}
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
return;
}
try {
// flush from all upper layers
flushBuffer();
// if the last stripe is incomplete, generate and write parity cells
if (generateParityCellsForLastStripe()) {
writeParityCells();
}
enqueueAllCurrentPackets();
// flush all the data packets
flushAllInternals();
// check failures
checkStreamerFailures();
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) {
try {
if (s.getBytesCurBlock() > 0) {
setCurrentPacketToEmpty();
}
// flush the last "close" packet to Datanode
flushInternal();
} catch(Exception e) {
// TODO for both close and endBlock, we currently do not handle
// failures when sending the last packet. We actually do not need to
// bump GS for this kind of failure. Thus counting the total number
// of failures may be good enough.
}
}
}
closeThreads(false);
try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) {
completeFile(currentBlockGroup);
}
dfsClient.endFileLease(fileId);
} catch (ClosedChannelException ignored) {
} finally {
setClosed();
}
}
private void enqueueAllCurrentPackets() throws IOException {
int idx = streamers.indexOf(getCurrentStreamer());
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = setCurrentStreamer(i);
if (si.isHealthy() && currentPacket != null) {
try {
enqueueCurrentPacket();
} catch (IOException e) {
handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
}
}
}
setCurrentStreamer(idx);
}
void flushAllInternals() throws IOException {
int current = getCurrentIndex();
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) {
try {
// flush all data to Datanode
flushInternal();
} catch(Exception e) {
handleStreamerFailure("flushInternal " + s, e);
}
}
}
setCurrentStreamer(current);
}
static void sleep(long ms, String op) throws InterruptedIOException {
try {
Thread.sleep(ms);
} catch(InterruptedException ie) {
throw DFSUtilClient.toInterruptedIOException(
"Sleep interrupted during " + op, ie);
}
}
@Override
ExtendedBlock getBlock() {
return currentBlockGroup;
}
}