/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
* http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.io;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
/**
* Class to read EC encoded data from blocks a stripe at a time, when some of
* the data blocks are not available. The public API for this class is:
*
* readStripe(ByteBuffer[] bufs)
*
 * The other inherited public APIs will throw a NotImplementedException. This is
 * because this class is intended to only read full stripes into a reusable set
 * of bytebuffers, and the traditional read APIs do not facilitate this.
*
* The caller should pass an array of ByteBuffers to readStripe() which:
*
 * 1. Contains EC DataNum buffers.
 * 2. Each buffer should have its position set to zero.
 * 3. Each buffer should have ecChunkSize bytes remaining.
*
* These buffers are either read into directly from the data blocks on the
* datanodes, or they will be reconstructed from parity data using the EC
* decoder.
*
 * The EC Decoder expects to receive an input array which is EC Data + EC
 * Parity elements long. Missing or not needed elements should be set to null
 * in the array. The elements should be assigned to the array in EC index
 * order.
*
* Assuming we have n missing data locations, where n <= parity locations, the
* ByteBuffers passed in from the client are either assigned to the decoder
* input array, or they are assigned to the decoder output array, where
* reconstructed data is written. The required number of parity buffers will be
* assigned and added to the decoder input so it has sufficient locations to
* reconstruct the data. After reconstruction the byte buffers received will
* have the data for a full stripe populated, either by reading directly from
* the block or by reconstructing the data.
*
 * The buffers are returned "ready to read" with the position at zero and
 * remaining() indicating how much data was read. If the remaining data is less
 * than a full stripe, the client can simply read up to remaining() from each
 * buffer in turn. If there is a full stripe, each buffer should have
 * ecChunkSize bytes remaining.
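 *
 * A minimal usage sketch (illustrative only; "stripeInput" is an already
 * constructed instance of this class, "repConfig" is its ECReplicationConfig
 * and error handling is omitted):
 *
 * <pre>{@code
 *   ByteBuffer[] bufs = new ByteBuffer[repConfig.getData()];
 *   for (int i = 0; i < bufs.length; i++) {
 *     bufs[i] = ByteBuffer.allocate(repConfig.getEcChunkSize());
 *   }
 *   int read;
 *   while ((read = stripeInput.readStripe(bufs)) > 0) {
 *     for (ByteBuffer b : bufs) {
 *       // consume b.remaining() bytes from each buffer in EC index order
 *     }
 *     for (ByteBuffer b : bufs) {
 *       b.clear(); // position back to zero, ecChunkSize bytes remaining again
 *     }
 *   }
 * }</pre>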
*/
public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
private static final Logger LOG =
LoggerFactory.getLogger(ECBlockReconstructedStripeInputStream.class);
  // List of buffers, data + parity long, needed by the EC decoder. Missing
  // or not-needed locations will be null.
private ByteBuffer[] decoderInputBuffers;
// Missing chunks are recovered into these buffers.
private ByteBuffer[] decoderOutputBuffers;
  // Missing indexes which are to be recovered into the output buffers.
  // Required by the EC decoder.
private int[] missingIndexes;
  // The blockLocation indexes to use to read data into the decoder input
  // buffers.
private List<Integer> dataIndexes = new ArrayList<>();
// Data Indexes we have tried to read from, and failed for some reason
private Set<Integer> failedDataIndexes = new HashSet<>();
private ByteBufferPool byteBufferPool;
private RawErasureDecoder decoder;
private boolean initialized = false;
private ExecutorService executor;
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
ByteBufferPool byteBufferPool,
ExecutorService ecReconstructExecutor) {
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
refreshFunction, streamFactory);
this.byteBufferPool = byteBufferPool;
this.executor = ecReconstructExecutor;
}
/**
* Provide a list of datanodes that are known to be bad, and no attempt will
* be made to read from them. If too many failed nodes are passed, then the
* reader may not have sufficient locations available to reconstruct the data.
*
 * Note this call must be made before any attempt is made to read data,
* as that is when the reader is initialized. Attempting to call this method
* after a read will result in a runtime exception.
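 *
 * A minimal sketch (illustrative only; "badNodes" is a hypothetical list of
 * DatanodeDetails reported as failed by an earlier reader, and "bufs" is as
 * described for readStripe()):
 *
 * <pre>{@code
 *   stripeInput.addFailedDatanodes(badNodes);
 *   // The reader initializes on the first read; after this point no further
 *   // failed datanodes can be registered.
 *   int read = stripeInput.readStripe(bufs);
 * }</pre>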
*
* @param dns A list of DatanodeDetails that are known to be bad.
*/
public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
if (initialized) {
throw new RuntimeException("Cannot add failed datanodes after the " +
"reader has been initialized");
}
DatanodeDetails[] locations = getDataLocations();
for (DatanodeDetails dn : dns) {
for (int i = 0; i < locations.length; i++) {
if (locations[i] != null && locations[i].equals(dn)) {
failedDataIndexes.add(i);
break;
}
}
}
}
private void init() throws InsufficientLocationsException {
if (decoder == null) {
decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
}
if (decoderInputBuffers == null) {
// The EC decoder needs an array data+parity long, with missing or not
// needed indexes set to null.
decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
}
if (!hasSufficientLocations()) {
throw new InsufficientLocationsException("There are insufficient " +
"datanodes to read the EC block");
}
dataIndexes.clear();
ECReplicationConfig repConfig = getRepConfig();
DatanodeDetails[] locations = getDataLocations();
setMissingIndexesAndDataLocations(locations);
List<Integer> parityIndexes =
selectParityIndexes(locations, missingIndexes.length);
// We read from the selected parity blocks, so add them to the data indexes.
dataIndexes.addAll(parityIndexes);
// The decoder inputs originally start as all nulls. Then we populate the
// pieces we have data for. The parity buffers are reused for the block
    // so we can allocate them now. On re-init, we reuse any parity buffers
// already allocated.
for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
if (parityIndexes.contains(i)) {
if (decoderInputBuffers[i] == null) {
decoderInputBuffers[i] = allocateBuffer(repConfig);
}
} else {
decoderInputBuffers[i] = null;
}
}
decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
initialized = true;
}
/**
* Determine which indexes are missing, taking into account the length of the
* block. For a block shorter than a full EC stripe, it is expected that
* some of the data locations will not be present.
 * Populates the missingIndexes and dataIndexes instance variables.
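 * For example (illustrative): with RS-3-2, a block of length 2.5 * chunkSize
 * and location index 1 unavailable, missingIndexes becomes [1] and
 * dataIndexes becomes [0, 2].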
* @param locations Available locations for the block group
*/
private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
ECReplicationConfig repConfig = getRepConfig();
int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
List<Integer> missingInd = new ArrayList<>();
for (int i = 0; i < repConfig.getData(); i++) {
if ((locations[i] == null || failedDataIndexes.contains(i))
&& i < expectedDataBlocks) {
missingInd.add(i);
} else if (locations[i] != null && !failedDataIndexes.contains(i)) {
dataIndexes.add(i);
}
}
missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
}
private void assignBuffers(ByteBuffer[] bufs) {
ECReplicationConfig repConfig = getRepConfig();
Preconditions.assertTrue(bufs.length == repConfig.getData());
int recoveryIndex = 0;
// Here bufs come from the caller and will be filled with data read from
// the blocks or recovered. Therefore, if the index is missing, we assign
// the buffer to the decoder outputs, where data is recovered via EC
// decoding. Otherwise the buffer is set to the input. Note, it may be a
    // buffer which needs to be padded.
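    // For example (illustrative): with RS-3-2 and missingIndexes = [1],
    // bufs[0] and bufs[2] become decoder inputs at indexes 0 and 2, bufs[1]
    // becomes decoderOutputBuffers[0], and the single parity buffer selected
    // in init() stays at index 3 or 4.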
for (int i = 0; i < repConfig.getData(); i++) {
if (isMissingIndex(i)) {
decoderOutputBuffers[recoveryIndex++] = bufs[i];
decoderInputBuffers[i] = null;
} else {
decoderInputBuffers[i] = bufs[i];
}
}
}
private boolean isMissingIndex(int ind) {
for (int i : missingIndexes) {
if (i == ind) {
return true;
}
}
return false;
}
/**
 * This method should be passed an array of ByteBuffers which must contain EC
 * Data Number entries. Each ByteBuffer should be at position 0 and have EC
 * ChunkSize bytes remaining. After returning, the buffers will contain the
 * data for the next stripe in the block. The buffers will be returned
 * "ready to read" with their position set to zero and the limit set
 * according to how much data they contain.
 *
 * @param bufs An array of ByteBuffers which must contain EC Data Number
 *             entries. Each ByteBuffer should be at position 0 and have
 *             EC ChunkSize bytes remaining.
 *
 * @return The number of bytes read, or EOF if there is no further data to
 *         read in the block.
 * @throws IOException if the stripe cannot be read or reconstructed.
*/
public synchronized int readStripe(ByteBuffer[] bufs) throws IOException {
int toRead = (int)Math.min(getRemaining(), getStripeSize());
if (toRead == 0) {
return EOF;
}
if (!initialized) {
init();
}
validateBuffers(bufs);
while (true) {
try {
assignBuffers(bufs);
clearParityBuffers();
        // Set the read limits on the buffers so we do not read any unexpected
        // garbage data from the end of the block.
setBufferReadLimits(bufs, toRead);
loadDataBuffersFromStream();
break;
} catch (IOException e) {
        // Re-init now that the bad block has been excluded. If we have run out
        // of locations, init will throw an InsufficientLocationsException.
init();
        // Seek to the current position so any blocks we have already read are
        // rewound.
seek(getPos());
// Reset the input positions back to zero
for (ByteBuffer b : bufs) {
b.position(0);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for reads to complete", ie);
}
}
if (missingIndexes.length > 0) {
padBuffers(toRead);
flipInputs();
decodeStripe();
// Reset the buffer positions and limits to remove any padding added
// before EC Decode.
setBufferReadLimits(bufs, toRead);
} else {
// If we have no missing indexes, then the buffers will be at their
// limits after reading so we need to flip them to ensure they are ready
// to read by the caller.
flipInputs();
}
setPos(getPos() + toRead);
if (remaining() == 0) {
      // If we reach the end of the block (i.e. remaining is zero) we free
      // the underlying streams and buffers. This is because KeyInputStream,
      // which reads from the EC streams, does not close the blocks until it
      // has read all blocks in the key.
freeAllResourcesWithoutClosing();
}
return toRead;
}
private void validateBuffers(ByteBuffer[] bufs) {
Preconditions.assertTrue(bufs.length == getRepConfig().getData());
int chunkSize = getRepConfig().getEcChunkSize();
for (ByteBuffer b : bufs) {
Preconditions.assertTrue(b.remaining() == chunkSize);
}
}
private void padBuffers(int toRead) {
int dataNum = getRepConfig().getData();
int parityNum = getRepConfig().getParity();
int chunkSize = getRepConfig().getEcChunkSize();
if (toRead >= getStripeSize()) {
// There is no padding to do - we are reading a full stripe.
return;
}
int fullChunks = toRead / chunkSize;
// The size of each chunk is governed by the size of the first chunk.
// The parity always matches the first chunk size.
int paritySize = Math.min(toRead, chunkSize);
// We never need to pad the first chunk - its length dictates the length
// of all others.
fullChunks = Math.max(1, fullChunks);
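    // Worked example (illustrative, RS-3-2 with a 1MB chunk size): if toRead
    // is 1.5MB then fullChunks = 1 and paritySize = 1MB. Any present data
    // buffer at index >= 1 is zero-filled from its current position up to a
    // 1MB limit so the decoder sees equal-length inputs, and each output
    // buffer is limited to 1MB as well.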
for (int i = fullChunks; i < dataNum; i++) {
ByteBuffer buf = decoderInputBuffers[i];
if (buf != null) {
buf.limit(paritySize);
zeroFill(buf);
}
}
// Ensure the available parity buffers are the expected length
for (int i = dataNum; i < dataNum + parityNum; i++) {
ByteBuffer b = decoderInputBuffers[i];
if (b != null) {
Preconditions.assertTrue(b.position() == paritySize);
}
}
// The output buffers need their limit set to the parity size
for (ByteBuffer b : decoderOutputBuffers) {
b.limit(paritySize);
}
}
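  /**
   * Set the limits on the caller's buffers (and, when less than one full
   * chunk is being read, on the selected parity buffers) so that remaining()
   * reflects only the real data present in a partial final stripe.
   */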
private void setBufferReadLimits(ByteBuffer[] bufs, int toRead) {
int chunkSize = getRepConfig().getEcChunkSize();
int fullChunks = toRead / chunkSize;
if (fullChunks == getRepConfig().getData()) {
// We are reading a full stripe, no concerns over padding.
return;
}
if (fullChunks == 0) {
bufs[0].limit(toRead);
// All buffers except the first contain no data.
for (int i = 1; i < bufs.length; i++) {
bufs[i].position(0);
bufs[i].limit(0);
}
// If we have less than 1 chunk, then the parity buffers are the size
// of the first chunk.
for (int i = getRepConfig().getData();
i < getRepConfig().getRequiredNodes(); i++) {
ByteBuffer b = decoderInputBuffers[i];
if (b != null) {
b.limit(toRead);
}
}
} else {
int remainingLength = toRead % chunkSize;
      // The first partial chunk has the remaining length
bufs[fullChunks].limit(remainingLength);
// All others have a zero limit
for (int i = fullChunks + 1; i < bufs.length; i++) {
bufs[i].position(0);
bufs[i].limit(0);
}
}
}
private void zeroFill(ByteBuffer buf) {
// fill with zeros from pos to limit.
if (buf.hasArray()) {
byte[] a = buf.array();
Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
buf.position(buf.limit());
} else {
while (buf.hasRemaining()) {
buf.put((byte)0);
}
}
}
/**
* Take the parity indexes which are already used, and the others which are
* available, and select random indexes to meet numRequired. The resulting
* list is sorted in ascending order of the indexes.
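 * For example (illustrative): with RS-3-2, one missing data index and no
 * parity buffer yet allocated, one of the parity indexes 3 or 4 is chosen at
 * random.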
 * @param locations The list of locations for all blocks in the block group.
* @param numRequired The number of parity chunks needed for reconstruction
* @return A list of indexes indicating which parity locations to read.
*/
private List<Integer> selectParityIndexes(
DatanodeDetails[] locations, int numRequired) {
List<Integer> indexes = new ArrayList<>();
List<Integer> selected = new ArrayList<>();
ECReplicationConfig repConfig = getRepConfig();
for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
if (locations[i] != null && !failedDataIndexes.contains(i)
&& decoderInputBuffers[i] == null) {
indexes.add(i);
}
// If we are re-initializing, we want to make sure we are re-using any
// previously selected good parity indexes, as the block stream is already
// opened.
if (decoderInputBuffers[i] != null && !failedDataIndexes.contains(i)) {
selected.add(i);
}
}
Preconditions.assertTrue(indexes.size() + selected.size() >= numRequired);
Random rand = new Random();
while (selected.size() < numRequired) {
selected.add(indexes.remove(rand.nextInt(indexes.size())));
}
Collections.sort(selected);
return selected;
}
private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
ByteBuffer buf = byteBufferPool.getBuffer(
false, repConfig.getEcChunkSize());
return buf;
}
private void flipInputs() {
for (ByteBuffer b : decoderInputBuffers) {
if (b != null) {
b.flip();
}
}
}
private void clearParityBuffers() {
for (int i = getRepConfig().getData();
i < getRepConfig().getRequiredNodes(); i++) {
if (decoderInputBuffers[i] != null) {
decoderInputBuffers[i].clear();
}
}
}
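  /**
   * Submit a read for each selected data and parity index to the executor and
   * wait for all of the reads to complete. Any index which fails to read is
   * added to failedDataIndexes so the caller can re-init and retry with
   * different locations.
   */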
protected void loadDataBuffersFromStream()
throws IOException, InterruptedException {
Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
= new ArrayDeque<>();
for (int i : dataIndexes) {
pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
readIntoBuffer(i, decoderInputBuffers[i]);
return null;
})));
}
boolean exceptionOccurred = false;
while (!pendingReads.isEmpty()) {
int index = -1;
try {
ImmutablePair<Integer, Future<Void>> pair = pendingReads.poll();
index = pair.getKey();
        // Should this future.get() have a timeout? At the end of the call
        // chain we eventually call a grpc or ratis client to read the block
        // data. It is the call to the DNs which could potentially block.
        // There is a timeout on that call controlled by
        // OZONE_CLIENT_READ_TIMEOUT = "ozone.client.read.timeout",
        // which defaults to 30s. So if there is a DN communication problem,
        // it should time out in the client and propagate up the stack as an
        // IOException.
pair.getValue().get();
} catch (ExecutionException ee) {
LOG.warn("Failed to read from block {} EC index {}. Excluding the " +
"block", getBlockID(), index + 1, ee.getCause());
failedDataIndexes.add(index);
exceptionOccurred = true;
} catch (InterruptedException ie) {
// Catch each InterruptedException to ensure all the futures have been
// handled, and then throw the exception later
LOG.debug("Interrupted while waiting for reads to complete", ie);
Thread.currentThread().interrupt();
}
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException(
"Interrupted while waiting for reads to complete");
}
if (exceptionOccurred) {
throw new IOException("One or more errors occurred reading blocks");
}
}
private void readIntoBuffer(int ind, ByteBuffer buf) throws IOException {
BlockExtendedInputStream stream = getOrOpenStream(ind);
seekStreamIfNecessary(stream, 0);
while (buf.hasRemaining()) {
int read = stream.read(buf);
if (read == EOF) {
// We should not reach EOF, as the block should have enough data to
// fill the buffer. If the block does not, then it indicates the
// block is not as long as it should be, based on the block length
// stored in OM. Therefore if there is any remaining space in the
// buffer, we should throw an exception.
if (buf.hasRemaining()) {
throw new IOException("Expected to read " + buf.remaining() +
" bytes from block " + getBlockID() + " EC index " + (ind + 1) +
" but reached EOF");
}
break;
}
}
}
/**
 * Take the populated input buffers and missing indexes and create the
 * outputs. Note that the input buffers have to be "ready for read", i.e. they
 * need to have been flipped after their data was loaded. The created outputs
 * are "ready to read" by the underlying decoder API, so there is no need to
 * flip them after the call. The decoder reads all the inputs, leaving each
 * buffer position at its limit, so the inputs are flipped again after the
 * decode. This leaves a complete set of buffers for the EC stripe which are
 * ready to read.
 * @throws IOException if the decode fails.
*/
private void decodeStripe() throws IOException {
decoder.decode(decoderInputBuffers, missingIndexes, decoderOutputBuffers);
flipInputs();
}
@Override
public synchronized boolean hasSufficientLocations() {
    // The number of locations needed is a function of the EC chunk size. If
    // the block length is <= the chunk size, we should only have one data
    // location. If it is greater than the chunk size but at most
    // 2 * chunk_size, then we must have two locations. If it is greater than
    // chunk_size * (data_num - 1), then we must have all data_num locations.
// The remaining data locations (for small block lengths) can be assumed to
// be all zeros.
// Then we need a total of dataNum blocks available across the available
// data, parity and padding blocks.
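    // Worked example (illustrative, RS-3-2): a block of length 2.5 *
    // chunkSize expects all 3 data blocks, so 3 of the data and parity
    // locations must be available. A block of length 0.5 * chunkSize expects
    // only 1 data block; the 2 absent data locations count as padding, so a
    // single available location is enough.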
ECReplicationConfig repConfig = getRepConfig();
int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
int availableLocations =
availableDataLocations(expectedDataBlocks) + availableParityLocations();
int paddedLocations = repConfig.getData() - expectedDataBlocks;
int failedLocations = failedDataIndexes.size();
if (availableLocations + paddedLocations - failedLocations
>= repConfig.getData()) {
return true;
} else {
LOG.error("There are insufficient locations. {} available; {} padded;" +
" {} failed; {} expected;", availableLocations, paddedLocations,
failedLocations, expectedDataBlocks);
return false;
}
}
@Override
protected int readWithStrategy(ByteReaderStrategy strategy) {
throw new NotImplementedException("readWithStrategy is not implemented. " +
"Use readStripe() instead");
}
@Override
public synchronized void close() {
super.close();
freeBuffers();
}
@Override
public synchronized void unbuffer() {
super.unbuffer();
freeBuffers();
}
private void freeBuffers() {
// Inside this class, we only allocate buffers to read parity into. Data
// is reconstructed or read into a set of buffers passed in from the calling
// class. Therefore we only need to ensure we free the parity buffers here.
if (decoderInputBuffers != null) {
for (int i = getRepConfig().getData();
i < getRepConfig().getRequiredNodes(); i++) {
ByteBuffer buf = decoderInputBuffers[i];
if (buf != null) {
byteBufferPool.putBuffer(buf);
decoderInputBuffers[i] = null;
}
}
}
initialized = false;
}
private void freeAllResourcesWithoutClosing() throws IOException {
LOG.debug("Freeing all resources while leaving the block open");
freeBuffers();
closeStreams();
}
@Override
public synchronized void seek(long pos) throws IOException {
if (pos % getStripeSize() != 0) {
      // As this reader can only return full stripes, we only allow seeking to
      // the start of a stripe.
throw new IOException("Requested position " + pos
+ " does not align with a stripe offset");
}
super.seek(pos);
}
}