| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdds.scm.storage; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| |
| import org.apache.hadoop.fs.CanUnbuffer; |
| import org.apache.hadoop.fs.Seekable; |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.scm.XceiverClientFactory; |
| import org.apache.hadoop.hdds.scm.XceiverClientSpi; |
| import org.apache.hadoop.hdds.scm.client.HddsClientUtils; |
| import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.security.token.Token; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * An {@link InputStream} used by KeyInputStream to read a block from the |
| * container. |
| * This class encapsulates all the state management needed to iterate |
| * through the block's sequence of chunks via {@link ChunkInputStream}. |
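| * |
| * <p>Illustrative usage (a minimal sketch; blockID, blockLength, pipeline, |
| * token, verifyChecksum, xceiverClientFactory and refreshedPipeline are |
| * assumed to be supplied by the caller, e.g. KeyInputStream): |
| * <pre>{@code |
| * BlockInputStream blockStream = new BlockInputStream(blockID, blockLength, |
| * pipeline, token, verifyChecksum, xceiverClientFactory, |
| * id -> refreshedPipeline); |
| * byte[] buffer = new byte[4096]; |
| * int bytesRead = blockStream.read(buffer, 0, buffer.length); |
| * blockStream.close(); |
| * }</pre> |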
| */ |
| public class BlockInputStream extends InputStream |
| implements Seekable, CanUnbuffer { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(BlockInputStream.class); |
| |
| private static final int EOF = -1; |
| |
| private final BlockID blockID; |
| private final long length; |
| private Pipeline pipeline; |
| private final Token<OzoneBlockTokenIdentifier> token; |
| private final boolean verifyChecksum; |
| private XceiverClientFactory xceiverClientFactory; |
| private XceiverClientSpi xceiverClient; |
| private boolean initialized = false; |
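| // Retry policy for reads: at most 3 retries, spaced 1 second apart. |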
| private final RetryPolicy retryPolicy = |
| HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1)); |
| private int retries; |
| |
| // List of ChunkInputStreams, one for each chunk in the block |
| private List<ChunkInputStream> chunkStreams; |
| |
| // chunkOffsets[i] stores the offset of the first data byte in |
| // chunkStream i relative to the block data. |
| // For example, suppose the chunk size is 40 bytes and the parent block |
| // stores the key's data from index 200 onwards with a length of 400. |
| // The first 40 bytes of the block are stored in chunk[0], the next 40 in |
| // chunk[1], and so on. Since the chunkOffsets are relative to the block |
| // only and not the key, the values in chunkOffsets will be [0, 40, 80, ...]. |
| private long[] chunkOffsets = null; |
| |
| // Index of the chunkStream corresponding to the current position of the |
| // BlockInputStream, i.e. the offset of the data to be read next from |
| // this block. |
| private int chunkIndex; |
| |
| // Position of the BlockInputStream is maintained by this variable until |
| // the stream is initialized. This position is relative to the block only |
| // and not the key. |
| // For the above example, if the key is seeked to position 240 before the |
| // stream is initialized, this block is seeked to position 40 (= 240 - 200) |
| // and blockPosition is set to 40. |
| // Once the stream is initialized, the position of the stream is |
| // determined by the current chunkStream and its position. |
| private long blockPosition = 0; |
| |
| // Tracks the chunkIndex corresponding to the last blockPosition so that it |
| // can be reset if a new position is seeked. |
| private int chunkIndexOfPrevPosition; |
| |
| private final Function<BlockID, Pipeline> refreshPipelineFunction; |
| |
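| /** |
| * Creates a BlockInputStream for the given block. |
| * |
| * @param blockId the block to read |
| * @param blockLen length of the block data in bytes |
| * @param pipeline pipeline of datanodes hosting the block |
| * @param token block token used to authorize the read |
| * @param verifyChecksum whether to verify checksums while reading chunks |
| * @param xceiverClientFactory factory used to connect to the datanodes |
| * @param refreshPipelineFunction callback used to re-fetch the pipeline |
| * when the block cannot be read from the current one; may be null |
| */ |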
| public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, |
| Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum, |
| XceiverClientFactory xceiverClientFactory, |
| Function<BlockID, Pipeline> refreshPipelineFunction) { |
| this.blockID = blockId; |
| this.length = blockLen; |
| this.pipeline = pipeline; |
| this.token = token; |
| this.verifyChecksum = verifyChecksum; |
| this.xceiverClientFactory = xceiverClientFactory; |
| this.refreshPipelineFunction = refreshPipelineFunction; |
| } |
| |
| public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, |
| Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum, |
| XceiverClientFactory xceiverClientFactory) { |
| this(blockId, blockLen, pipeline, token, verifyChecksum, |
| xceiverClientFactory, null); |
| } |
| |
| /** |
| * Initialize the BlockInputStream. Get the BlockData (list of chunks) from |
| * the Container and create a ChunkInputStream for each chunk in the block. |
| */ |
| public synchronized void initialize() throws IOException { |
| |
| // Pre-check that the stream has not been initialized already |
| if (initialized) { |
| return; |
| } |
| |
| List<ChunkInfo> chunks; |
| try { |
| chunks = getChunkInfos(); |
| } catch (ContainerNotFoundException ioEx) { |
| refreshPipeline(ioEx); |
| chunks = getChunkInfos(); |
| } |
| |
| if (chunks != null && !chunks.isEmpty()) { |
| // For each chunk in the block, create a ChunkInputStream and compute |
| // its chunkOffset |
| this.chunkOffsets = new long[chunks.size()]; |
| long tempOffset = 0; |
| |
| this.chunkStreams = new ArrayList<>(chunks.size()); |
| for (int i = 0; i < chunks.size(); i++) { |
| addStream(chunks.get(i)); |
| chunkOffsets[i] = tempOffset; |
| tempOffset += chunks.get(i).getLen(); |
| } |
| |
| initialized = true; |
| this.chunkIndex = 0; |
| |
| if (blockPosition > 0) { |
| // Stream was seeked to blockPosition before initialization. Seek to the |
| // blockPosition now. |
| seek(blockPosition); |
| } |
| } |
| } |
| |
| private void refreshPipeline(IOException cause) throws IOException { |
| LOG.info("Unable to read information for block {} from pipeline {}: {}", |
| blockID, pipeline.getId(), cause.getMessage()); |
| if (refreshPipelineFunction != null) { |
| LOG.debug("Re-fetching pipeline for block {}", blockID); |
| Pipeline newPipeline = refreshPipelineFunction.apply(blockID); |
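| // If no replacement pipeline is available, or it points to the same |
| // datanodes that just failed, retrying the read is unlikely to help, |
| // so surface the original failure. |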
| if (newPipeline == null || newPipeline.sameDatanodes(pipeline)) { |
| LOG.warn("No new pipeline for block {}", blockID); |
| throw cause; |
| } else { |
| LOG.debug("New pipeline got for block {}", blockID); |
| this.pipeline = newPipeline; |
| } |
| } else { |
| throw cause; |
| } |
| } |
| |
| /** |
| * Send RPC call to get the block info from the container. |
| * @return List of chunks in this block. |
| */ |
| protected List<ChunkInfo> getChunkInfos() throws IOException { |
| // Irrespective of the container state, we always read via the |
| // Standalone protocol. |
| if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) { |
| pipeline = Pipeline.newBuilder(pipeline) |
| .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); |
| } |
| acquireClient(); |
| boolean success = false; |
| List<ChunkInfo> chunks; |
| try { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Initializing BlockInputStream for get key to access {}", |
| blockID.getContainerID()); |
| } |
| |
| DatanodeBlockID datanodeBlockID = blockID |
| .getDatanodeBlockIDProtobuf(); |
| GetBlockResponseProto response = ContainerProtocolCalls |
| .getBlock(xceiverClient, datanodeBlockID, token); |
| |
| chunks = response.getBlockData().getChunksList(); |
| success = true; |
| } finally { |
| if (!success) { |
| xceiverClientFactory.releaseClientForReadData(xceiverClient, false); |
| } |
| } |
| |
| return chunks; |
| } |
| |
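| /** |
| * Acquire an XceiverClient for reading data from the current pipeline. |
| * The client is released again in close(), unbuffer() or when a read |
| * error is handled. |
| */ |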
| protected void acquireClient() throws IOException { |
| xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline); |
| } |
| |
| /** |
| * Append another ChunkInputStream to the end of the list. Note that the |
| * ChunkInputStream is only created here. The chunk will be read from the |
| * Datanode only when a read operation is performed for that chunk. |
| */ |
| protected synchronized void addStream(ChunkInfo chunkInfo) { |
| chunkStreams.add(createChunkInputStream(chunkInfo)); |
| } |
| |
| protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { |
| return new ChunkInputStream(chunkInfo, blockID, |
| xceiverClientFactory, () -> pipeline, verifyChecksum, token); |
| } |
| |
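| /** |
| * @return the number of bytes remaining to be read from the block, i.e. |
| * the block length minus the current position. |
| */ |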
| public synchronized long getRemaining() { |
| return length - getPos(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public synchronized int read() throws IOException { |
| byte[] buf = new byte[1]; |
| if (read(buf, 0, 1) == EOF) { |
| return EOF; |
| } |
| return Byte.toUnsignedInt(buf[0]); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public synchronized int read(byte[] b, int off, int len) throws IOException { |
| if (b == null) { |
| throw new NullPointerException(); |
| } |
| if (off < 0 || len < 0 || len > b.length - off) { |
| throw new IndexOutOfBoundsException(); |
| } |
| if (len == 0) { |
| return 0; |
| } |
| |
| if (!initialized) { |
| initialize(); |
| } |
| |
| checkOpen(); |
| int totalReadLen = 0; |
| while (len > 0) { |
| // If we are at the last chunk and have read it fully (or there are no |
| // chunks at all), return the bytes read so far, or EOF if none. |
| if (chunkStreams.size() == 0 || |
| (chunkStreams.size() - 1 <= chunkIndex && |
| chunkStreams.get(chunkIndex) |
| .getRemaining() == 0)) { |
| return totalReadLen == 0 ? EOF : totalReadLen; |
| } |
| |
| // Get the current chunkStream and read data from it |
| ChunkInputStream current = chunkStreams.get(chunkIndex); |
| int numBytesToRead = Math.min(len, (int)current.getRemaining()); |
| int numBytesRead; |
| try { |
| numBytesRead = current.read(b, off, numBytesToRead); |
| retries = 0; // reset retries after successful read |
| } catch (StorageContainerException e) { |
| if (shouldRetryRead(e)) { |
| handleReadError(e); |
| continue; |
| } else { |
| throw e; |
| } |
| } |
| |
| if (numBytesRead != numBytesToRead) { |
| // This implies that there is either data loss or corruption in the |
| // chunk entries. Even EOF in the current stream would be covered in |
| // this case. |
| throw new IOException(String.format( |
| "Inconsistent read for chunkName=%s length=%d numBytesToRead= %d " + |
| "numBytesRead=%d", current.getChunkName(), current.getLength(), |
| numBytesToRead, numBytesRead)); |
| } |
| totalReadLen += numBytesRead; |
| off += numBytesRead; |
| len -= numBytesRead; |
| if (current.getRemaining() <= 0 && |
| ((chunkIndex + 1) < chunkStreams.size())) { |
| chunkIndex += 1; |
| } |
| } |
| return totalReadLen; |
| } |
| |
| /** |
| * Seeks the BlockInputStream to the specified position. If the stream is |
| * not initialized, save the seeked position via blockPosition. Otherwise, |
| * update the position in 2 steps: |
| * 1. Update chunkIndex to point to the chunkStream that contains the |
| * seeked position. |
| * 2. Seek that chunkStream to the position adjusted by its chunkOffset. |
| * |
| * For example, suppose the chunk size is 40 bytes and the parent block |
| * stores the key's data from index 200 onwards with a length of 400. If |
| * the key is seeked to position 290, this block is seeked to position 90 |
| * (= 290 - 200). When seek(90) is called on this blockStream, then |
| * 1. chunkIndex will be set to 2 (as indices 80 - 119 reside in chunk[2]). |
| * 2. chunkStream[2] will be seeked to position 10 |
| * (= 90 - chunkOffset[2] (= 80)). |
| */ |
| @Override |
| public synchronized void seek(long pos) throws IOException { |
| if (!initialized) { |
| // Stream has not been initialized yet. Save the position so that it |
| // can be seeked when the stream is initialized. |
| blockPosition = pos; |
| return; |
| } |
| |
| checkOpen(); |
| if (pos < 0 || pos >= length) { |
| if (pos == 0) { |
| // It is possible for both length and pos to be zero, in which case |
| // seek should return instead of throwing an exception. |
| return; |
| } |
| throw new EOFException( |
| "EOF encountered at pos: " + pos + " for block: " + blockID); |
| } |
| |
| if (chunkIndex >= chunkStreams.size()) { |
| chunkIndex = Arrays.binarySearch(chunkOffsets, pos); |
| } else if (pos < chunkOffsets[chunkIndex]) { |
| chunkIndex = |
| Arrays.binarySearch(chunkOffsets, 0, chunkIndex, pos); |
| } else if (pos >= chunkOffsets[chunkIndex] + chunkStreams |
| .get(chunkIndex).getLength()) { |
| chunkIndex = Arrays.binarySearch(chunkOffsets, |
| chunkIndex + 1, chunkStreams.size(), pos); |
| } |
| if (chunkIndex < 0) { |
| // Binary search returns -insertionPoint - 1 if element is not present |
| // in the array. insertionPoint is the point at which element would be |
| // inserted in the sorted array. We need to adjust the chunkIndex |
| // accordingly so that chunkIndex = insertionPoint - 1 |
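| // For example, with chunkOffsets = [0, 40, 80] and pos = 90, binarySearch |
| // returns -4 (insertionPoint = 3), so chunkIndex = -(-4) - 2 = 2. |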
| chunkIndex = -chunkIndex - 2; |
| } |
| |
| // Reset the previous chunkStream's position |
| chunkStreams.get(chunkIndexOfPrevPosition).resetPosition(); |
| |
| // Reset all the chunkStreams above the chunkIndex. We do this to reset |
| // any previous reads which might have updated the chunkPosition. |
| for (int index = chunkIndex + 1; index < chunkStreams.size(); index++) { |
| chunkStreams.get(index).seek(0); |
| } |
| // seek to the proper offset in the ChunkInputStream |
| chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]); |
| chunkIndexOfPrevPosition = chunkIndex; |
| } |
| |
| @Override |
| public synchronized long getPos() { |
| if (length == 0) { |
| return 0; |
| } |
| |
| if (!initialized) { |
| // The stream is not initialized yet. Return the blockPosition |
| return blockPosition; |
| } else { |
| return chunkOffsets[chunkIndex] + chunkStreams.get(chunkIndex).getPos(); |
| } |
| } |
| |
| @Override |
| public boolean seekToNewSource(long targetPos) throws IOException { |
| return false; |
| } |
| |
| @Override |
| public synchronized void close() { |
| releaseClient(); |
| xceiverClientFactory = null; |
| } |
| |
| private void releaseClient() { |
| if (xceiverClientFactory != null && xceiverClient != null) { |
| xceiverClientFactory.releaseClient(xceiverClient, false); |
| xceiverClient = null; |
| } |
| } |
| |
| public synchronized void resetPosition() { |
| this.blockPosition = 0; |
| } |
| |
| /** |
| * Checks if the stream is open. If not, throw an exception. |
| * |
| * @throws IOException if stream is closed |
| */ |
| protected synchronized void checkOpen() throws IOException { |
| if (xceiverClientFactory == null) { |
| throw new IOException("BlockInputStream has been closed."); |
| } |
| } |
| |
| public BlockID getBlockID() { |
| return blockID; |
| } |
| |
| public long getLength() { |
| return length; |
| } |
| |
| @VisibleForTesting |
| synchronized int getChunkIndex() { |
| return chunkIndex; |
| } |
| |
| @VisibleForTesting |
| synchronized long getBlockPosition() { |
| return blockPosition; |
| } |
| |
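| /** |
| * Release the datanode connection and any buffered data held by this |
| * stream and its chunk streams, remembering the current position so that |
| * a subsequent read can resume after re-acquiring the resources. |
| */ |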
| @Override |
| public void unbuffer() { |
| storePosition(); |
| releaseClient(); |
| |
| final List<ChunkInputStream> inputStreams = this.chunkStreams; |
| if (inputStreams != null) { |
| for (ChunkInputStream is : inputStreams) { |
| is.unbuffer(); |
| } |
| } |
| } |
| |
| private synchronized void storePosition() { |
| blockPosition = getPos(); |
| } |
| |
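| /** |
| * Consult the retry policy to decide whether a failed read should be |
| * retried (with a refreshed pipeline) or the error propagated. |
| */ |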
| private boolean shouldRetryRead(IOException cause) throws IOException { |
| RetryPolicy.RetryAction retryAction; |
| try { |
| retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true); |
| } catch (IOException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY; |
| } |
| |
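| /** |
| * Release the clients held by this stream and its chunk streams, then |
| * refresh the pipeline so that the failed read can be retried against the |
| * block's current replica locations. |
| */ |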
| private void handleReadError(IOException cause) throws IOException { |
| releaseClient(); |
| final List<ChunkInputStream> inputStreams = this.chunkStreams; |
| if (inputStreams != null) { |
| for (ChunkInputStream is : inputStreams) { |
| is.releaseClient(); |
| } |
| } |
| |
| refreshPipeline(cause); |
| } |
| } |