/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Supplier;
/**
 * An {@link InputStream} used by BlockInputStream to read a chunk from the
 * container. Each chunk may contain multiple underlying {@link ByteBuffer}
 * instances.
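 *
 * <p>A minimal usage sketch (hypothetical setup; in practice this stream is
 * created and driven by BlockInputStream rather than used directly):
 * <pre>{@code
 * ChunkInputStream in = new ChunkInputStream(chunkInfo, blockID,
 *     clientFactory, () -> pipeline, true, token);
 * byte[] buf = new byte[4096];
 * int n;
 * while ((n = in.read(buf, 0, buf.length)) != -1) {
 *   // consume n bytes of chunk data from buf
 * }
 * in.close();
 * }</pre>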
*/
public class ChunkInputStream extends InputStream
implements Seekable, CanUnbuffer {
private ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private final XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final Supplier<Pipeline> pipelineSupplier;
private boolean verifyChecksum;
private boolean allocated = false;
// Buffer to store the chunk data read from the DN container
private List<ByteBuffer> buffers;
  // Index of the buffer corresponding to the current read position
private int bufferIndex;
// The offset of the current data residing in the buffers w.r.t the start
// of chunk data
private long bufferOffset;
// The number of bytes of chunk data residing in the buffers currently
private long bufferLength;
  // Position of the ChunkInputStream, maintained in this variable while a
  // seek is pending. This position is w.r.t the chunk only, not the block
  // or key. This variable is also set before attempting a read to enable
  // retry. Once the chunk is read, this variable is reset.
private long chunkPosition = -1;
private final Token<? extends TokenIdentifier> token;
private static final int EOF = -1;
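  /**
   * @param chunkInfo chunk metadata: name, offset within the block, length
   *     and checksum data
   * @param blockId ID of the block this chunk belongs to
   * @param xceiverClientFactory factory used to acquire and release
   *     datanode clients
   * @param pipelineSupplier supplies the pipeline to read from
   * @param verifyChecksum whether the read data should be verified against
   *     the stored checksums
   * @param token block token authorizing the read
   */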
ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
XceiverClientFactory xceiverClientFactory,
Supplier<Pipeline> pipelineSupplier,
boolean verifyChecksum, Token<? extends TokenIdentifier> token) {
this.chunkInfo = chunkInfo;
this.length = chunkInfo.getLen();
this.blockID = blockId;
this.xceiverClientFactory = xceiverClientFactory;
this.pipelineSupplier = pipelineSupplier;
this.verifyChecksum = verifyChecksum;
this.token = token;
}
public synchronized long getRemaining() {
return length - getPos();
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read() throws IOException {
acquireClient();
int available = prepareRead(1);
int dataout = EOF;
if (available == EOF) {
// There is no more data in the chunk stream. The buffers should have
// been released by now
Preconditions.checkState(buffers == null);
} else {
dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
}
if (chunkStreamEOF()) {
// consumer might use getPos to determine EOF,
// so release buffers when serving the last byte of data
releaseBuffers();
}
return dataout;
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
// According to the JavaDocs for InputStream, it is recommended that
// subclasses provide an override of bulk read if possible for performance
// reasons. In addition to performance, we need to do it for correctness
// reasons. The Ozone REST service uses PipedInputStream and
// PipedOutputStream to relay HTTP response data between a Jersey thread and
// a Netty thread. It turns out that PipedInputStream/PipedOutputStream
// have a subtle dependency (bug?) on the wrapped stream providing separate
// implementations of single-byte read and bulk read. Without this, get key
// responses might close the connection before writing all of the bytes
// advertised in the Content-Length.
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return 0;
}
acquireClient();
int total = 0;
while (len > 0) {
int available = prepareRead(len);
if (available == EOF) {
// There is no more data in the chunk stream. The buffers should have
// been released by now
Preconditions.checkState(buffers == null);
return total != 0 ? total : EOF;
}
buffers.get(bufferIndex).get(b, off + total, available);
len -= available;
total += available;
}
if (chunkStreamEOF()) {
// smart consumers determine EOF by calling getPos()
// so we release buffers when serving the final bytes of data
releaseBuffers();
}
return total;
}
/**
 * Seeks the ChunkInputStream to the specified position. If the buffers
 * are not allocated, or do not contain the data corresponding to the
 * seeked position (determined by buffersHavePosition()), this only
 * records the seeked position in chunkPosition. Otherwise, the buffers'
 * position is updated to the seeked position.
*/
@Override
public synchronized void seek(long pos) throws IOException {
if (pos < 0 || pos >= length) {
if (pos == 0) {
        // It is possible for both length and pos to be zero, in which case
        // seek should return instead of throwing an exception.
return;
}
throw new EOFException("EOF encountered at pos: " + pos + " for chunk: "
+ chunkInfo.getChunkName());
}
if (buffersHavePosition(pos)) {
// The bufferPosition is w.r.t the current chunk.
// Adjust the bufferIndex and position to the seeked position.
adjustBufferPosition(pos - bufferOffset);
} else {
chunkPosition = pos;
}
}
@Override
public synchronized long getPos() {
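    // A pending seek (chunkPosition >= 0) takes precedence; otherwise the
    // position is derived from the state of the buffers, falling back to 0
    // before any data has been read.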
if (chunkPosition >= 0) {
return chunkPosition;
}
if (chunkStreamEOF()) {
return length;
}
if (buffersHaveData()) {
return bufferOffset + buffers.get(bufferIndex).position();
}
if (buffersAllocated()) {
return bufferOffset + bufferLength;
}
return 0;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public synchronized void close() {
releaseClient();
}
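  /**
   * Release the datanode client back to the factory, if one is held.
   */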
protected synchronized void releaseClient() {
if (xceiverClientFactory != null && xceiverClient != null) {
xceiverClientFactory.releaseClient(xceiverClient, false);
xceiverClient = null;
}
}
/**
* Acquire new client if previous one was released.
*/
protected synchronized void acquireClient() throws IOException {
if (xceiverClientFactory != null && xceiverClient == null) {
xceiverClient = xceiverClientFactory.acquireClientForReadData(
pipelineSupplier.get());
}
}
  /**
   * Prepares to read by advancing through the buffers or allocating new
   * buffers, as needed, until it finds data to return or encounters EOF.
   * @param len desired length of data to read
   * @return length of data available to read, possibly less than the
   * desired length
   */
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
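      // Loop until the buffers contain readable data or the chunk stream
      // is exhausted; each pass may fetch more chunk data from the
      // datanode.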
if (chunkPosition >= 0) {
if (buffersHavePosition(chunkPosition)) {
// The current buffers have the seeked position. Adjust the buffer
// index and position to point to the chunkPosition.
adjustBufferPosition(chunkPosition - bufferOffset);
} else {
          // Read the required chunk data so that the buffers contain data
          // for the seeked position.
readChunkFromContainer(len);
}
}
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
        return Math.min(len, bb.remaining());
} else if (dataRemainingInChunk()) {
// There is more data in the chunk stream which has not
// been read into the buffers yet.
readChunkFromContainer(len);
} else {
// All available input from this chunk stream has been consumed.
return EOF;
}
}
}
/**
* Reads full or partial Chunk from DN Container based on the current
* position of the ChunkInputStream, the number of bytes of data to read
* and the checksum boundaries.
 * If successful, the read data is saved in the buffers so that
* subsequent read calls can utilize it.
* @param len number of bytes of data to be read
* @throws IOException if there is an I/O error while performing the call
* to Datanode
*/
private synchronized void readChunkFromContainer(int len) throws IOException {
// index of first byte to be read from the chunk
long startByteIndex;
if (chunkPosition >= 0) {
// If seek operation was called to advance the buffer position, the
// chunk should be read from that position onwards.
startByteIndex = chunkPosition;
} else {
      // Continue reading the chunk from where the data currently in the
      // buffers ends.
startByteIndex = bufferOffset + bufferLength;
}
// bufferOffset and bufferLength are updated below, but if read fails
// and is retried, we need the previous position. Position is reset after
// successful read in adjustBufferPosition()
storePosition();
if (verifyChecksum) {
// Update the bufferOffset and bufferLength as per the checksum
// boundary requirement.
computeChecksumBoundaries(startByteIndex, len);
} else {
// Read from the startByteIndex
bufferOffset = startByteIndex;
bufferLength = len;
}
// Adjust the chunkInfo so that only the required bytes are read from
// the chunk.
final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo)
.setOffset(bufferOffset + chunkInfo.getOffset())
.setLen(bufferLength)
.build();
ByteString byteString = readChunk(adjustedChunkInfo);
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
allocated = true;
    // Since reads happen at checksum boundaries, the buffers' position
    // might need to be adjusted for the following scenarios:
    // 1. The stream was seeked to a position before the chunk was read.
    // 2. The chunk was read from an index earlier than the current
    //    position to account for checksum boundaries.
adjustBufferPosition(startByteIndex - bufferOffset);
}
/**
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException {
ReadChunkResponseProto readChunkResponse;
try {
List<CheckedBiFunction> validators =
ContainerProtocolCalls.getValidatorList();
validators.add(validator);
readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, token);
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
}
return readChunkResponse.getData();
}
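  /**
   * Validates a ReadChunk response: the number of bytes returned must
   * match the requested length and, if checksum verification is enabled,
   * the data must match the checksums stored for the read range.
   */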
private CheckedBiFunction<ContainerCommandRequestProto,
ContainerCommandResponseProto, IOException> validator =
(request, response) -> {
final ChunkInfo reqChunkInfo =
request.getReadChunk().getChunkData();
ReadChunkResponseProto readChunkResponse = response.getReadChunk();
ByteString byteString = readChunkResponse.getData();
if (byteString.size() != reqChunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
byteString.size()));
}
if (verifyChecksum) {
ChecksumData checksumData = ChecksumData.getFromProtoBuf(
chunkInfo.getChecksumData());
// ChecksumData stores checksum for each 'numBytesPerChecksum'
// number of bytes in a list. Compute the index of the first
// checksum to match with the read data
long relativeOffset = reqChunkInfo.getOffset() -
chunkInfo.getOffset();
int bytesPerChecksum = checksumData.getBytesPerChecksum();
int startIndex = (int) (relativeOffset / bytesPerChecksum);
Checksum.verifyChecksum(byteString, checksumData, startIndex);
}
};
  /**
   * Computes the offset and length of the bytes that need to be read from
   * the chunk so that the read covers whole checksum boundaries around the
   * requested start and end indices, and stores them in bufferOffset and
   * bufferLength.
   * For example, let's say the client is reading from index 120 to 450 in
   * the chunk, and a checksum is stored for every 100 bytes in the chunk,
* i.e. the first checksum is for bytes from index 0 to 99, the next for
* bytes from index 100 to 199 and so on. To verify bytes from 120 to 450,
* we would need to read from bytes 100 to 499 so that checksum
* verification can be done.
*
* @param startByteIndex the first byte index to be read by client
* @param dataLen number of bytes to be read from the chunk
*/
private void computeChecksumBoundaries(long startByteIndex, int dataLen) {
int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
// index of the last byte to be read from chunk, inclusively.
final long endByteIndex = startByteIndex + dataLen - 1;
bufferOffset = (startByteIndex / bytesPerChecksum)
* bytesPerChecksum; // inclusive
final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
* bytesPerChecksum; // exclusive
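    // Worked example from the javadoc above: reading bytes 120 to 450
    // (startByteIndex = 120, dataLen = 331) with bytesPerChecksum = 100:
    //   bufferOffset = (120 / 100) * 100 = 100 (inclusive)
    //   endIndex = ((450 / 100) + 1) * 100 = 500 (exclusive)
    // so bytes 100 to 499 are read (then capped to the chunk length).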
bufferLength = Math.min(endIndex, length) - bufferOffset;
}
/**
 * Adjust the buffers' position to account for the seeked position and/or checksum
* boundary reads.
* @param bufferPosition the position to which the buffers must be advanced
*/
private void adjustBufferPosition(long bufferPosition) {
// The bufferPosition is w.r.t the current chunk.
// Adjust the bufferIndex and position to the seeked chunkPosition.
    long tempOffset = 0;
    for (int i = 0; i < buffers.size(); i++) {
      if (bufferPosition - tempOffset >= buffers.get(i).capacity()) {
        tempOffset += buffers.get(i).capacity();
      } else {
        bufferIndex = i;
        break;
      }
    }
    buffers.get(bufferIndex).position((int) (bufferPosition - tempOffset));
// Reset the chunkPosition as chunk stream has been initialized i.e. the
// buffers have been allocated.
resetPosition();
}
  /**
   * Returns true if the buffers have been allocated, false otherwise.
   */
@VisibleForTesting
protected boolean buffersAllocated() {
return buffers != null && !buffers.isEmpty();
}
/**
* Check if the buffers have any data remaining between the current
* position and the limit.
*/
private boolean buffersHaveData() {
boolean hasData = false;
if (buffersAllocated()) {
while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
// current buffer has data
hasData = true;
break;
} else {
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
Preconditions.checkState(bufferIndex < buffers.size());
} else {
// no more buffers remaining
break;
}
}
}
}
return hasData;
}
private boolean buffersRemaining() {
return (bufferIndex < (buffers.size() - 1));
}
/**
 * Check if the current buffers have the data corresponding to the input
 * position.
*/
private boolean buffersHavePosition(long pos) {
// Check if buffers have been allocated
if (buffersAllocated()) {
// Check if the current buffers cover the input position
return pos >= bufferOffset &&
pos < bufferOffset + bufferLength;
}
return false;
}
/**
* Check if there is more data in the chunk which has not yet been read
* into the buffers.
*/
private boolean dataRemainingInChunk() {
long bufferPos;
if (chunkPosition >= 0) {
bufferPos = chunkPosition;
} else {
bufferPos = bufferOffset + bufferLength;
}
return bufferPos < length;
}
/**
* Check if end of chunkStream has been reached.
*/
private boolean chunkStreamEOF() {
if (!allocated) {
// Chunk data has not been read yet
return false;
}
if (buffersHaveData() || dataRemainingInChunk()) {
return false;
} else {
Preconditions.checkState(bufferOffset + bufferLength == length,
"EOF detected, but not at the last byte of the chunk");
return true;
}
}
/**
* If EOF is reached, release the buffers.
*/
private void releaseBuffers() {
buffers = null;
bufferIndex = 0;
    // We should not reset bufferOffset and bufferLength here because when
    // getPos() is called in chunkStreamEOF() we use these values to
    // determine whether the chunk has been read completely.
}
/**
* Reset the chunkPosition once the buffers are allocated.
*/
void resetPosition() {
this.chunkPosition = -1;
}
private void storePosition() {
chunkPosition = getPos();
}
String getChunkName() {
return chunkInfo.getChunkName();
}
protected long getLength() {
return length;
}
@VisibleForTesting
protected long getChunkPosition() {
return chunkPosition;
}
@Override
public synchronized void unbuffer() {
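    // Remember the current position so that a later read can re-fetch the
    // chunk data from where we left off, then drop the buffers and release
    // the datanode client.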
storePosition();
releaseBuffers();
releaseClient();
}
}