blob: f6a5ece26557f69508e1bbba67473cba9ab3c88d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSPacket;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemoteBlockReader2;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
/**
 * ErasureCodingWorker handles the erasure coding recovery work commands. These
 * commands are issued by the NameNode as part of the DataNode's heartbeat
 * response. BPOfferService delegates the work to this class for handling EC
 * commands.
 */
public final class ErasureCodingWorker {
  private static final Log LOG = DataNode.LOG;

  private final DataNode datanode;
  private final Configuration conf;

  /** Pool shared by all recovery tasks for reading from source DNs. */
  private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
  /**
   * Wait threshold passed to StripedBlockUtil#getNextCompletedStripedRead;
   * when a TIMEOUT is reported, a read from another source is scheduled.
   */
  private final int STRIPED_READ_THRESHOLD_MILLIS;
  /**
   * Configured read buffer size. The effective per-task buffer size is this
   * value rounded down to a multiple of bytesPerChecksum (at least one chunk).
   */
  private final int STRIPED_READ_BUFFER_SIZE;

  /**
   * Creates a worker and its striped read thread pool.
   *
   * @param conf configuration providing the striped read threshold, number of
   *             read threads and read buffer size
   * @param datanode the DataNode this worker runs in
   */
  public ErasureCodingWorker(Configuration conf, DataNode datanode) {
    this.datanode = datanode;
    this.conf = conf;

    STRIPED_READ_THRESHOLD_MILLIS = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT);
    initializeStripedReadThreadPool(conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
    STRIPED_READ_BUFFER_SIZE = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
  }

  /** Creates a Reed-Solomon raw decoder for the given data/parity layout. */
  private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
    return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits);
  }

  /**
   * Creates the striped read pool. It has between 1 and {@code num} threads,
   * built by Daemon.DaemonFactory and named "stripedRead-N". Idle threads,
   * including the core thread, time out after 60 seconds. Tasks are handed
   * off through a SynchronousQueue, so when every thread is busy the rejected
   * task runs in the submitting thread (CallerRunsPolicy).
   *
   * @param num maximum number of read threads
   */
  private void initializeStripedReadThreadPool(int num) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Using striped reads; pool threads=" + num);
    }
    STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        new Daemon.DaemonFactory() {
          private final AtomicInteger threadIndex = new AtomicInteger(0);

          @Override
          public Thread newThread(Runnable r) {
            Thread t = super.newThread(r);
            t.setName("stripedRead-" + threadIndex.getAndIncrement());
            return t;
          }
        }, new ThreadPoolExecutor.CallerRunsPolicy() {
          @Override
          public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
            LOG.info("Execution for striped reading rejected, "
                + "Executing in current thread");
            // will run in the current thread
            super.rejectedExecution(runnable, e);
          }
        });
    STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
  }

  /**
   * Handles the Erasure Coding recovery work commands. Each task is run on
   * its own Daemon thread. A failure to start one task is logged and does not
   * prevent the remaining tasks from being started.
   *
   * @param ecTasks
   *          BlockECRecoveryInfo
   */
  public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
    for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
      try {
        new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start();
      } catch (Throwable e) {
        LOG.warn("Failed to recover striped block " +
            recoveryInfo.getExtendedBlock().getLocalBlock(), e);
      }
    }
  }

  /**
   * ReconstructAndTransferBlock recovers one or more missing striped blocks
   * of a striped block group. The number of live striped blocks must be no
   * less than the minimum number of sources required. That minimum is the
   * smaller of the number of data blocks and the number of cells in the
   * group.
   *
   * | <- Striped Block Group -> |
   *  blk_0      blk_1       blk_2(*)   blk_3   ...   <- A striped block group
   *    |          |           |          |
   *    v          v           v          v
   * +------+   +------+   +------+   +------+
   * |cell_0|   |cell_1|   |cell_2|   |cell_3|  ...
   * +------+   +------+   +------+   +------+
   * |cell_4|   |cell_5|   |cell_6|   |cell_7|  ...
   * +------+   +------+   +------+   +------+
   * |cell_8|   |cell_9|   |cell10|   |cell11|  ...
   * +------+   +------+   +------+   +------+
   *  ...         ...       ...         ...
   *
   *
   * The striped block group is recovered in rounds. Each round recovers
   * <code>bufferSize</code> bytes, and the rounds repeat until the block is
   * finished. <code>bufferSize</code> is configurable and may be smaller or
   * larger than the cell size. Each round has three steps:
   * step1: read <code>bufferSize</code> data from the minimum number of
   *        sources required by recovery.
   * step2: decode data for the targets.
   * step3: transfer data to the targets.
   *
   * In step1, we try to read <code>bufferSize</code> data from the minimum
   * number of sources. If a source is corrupt or stale, a read from a new
   * source is scheduled. The best sources are remembered for the next round
   * and may be updated in each round.
   *
   * In step2, the buffers read from the sources are passed to a raw RS
   * decoder, together with zero-filled buffers for internal blocks of
   * length 0. The decoder reconstructs all missing target blocks in one call,
   * so the sources are read only once even when several blocks are missing.
   *
   * In step3, the recovered data are sent to the targets by constructing
   * packets and sending them directly. As with contiguous block replication,
   * packet acks are not checked. The datanode doing the recovery work is one
   * of the source datanodes, so the recovered data are sent to remote
   * targets.
   *
   * Possible improvements for the next phase:
   * 1. Read the block file directly on the local datanode. Currently the
   *    remote block reader is used. (Short-circuit read is not a good choice;
   *    see the inline comments.)
   * 2. Should packet acks be checked for EC recovery? EC recovery is more
   *    expensive than contiguous block replication because it reads from
   *    several other datanodes. Should we make sure the recovered result is
   *    received by the targets?
   */
  private class ReconstructAndTransferBlock implements Runnable {
    private final int dataBlkNum;
    private final int parityBlkNum;
    private final int cellSize;

    private RawErasureDecoder decoder;

    // Striped read buffer size; 0 until the first block reader is opened
    // (see initChecksumAndBufferSizeIfNeeded).
    private int bufferSize;

    private final ExtendedBlock blockGroup;
    private final int minRequiredSources;
    // position in striped internal block
    private long positionInBlock;

    // sources
    private final short[] liveIndices;
    private final DatanodeInfo[] sources;

    // One reader per source, in the same order as sources.
    private final List<StripedReader> stripedReaders;

    // The buffers and indices for striped blocks whose length is 0
    private ByteBuffer[] zeroStripeBuffers;
    private short[] zeroStripeIndices;

    // targets
    private final DatanodeInfo[] targets;
    private final StorageType[] targetStorageTypes;

    private final short[] targetIndices;
    private final ByteBuffer[] targetBuffers;

    private final Socket[] targetSockets;
    private final DataOutputStream[] targetOutputStreams;
    private final DataInputStream[] targetInputStreams;

    // Per-target write offset and next packet sequence number.
    private final long[] blockOffset4Targets;
    private final long[] seqNo4Targets;

    private final static int WRITE_PACKET_SIZE = 64 * 1024;
    private DataChecksum checksum;
    private int maxChunksPerPacket;
    private byte[] packetBuf;
    private byte[] checksumBuf;
    private int bytesPerChecksum;
    private int checksumSize;

    private final CachingStrategy cachingStrategy;

    // Outstanding reads, mapped to the array index of their source.
    private final Map<Future<Void>, Integer> futures = new HashMap<>();
    private final CompletionService<Void> readService =
        new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);

    /**
     * Prepares a recovery task from the NameNode command.
     *
     * @param recoveryInfo block group, EC policy, live sources and targets
     * @throws IllegalArgumentException if there are fewer live blocks than
     *         required, the live indices and source DNs do not match, or
     *         more targets than parity blocks were requested
     */
    ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
      ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy();
      dataBlkNum = ecPolicy.getNumDataUnits();
      parityBlkNum = ecPolicy.getNumParityUnits();
      cellSize = ecPolicy.getCellSize();

      blockGroup = recoveryInfo.getExtendedBlock();
      final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1);
      minRequiredSources = Math.min(cellsNum, dataBlkNum);

      liveIndices = recoveryInfo.getLiveBlockIndices();
      sources = recoveryInfo.getSourceDnInfos();
      stripedReaders = new ArrayList<>(sources.length);

      Preconditions.checkArgument(liveIndices.length >= minRequiredSources,
          "No enough live striped blocks.");
      Preconditions.checkArgument(liveIndices.length == sources.length,
          "liveBlockIndices and source dns should match");

      if (minRequiredSources < dataBlkNum) {
        // Fewer cells than data blocks: the trailing data blocks are empty
        // and are fed to the decoder as zero buffers.
        zeroStripeBuffers =
            new ByteBuffer[dataBlkNum - minRequiredSources];
        zeroStripeIndices = new short[dataBlkNum - minRequiredSources];
      }

      targets = recoveryInfo.getTargetDnInfos();
      targetStorageTypes = recoveryInfo.getTargetStorageTypes();
      targetIndices = new short[targets.length];
      targetBuffers = new ByteBuffer[targets.length];

      Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
          "Too much missed striped blocks.");

      targetSockets = new Socket[targets.length];
      targetOutputStreams = new DataOutputStream[targets.length];
      targetInputStreams = new DataInputStream[targets.length];

      // Java zero-initialises these, so every target starts at offset 0
      // with sequence number 0.
      blockOffset4Targets = new long[targets.length];
      seqNo4Targets = new long[targets.length];

      getTargetIndices();
      cachingStrategy = CachingStrategy.newDefaultStrategy();
    }

    private ByteBuffer allocateBuffer(int length) {
      return ByteBuffer.allocate(length);
    }

    /** Returns the internal block with index {@code i} of the group. */
    private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
      return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize,
          dataBlkNum, i);
    }

    /** Returns the length of the internal block with index {@code i}. */
    private long getBlockLen(ExtendedBlock blockGroup, int i) {
      return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
          cellSize, dataBlkNum, i);
    }

    /**
     * StripedReader is used to read from one source DN. It holds a block
     * reader, a buffer and the striped block index.
     * A StripedReader is allocated only once per source, and stripedReaders
     * has the same order as sources. Typically only the minimum number
     * (minRequiredSources) of StripedReaders is allocated. New ones are
     * allocated for new source DNs when an existing DN is invalid or slow.
     * If a source DN is corrupt, the corresponding blockReader is set to null
     * and is never read from again.
     *
     * @param i the array index of sources
     * @param offsetInBlock offset for the internal block
     * @return StripedReader
     */
    private StripedReader addStripedReader(int i, long offsetInBlock) {
      StripedReader reader = new StripedReader(liveIndices[i]);
      stripedReaders.add(reader);

      BlockReader blockReader = newBlockReader(
          getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
      if (blockReader != null) {
        initChecksumAndBufferSizeIfNeeded(blockReader);
        reader.blockReader = blockReader;
      }
      // NOTE: bufferSize is still 0 if no block reader has been opened yet;
      // run() reallocates such buffers once bufferSize is known.
      reader.buffer = allocateBuffer(bufferSize);
      return reader;
    }

    /**
     * Runs the recovery: open readers on the sources, open the target
     * streams, then read, decode and transfer round by round until internal
     * block 0's length is covered. Errors are logged, and all readers and
     * target streams are closed when the task finishes.
     */
    @Override
    public void run() {
      datanode.incrementXmitsInProgress();
      try {
        // Store the array indices of source DNs we have read successfully.
        // In each iteration of read, the success list may be updated if
        // some source DN is corrupted or slow. And use the updated success
        // list of DNs for next iteration read.
        int[] success = new int[minRequiredSources];

        int nsuccess = 0;
        for (int i = 0;
            i < sources.length && nsuccess < minRequiredSources; i++) {
          StripedReader reader = addStripedReader(i, 0);
          if (reader.blockReader != null) {
            success[nsuccess++] = i;
          }
        }

        if (nsuccess < minRequiredSources) {
          String error = "Can't find minimum sources required by "
              + "recovery, block id: " + blockGroup.getBlockId();
          throw new IOException(error);
        }

        // Readers created before the first block reader was opened got a
        // buffer of capacity 0 because bufferSize was not known yet. Give
        // them a real buffer: scheduleNewRead may reopen them later.
        for (StripedReader reader : stripedReaders) {
          if (reader.buffer == null || reader.buffer.capacity() < bufferSize) {
            reader.buffer = allocateBuffer(bufferSize);
          }
        }

        if (zeroStripeBuffers != null) {
          for (int i = 0; i < zeroStripeBuffers.length; i++) {
            zeroStripeBuffers[i] = allocateBuffer(bufferSize);
          }
        }

        for (int i = 0; i < targets.length; i++) {
          targetBuffers[i] = allocateBuffer(bufferSize);
        }

        checksumSize = checksum.getChecksumSize();
        int chunkSize = bytesPerChecksum + checksumSize;
        maxChunksPerPacket = Math.max(
            (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
        int maxPacketSize = chunkSize * maxChunksPerPacket
            + PacketHeader.PKT_MAX_HEADER_LEN;

        packetBuf = new byte[maxPacketSize];
        checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];

        // targetsStatus records whether each target is still usable. A target
        // that fails (invalid DN or failed transfer) is marked once, and no
        // more data is transferred to it.
        boolean[] targetsStatus = new boolean[targets.length];
        if (initTargetStreams(targetsStatus) == 0) {
          String error = "All targets are failed.";
          throw new IOException(error);
        }

        // The rounds are bounded by the length of internal block 0.
        long firstStripedBlockLength = getBlockLen(blockGroup, 0);
        while (positionInBlock < firstStripedBlockLength) {
          // min() on longs before narrowing, so the cast cannot overflow.
          int toRecoverLen = (int) Math.min(bufferSize,
              firstStripedBlockLength - positionInBlock);

          // step1: read from minimum source DNs required for reconstruction.
          // The returned success list is the source DNs we do real read from
          success = readMinimumStripedData4Recovery(success);

          // step2: decode to reconstruct targets
          recoverTargets(success, targetsStatus, toRecoverLen);

          // step3: transfer data
          if (transferData2Targets(targetsStatus) == 0) {
            String error = "Transfer failed for all targets.";
            throw new IOException(error);
          }

          clearBuffers();
          positionInBlock += toRecoverLen;
        }

        endTargetBlocks(targetsStatus);

        // Currently we don't check the acks for packets, this is similar as
        // block replication.
      } catch (Throwable e) {
        LOG.warn("Failed to recover striped block: " + blockGroup, e);
      } finally {
        datanode.decrementXmitsInProgress();
        // close block readers
        for (StripedReader stripedReader : stripedReaders) {
          closeBlockReader(stripedReader.blockReader);
        }
        for (int i = 0; i < targets.length; i++) {
          IOUtils.closeStream(targetOutputStreams[i]);
          IOUtils.closeStream(targetInputStreams[i]);
          IOUtils.closeStream(targetSockets[i]);
        }
      }
    }

    /**
     * On the first successfully opened block reader, takes its checksum and
     * derives bufferSize from STRIPED_READ_BUFFER_SIZE. The result is rounded
     * down to a multiple of bytesPerChecksum, with a minimum of one chunk.
     * Later readers are only asserted to use the same checksum.
     */
    private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
      if (checksum == null) {
        checksum = blockReader.getDataChecksum();
        bytesPerChecksum = checksum.getBytesPerChecksum();
        // The bufferSize is flat to divide bytesPerChecksum
        int readBufferSize = STRIPED_READ_BUFFER_SIZE;
        bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
            readBufferSize - readBufferSize % bytesPerChecksum;
      } else {
        assert blockReader.getDataChecksum().equals(checksum);
      }
    }

    /**
     * Classifies every internal block index that is not live.
     * Non-empty ones go into targetIndices, up to targets.length of them.
     * Zero-length ones go into zeroStripeIndices.
     * NOTE(review): zeroStripeIndices is only allocated when
     * minRequiredSources < dataBlkNum, which presumably is the only case in
     * which zero-length internal blocks exist.
     */
    private void getTargetIndices() {
      BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
      for (int i = 0; i < sources.length; i++) {
        bitset.set(liveIndices[i]);
      }
      int m = 0;
      int k = 0;
      for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
        if (!bitset.get(i)) {
          if (getBlockLen(blockGroup, i) > 0) {
            if (m < targets.length) {
              targetIndices[m++] = (short)i;
            }
          } else {
            zeroStripeIndices[k++] = (short)i;
          }
        }
      }
    }

    /**
     * Returns how many bytes of internal block {@code index} to read in the
     * current round. This is at most bufferSize and can be 0 or negative
     * once that block is exhausted.
     */
    private long getReadLength(int index) {
      long blockLen = getBlockLen(blockGroup, index);
      long remaining = blockLen - positionInBlock;
      return remaining > bufferSize ? bufferSize : remaining;
    }

    /**
     * Reads from the minimum number of source DNs required for
     * reconstruction in this iteration. The DNs in the success list, which
     * we think are the best, are tried first. If a source DN is corrupt or
     * slow, another source DN is tried and the success list is updated.
     *
     * The updated success list is returned for the following operations and
     * for the next iteration's read.
     *
     * @param success the initial success list of source DNs we think best
     * @return updated success list of source DNs we do real read
     * @throws IOException if fewer than minRequiredSources sources could be
     *         read, including when the read is interrupted
     */
    private int[] readMinimumStripedData4Recovery(final int[] success)
        throws IOException {
      int nsuccess = 0;
      int[] newSuccess = new int[minRequiredSources];
      BitSet used = new BitSet(sources.length);
      /*
       * Read from minimum source DNs required, the success list contains
       * source DNs which we think best.
       */
      for (int i = 0; i < minRequiredSources; i++) {
        StripedReader reader = stripedReaders.get(success[i]);
        if (getReadLength(liveIndices[success[i]]) > 0) {
          Callable<Void> readCallable = readFromBlock(
              reader.blockReader, reader.buffer);
          Future<Void> f = readService.submit(readCallable);
          futures.put(f, success[i]);
        } else {
          // If the read length is 0, we don't need to do real read
          reader.buffer.position(0);
          newSuccess[nsuccess++] = success[i];
        }
        used.set(success[i]);
      }

      while (!futures.isEmpty()) {
        try {
          StripingChunkReadResult result =
              StripedBlockUtil.getNextCompletedStripedRead(
                  readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
          int resultIndex = -1;
          if (result.state == StripingChunkReadResult.SUCCESSFUL) {
            resultIndex = result.index;
          } else if (result.state == StripingChunkReadResult.FAILED) {
            // If read failed for some source DN, we should not use it anymore
            // and schedule read from another source DN.
            StripedReader failedReader = stripedReaders.get(result.index);
            closeBlockReader(failedReader.blockReader);
            failedReader.blockReader = null;
            resultIndex = scheduleNewRead(used);
          } else if (result.state == StripingChunkReadResult.TIMEOUT) {
            // If timeout, we also schedule a new read.
            resultIndex = scheduleNewRead(used);
          }
          if (resultIndex >= 0) {
            newSuccess[nsuccess++] = resultIndex;
            if (nsuccess >= minRequiredSources) {
              // cancel remaining reads if we read successfully from minimum
              // number of source DNs required by reconstruction.
              cancelReads(futures.keySet());
              futures.clear();
              break;
            }
          }
        } catch (InterruptedException e) {
          LOG.info("Read data interrupted.", e);
          // Keep the interrupt visible to the caller, and abandon the
          // outstanding reads: the IOException below ends this task.
          Thread.currentThread().interrupt();
          cancelReads(futures.keySet());
          futures.clear();
          break;
        }
      }

      if (nsuccess < minRequiredSources) {
        String error = "Can't read data from minimum number of sources "
            + "required by reconstruction, block id: " + blockGroup.getBlockId();
        throw new IOException(error);
      }

      return newSuccess;
    }

    /** Appends zero bytes until {@code buffer}'s position reaches {@code len}. */
    private void paddingBufferToLen(ByteBuffer buffer, int len) {
      int toPadding = len - buffer.position();
      for (int i = 0; i < toPadding; i++) {
        buffer.put((byte) 0);
      }
    }

    // Initialize decoder
    private void initDecoderIfNecessary() {
      if (decoder == null) {
        decoder = newDecoder(dataBlkNum, parityBlkNum);
      }
    }

    /**
     * Returns the decoder indices of all still-usable targets, in target
     * order.
     */
    private int[] getErasedIndices(boolean[] targetsStatus) {
      int[] result = new int[targets.length];
      int m = 0;
      for (int i = 0; i < targets.length; i++) {
        if (targetsStatus[i]) {
          result[m++] = convertIndex4Decode(targetIndices[i],
              dataBlkNum, parityBlkNum);
        }
      }
      return Arrays.copyOf(result, m);
    }

    /**
     * Decodes {@code toRecoverLen} bytes for every still-usable target into
     * targetBuffers. Source buffers and zero buffers are zero-padded to
     * {@code toRecoverLen}. Each target buffer's limit is then trimmed to the
     * bytes actually left in that target's internal block.
     */
    private void recoverTargets(int[] success, boolean[] targetsStatus,
        int toRecoverLen) {
      initDecoderIfNecessary();
      ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
      for (int i = 0; i < success.length; i++) {
        StripedReader reader = stripedReaders.get(success[i]);
        ByteBuffer buffer = reader.buffer;
        paddingBufferToLen(buffer, toRecoverLen);
        inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] =
            (ByteBuffer)buffer.flip();
      }
      if (success.length < dataBlkNum) {
        for (int i = 0; i < zeroStripeBuffers.length; i++) {
          ByteBuffer buffer = zeroStripeBuffers[i];
          paddingBufferToLen(buffer, toRecoverLen);
          int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum,
              parityBlkNum);
          inputs[index] = (ByteBuffer)buffer.flip();
        }
      }
      int[] erasedIndices = getErasedIndices(targetsStatus);
      ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length];
      int m = 0;
      for (int i = 0; i < targetBuffers.length; i++) {
        if (targetsStatus[i]) {
          // outputs is packed (index m), targetBuffers is indexed by target
          // i: set the limit on the target buffer itself.
          targetBuffers[i].limit(toRecoverLen);
          outputs[m++] = targetBuffers[i];
        }
      }
      decoder.decode(inputs, erasedIndices, outputs);

      for (int i = 0; i < targets.length; i++) {
        if (targetsStatus[i]) {
          long blockLen = getBlockLen(blockGroup, targetIndices[i]);
          long remaining = blockLen - positionInBlock;
          if (remaining < 0) {
            targetBuffers[i].limit(0);
          } else if (remaining < toRecoverLen) {
            targetBuffers[i].limit((int)remaining);
          }
        }
      }
    }

    /**
     * Schedules a read from another source DN when some DN is corrupted or
     * slow. This is called from the read iteration.
     * Initially there may be only <code>minRequiredSources</code>
     * StripedReaders.
     * If the chosen source has nothing left to read at the current position,
     * no real read is done and its array index is returned. Otherwise -1 is
     * returned.
     *
     * @param used the used source DNs in this iteration.
     * @return the array index of source DN if don't need to do real read.
     */
    private int scheduleNewRead(BitSet used) {
      StripedReader reader = null;
      // step1: initially we may only have <code>minRequiredSources</code>
      // number of StripedReader, and there may be some source DNs we never
      // read before, so will try to create StripedReader for one new source DN
      // and try to read from it. If found, go to step 3.
      int m = stripedReaders.size();
      while (reader == null && m < sources.length) {
        reader = addStripedReader(m, positionInBlock);
        if (getReadLength(liveIndices[m]) > 0) {
          if (reader.blockReader == null) {
            reader = null;
            m++;
          }
        } else {
          used.set(m);
          return m;
        }
      }

      // step2: if there is no new source DN we can use, try to find a source
      // DN we ever read from but because some reason, e.g., slow, it
      // is not in the success DN list at the begin of this iteration, so
      // we have not tried it in this iteration. Now we have a chance to
      // revisit it again.
      for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
        if (!used.get(i)) {
          StripedReader r = stripedReaders.get(i);
          if (getReadLength(liveIndices[i]) > 0) {
            closeBlockReader(r.blockReader);
            r.blockReader = newBlockReader(
                getBlock(blockGroup, liveIndices[i]), positionInBlock,
                sources[i]);
            if (r.blockReader != null) {
              m = i;
              reader = r;
            }
          } else {
            used.set(i);
            r.buffer.position(0);
            return i;
          }
        }
      }

      // step3: schedule if find a correct source DN and need to do real read.
      if (reader != null) {
        Callable<Void> readCallable = readFromBlock(
            reader.blockReader, reader.buffer);
        Future<Void> f = readService.submit(readCallable);
        futures.put(f, m);
        used.set(m);
      }

      return -1;
    }

    // cancel all reads.
    private void cancelReads(Collection<Future<Void>> futures) {
      for (Future<Void> future : futures) {
        future.cancel(true);
      }
    }

    /**
     * Wraps a fill of {@code buf} from {@code reader} as a Callable. An
     * IOException is logged and rethrown.
     */
    private Callable<Void> readFromBlock(final BlockReader reader,
        final ByteBuffer buf) {
      return new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          try {
            actualReadFromBlock(reader, buf);
            return null;
          } catch (IOException e) {
            LOG.info(e.getMessage());
            throw e;
          }
        }

      };
    }

    /**
     * Reads until {@code buf} is full, or until the reader returns a
     * non-positive count.
     */
    private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
        throws IOException {
      int len = buf.remaining();
      int n = 0;
      while (n < len) {
        int nread = reader.read(buf);
        if (nread <= 0) {
          break;
        }
        n += nread;
      }
    }

    // close block reader
    private void closeBlockReader(BlockReader blockReader) {
      try {
        if (blockReader != null) {
          blockReader.close();
        }
      } catch (IOException e) {
        // ignore: best-effort close, the reader is discarded either way
      }
    }

    private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
      return NetUtils.createSocketAddr(dnInfo.getXferAddr(
          datanode.getDnConf().getConnectToDnViaHostname()));
    }

    /**
     * Opens a remote block reader for {@code block} starting at
     * {@code offsetInBlock}.
     *
     * @return the reader, or null if the offset is at or past the end of the
     *         block or the reader could not be created. In the failure case
     *         the connected peer, if any, is closed and the error is logged.
     */
    private BlockReader newBlockReader(final ExtendedBlock block,
        long offsetInBlock, DatanodeInfo dnInfo) {
      if (offsetInBlock >= block.getNumBytes()) {
        return null;
      }
      Peer peer = null;
      try {
        InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
        Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
            block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
        /*
         * This can be further improved if the replica is local, then we can
         * read directly from DN and need to check the replica is FINALIZED
         * state, notice we should not use short-circuit local read which
         * requires config for domain-socket in UNIX or legacy config in Windows.
         */
        peer = newConnectedPeer(block, dnAddr, blockToken, dnInfo);
        return RemoteBlockReader2.newBlockReader(
            "dummy", block, blockToken, offsetInBlock,
            block.getNumBytes() - offsetInBlock, true,
            "", peer, dnInfo,
            null, cachingStrategy);
      } catch (IOException e) {
        LOG.info("Exception while creating remote block reader, datanode "
            + dnInfo, e);
        // The peer is not owned by any reader yet, so release it here.
        IOUtils.cleanup(LOG, peer);
        return null;
      }
    }

    /**
     * Connects a socket to {@code addr} and wraps it in a Peer with the
     * DN's SASL client and the configured socket timeout. On failure, the
     * peer and socket are closed before the exception propagates.
     */
    private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
        throws IOException {
      Peer peer = null;
      boolean success = false;
      Socket sock = null;
      final int socketTimeout = datanode.getDnConf().getSocketTimeout();
      try {
        sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
        NetUtils.connect(sock, addr, socketTimeout);
        peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(),
            sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
            blockToken, datanodeId);
        peer.setReadTimeout(socketTimeout);
        success = true;
        return peer;
      } finally {
        if (!success) {
          IOUtils.cleanup(LOG, peer);
          IOUtils.closeSocket(sock);
        }
      }
    }

    /**
     * Sends each still-usable target's buffer as checksummed packets.
     * A target with an empty buffer in this round has nothing left to send
     * and stays usable. A target is counted as successful only if all of its
     * packets were written. A target whose write throws is marked failed.
     *
     * @return the number of targets that are still usable after this round
     */
    private int transferData2Targets(boolean[] targetsStatus) {
      int nsuccess = 0;
      for (int i = 0; i < targets.length; i++) {
        if (!targetsStatus[i]) {
          continue;
        }
        boolean success = false;
        try {
          ByteBuffer buffer = targetBuffers[i];
          if (buffer.remaining() > 0) {
            checksum.calculateChunkedSums(
                buffer.array(), 0, buffer.remaining(), checksumBuf, 0);

            int ckOff = 0;
            while (buffer.remaining() > 0) {
              DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
                  blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize,
                  false);
              int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
              int toWrite = buffer.remaining() > maxBytesToPacket ?
                  maxBytesToPacket : buffer.remaining();
              int ckLen =
                  ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
              packet.writeChecksum(checksumBuf, ckOff, ckLen);
              ckOff += ckLen;
              packet.writeData(buffer, toWrite);

              // Send packet
              packet.writeTo(targetOutputStreams[i]);

              blockOffset4Targets[i] += toWrite;
            }
          }
          // Only reached when every packet for this target was written.
          success = true;
          nsuccess++;
        } catch (IOException e) {
          LOG.warn(e.getMessage());
        }
        targetsStatus[i] = success;
      }
      return nsuccess;
    }

    /**
     * clear all buffers
     */
    private void clearBuffers() {
      for (StripedReader stripedReader : stripedReaders) {
        if (stripedReader.buffer != null) {
          stripedReader.buffer.clear();
        }
      }

      if (zeroStripeBuffers != null) {
        for (int i = 0; i < zeroStripeBuffers.length; i++) {
          zeroStripeBuffers[i].clear();
        }
      }

      for (int i = 0; i < targetBuffers.length; i++) {
        if (targetBuffers[i] != null) {
          cleanBuffer(targetBuffers[i]);
        }
      }
    }

    /** Zero-fills the buffer's backing array and resets position/limit. */
    private ByteBuffer cleanBuffer(ByteBuffer buffer) {
      Arrays.fill(buffer.array(), (byte) 0);
      return (ByteBuffer)buffer.clear();
    }

    // send an empty packet to mark the end of the block
    private void endTargetBlocks(boolean[] targetsStatus) {
      for (int i = 0; i < targets.length; i++) {
        if (targetsStatus[i]) {
          try {
            DFSPacket packet = new DFSPacket(packetBuf, 0,
                blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
            packet.writeTo(targetOutputStreams[i]);
            targetOutputStreams[i].flush();
          } catch (IOException e) {
            LOG.warn(e.getMessage());
          }
        }
      }
    }

    /**
     * Initializes the output/input streams for transferring data to each
     * target and sends a create-block (PIPELINE_SETUP_CREATE) request.
     * The status of each target is recorded in {@code targetsStatus}.
     *
     * @return the number of targets that were set up successfully
     */
    private int initTargetStreams(boolean[] targetsStatus) {
      int nsuccess = 0;
      for (int i = 0; i < targets.length; i++) {
        Socket socket = null;
        DataOutputStream out = null;
        DataInputStream in = null;
        boolean success = false;
        try {
          InetSocketAddress targetAddr =
              getSocketAddress4Transfer(targets[i]);
          socket = datanode.newSocket();
          NetUtils.connect(socket, targetAddr,
              datanode.getDnConf().getSocketTimeout());
          socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());

          ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
          Token<BlockTokenIdentifier> blockToken =
              datanode.getBlockAccessToken(block,
                  EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));

          long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
          OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
          InputStream unbufIn = NetUtils.getInputStream(socket);
          DataEncryptionKeyFactory keyFactory =
              datanode.getDataEncryptionKeyFactoryForBlock(block);
          IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
              socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);

          unbufOut = saslStreams.out;
          unbufIn = saslStreams.in;

          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
              DFSUtil.getSmallBufferSize(conf)));
          in = new DataInputStream(unbufIn);

          DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
          new Sender(out).writeBlock(block, targetStorageTypes[i],
              blockToken, "", new DatanodeInfo[]{targets[i]},
              new StorageType[]{targetStorageTypes[i]}, source,
              BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
              checksum, cachingStrategy, false, false, null);

          targetSockets[i] = socket;
          targetOutputStreams[i] = out;
          targetInputStreams[i] = in;
          nsuccess++;
          success = true;
        } catch (Throwable e) {
          LOG.warn(e.getMessage());
        } finally {
          if (!success) {
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
            IOUtils.closeStream(socket);
          }
        }
        targetsStatus[i] = success;
      }
      return nsuccess;
    }
  }

  /** Per-source read state: internal block index, block reader and buffer. */
  private static class StripedReader {
    private final short index; // internal block index
    // null when no usable reader is open for this source
    private BlockReader blockReader;
    private ByteBuffer buffer;

    private StripedReader(short index) {
      this.index = index;
    }
  }
}