| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.ozone.client.io; |
| |
| import com.google.common.base.Preconditions; |
| import org.apache.hadoop.fs.FSExceptionMessages; |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.client.ECReplicationConfig; |
| import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.scm.XceiverClientFactory; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineID; |
| import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; |
| import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; |
| import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.function.Function; |
| |
| /** |
| * Class to read data from an EC Block Group. |
| */ |
| public class ECBlockInputStream extends BlockExtendedInputStream { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(ECBlockInputStream.class); |
| |
| private final ECReplicationConfig repConfig; |
| private final int ecChunkSize; |
| private final long stripeSize; |
| private final BlockInputStreamFactory streamFactory; |
| private final boolean verifyChecksum; |
| private final XceiverClientFactory xceiverClientFactory; |
| private final Function<BlockID, Pipeline> refreshFunction; |
| private final BlockLocationInfo blockInfo; |
| private final DatanodeDetails[] dataLocations; |
| private final BlockExtendedInputStream[] blockStreams; |
| private final int maxLocations; |
| |
| private long position = 0; |
| private boolean closed = false; |
| private boolean seeked = false; |
| |
| protected ECReplicationConfig getRepConfig() { |
| return repConfig; |
| } |
| |
| protected DatanodeDetails[] getDataLocations() { |
| return dataLocations; |
| } |
| |
| protected long getStripeSize() { |
| return stripeSize; |
| } |
| |
| /** |
| * Returns the number of available data locations, taking account of the |
| * expected number of locations. Eg, if the block is less than 1 EC chunk, |
| * we only expect 1 data location. If it is between 1 and 2 chunks, we expect |
| * there to be 2 locations, and so on. |
| * @param expectedLocations The maximum number of allowed data locations, |
| * depending on the block size. |
| * @return The number of available data locations. |
| */ |
| protected int availableDataLocations(int expectedLocations) { |
| int count = 0; |
| for (int i = 0; i < repConfig.getData() && i < expectedLocations; i++) { |
| if (dataLocations[i] != null) { |
| count++; |
| } |
| } |
| return count; |
| } |
| |
| protected int availableParityLocations() { |
| int count = 0; |
| for (int i = repConfig.getData(); |
| i < repConfig.getData() + repConfig.getParity(); i++) { |
| if (dataLocations[i] != null) { |
| count++; |
| } |
| } |
| return count; |
| } |
| |
| public ECBlockInputStream(ECReplicationConfig repConfig, |
| BlockLocationInfo blockInfo, boolean verifyChecksum, |
| XceiverClientFactory xceiverClientFactory, Function<BlockID, |
| Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) { |
| this.repConfig = repConfig; |
| this.ecChunkSize = repConfig.getEcChunkSize(); |
| this.verifyChecksum = verifyChecksum; |
| this.blockInfo = blockInfo; |
| this.streamFactory = streamFactory; |
| this.xceiverClientFactory = xceiverClientFactory; |
| this.refreshFunction = refreshFunction; |
| this.maxLocations = repConfig.getData() + repConfig.getParity(); |
| this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()]; |
| this.blockStreams = |
| new BlockExtendedInputStream[repConfig.getRequiredNodes()]; |
| |
| this.stripeSize = (long)ecChunkSize * repConfig.getData(); |
| setBlockLocations(this.blockInfo.getPipeline()); |
| } |
| |
| public synchronized boolean hasSufficientLocations() { |
| // The number of locations needed is a function of the EC Chunk size. If the |
| // block length is <= the chunk size, we should only have location 1. If it |
| // is greater than the chunk size but less than chunk_size * 2, then we must |
| // have two locations. If it is greater than chunk_size * data_num, then we |
| // must have all data_num locations. |
| // We only consider data locations here. |
| int expectedDataBlocks = calculateExpectedDataBlocks(repConfig); |
| return expectedDataBlocks == availableDataLocations(expectedDataBlocks); |
| } |
| |
| protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) { |
| return ECBlockInputStreamProxy.expectedDataLocations(rConfig, getLength()); |
| } |
| |
| /** |
| * Using the current position, returns the index of the blockStream we should |
| * be reading from. This is the index in the internal array holding the |
| * stream reference. The block group index will be one greater than this. |
| * @return |
| */ |
| protected int currentStreamIndex() { |
| return (int)((position / ecChunkSize) % repConfig.getData()); |
| } |
| |
| /** |
| * Uses the current position and ecChunkSize to determine which of the |
| * internal block streams the next read should come from. Also opens the |
| * stream if it has not been opened already. |
| * @return BlockInput stream to read from. |
| */ |
| protected BlockExtendedInputStream getOrOpenStream(int locationIndex) { |
| BlockExtendedInputStream stream = blockStreams[locationIndex]; |
| if (stream == null) { |
| // To read an EC block, we create a STANDALONE pipeline that contains the |
| // single location for the block index we want to read. The EC blocks are |
| // indexed from 1 to N, however the data locations are stored in the |
| // dataLocations array indexed from zero. |
| Pipeline pipeline = Pipeline.newBuilder() |
| .setReplicationConfig(StandaloneReplicationConfig.getInstance( |
| HddsProtos.ReplicationFactor.ONE)) |
| .setNodes(Arrays.asList(dataLocations[locationIndex])) |
| .setId(PipelineID.randomId()) |
| .setState(Pipeline.PipelineState.CLOSED) |
| .build(); |
| |
| BlockLocationInfo blkInfo = new BlockLocationInfo.Builder() |
| .setBlockID(blockInfo.getBlockID()) |
| .setLength(internalBlockLength(locationIndex + 1)) |
| .setPipeline(blockInfo.getPipeline()) |
| .setToken(blockInfo.getToken()) |
| .setPartNumber(blockInfo.getPartNumber()) |
| .build(); |
| stream = streamFactory.create( |
| StandaloneReplicationConfig.getInstance( |
| HddsProtos.ReplicationFactor.ONE), |
| blkInfo, pipeline, |
| blockInfo.getToken(), verifyChecksum, xceiverClientFactory, |
| refreshFunction); |
| blockStreams[locationIndex] = stream; |
| } |
| return stream; |
| } |
| |
| /** |
| * Returns the length of the Nth block in the block group, taking account of a |
| * potentially partial last stripe. Note that the internal block index is |
| * numbered starting from 1. |
| * @param index - Index number of the internal block, starting from 1 |
| * @return |
| */ |
| protected long internalBlockLength(int index) { |
| long lastStripe = blockInfo.getLength() % stripeSize; |
| long blockSize = (blockInfo.getLength() - lastStripe) / repConfig.getData(); |
| long lastCell = lastStripe / ecChunkSize + 1; |
| long lastCellLength = lastStripe % ecChunkSize; |
| |
| if (index > repConfig.getData()) { |
| // Its a parity block and their size is driven by the size of the |
| // first block of the block group. All parity blocks have the same size |
| // as block_1. |
| index = 1; |
| } |
| |
| if (index < lastCell) { |
| return blockSize + ecChunkSize; |
| } else if (index == lastCell) { |
| return blockSize + lastCellLength; |
| } else { |
| return blockSize; |
| } |
| } |
| |
| private void setBlockLocations(Pipeline pipeline) { |
| for (DatanodeDetails node : pipeline.getNodes()) { |
| int index = pipeline.getReplicaIndex(node); |
| addBlockLocation(index, node); |
| } |
| } |
| |
| private void addBlockLocation(int index, DatanodeDetails location) { |
| if (index > maxLocations) { |
| throw new IndexOutOfBoundsException("The index " + index + " is greater " |
| + "than the EC Replication Config (" + repConfig + ")"); |
| } |
| dataLocations[index - 1] = location; |
| } |
| |
| protected long blockLength() { |
| return blockInfo.getLength(); |
| } |
| |
| protected long remaining() { |
| return blockLength() - position; |
| } |
| |
| /** |
| * Read from the internal BlockInputStreams one EC cell at a time into the |
| * strategy buffer. This call may read from several internal BlockInputStreams |
| * if there is sufficient space in the buffer. |
| * @param strategy |
| * @return |
| * @throws IOException |
| */ |
| @Override |
| protected synchronized int readWithStrategy(ByteReaderStrategy strategy) |
| throws IOException { |
| Preconditions.checkArgument(strategy != null); |
| checkOpen(); |
| |
| if (remaining() == 0) { |
| return EOF; |
| } |
| |
| int totalRead = 0; |
| while (strategy.getTargetLength() > 0 && remaining() > 0) { |
| try { |
| int currentIndex = currentStreamIndex(); |
| BlockExtendedInputStream stream = getOrOpenStream(currentIndex); |
| int read = readFromStream(stream, strategy); |
| totalRead += read; |
| position += read; |
| } catch (IOException ioe) { |
| throw new BadDataLocationException( |
| dataLocations[currentStreamIndex()], ioe); |
| } |
| } |
| return totalRead; |
| } |
| |
| @Override |
| public synchronized long getRemaining() { |
| return blockInfo.getLength() - position; |
| } |
| |
| @Override |
| public synchronized long getLength() { |
| return blockInfo.getLength(); |
| } |
| |
| @Override |
| public BlockID getBlockID() { |
| return blockInfo.getBlockID(); |
| } |
| |
| protected void seekStreamIfNecessary(BlockExtendedInputStream stream, |
| long partialChunkSize) throws IOException { |
| if (seeked) { |
| // Seek on the underlying streams is performed lazily, as there is a |
| // possibility a read after a seek may only read a small amount of data. |
| // Once this block stream has been seeked, we always check the position, |
| // but in the usual case, where there are no seeks at all, we don't need |
| // to do this extra work. |
| long basePosition = (position / stripeSize) * (long)ecChunkSize; |
| long streamPosition = basePosition + partialChunkSize; |
| if (streamPosition != stream.getPos()) { |
| // This ECBlockInputStream has been seeked, so the underlying |
| // block stream is no longer at the correct position. Therefore we need |
| // to seek it too. |
| stream.seek(streamPosition); |
| } |
| } |
| } |
| |
| /** |
| * Read the most allowable amount of data from the current stream. This |
| * ensures we don't read past the end of an EC cell or the overall block |
| * group length. |
| * @param stream Stream to read from |
| * @param strategy The ReaderStrategy to read data into |
| * @return |
| * @throws IOException |
| */ |
| private int readFromStream(BlockExtendedInputStream stream, |
| ByteReaderStrategy strategy) |
| throws IOException { |
| long partialPosition = position % ecChunkSize; |
| seekStreamIfNecessary(stream, partialPosition); |
| long ecLimit = ecChunkSize - partialPosition; |
| // Free space in the buffer to read into |
| long bufLimit = strategy.getTargetLength(); |
| // How much we can read, the lower of the EC Cell, buffer and overall block |
| // remaining. |
| int expectedRead = (int)Math.min(Math.min(ecLimit, bufLimit), remaining()); |
| int actualRead = strategy.readFromBlock(stream, expectedRead); |
| if (actualRead == -1) { |
| // The Block Stream reached EOF, but we did not expect it to, so the block |
| // might be corrupt. |
| throw new IOException("Expected to read " + expectedRead + " but got EOF" |
| + " from blockGroup " + stream.getBlockID() + " index " |
| + currentStreamIndex() + 1); |
| } |
| return actualRead; |
| } |
| |
| /** |
| * Verify that the input stream is open. |
| * @throws IOException if the connection is closed. |
| */ |
| private void checkOpen() throws IOException { |
| if (closed) { |
| throw new IOException( |
| ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Block: " |
| + blockInfo.getBlockID()); |
| } |
| } |
| |
| @Override |
| public synchronized void close() { |
| closeStreams(); |
| closed = true; |
| } |
| |
| protected synchronized void closeStreams() { |
| for (int i = 0; i < blockStreams.length; i++) { |
| if (blockStreams[i] != null) { |
| try { |
| blockStreams[i].close(); |
| blockStreams[i] = null; |
| } catch (IOException e) { |
| LOG.error("Failed to close stream {}", blockStreams[i], e); |
| } |
| } |
| } |
| // If the streams have been closed outside of a close() call, then it may |
| // be due to freeing resources. If they are reopened, then we will need to |
| // seek the stream to its expected position when the next read is attempted. |
| seeked = true; |
| } |
| |
| @Override |
| public synchronized void unbuffer() { |
| for (BlockExtendedInputStream stream : blockStreams) { |
| if (stream != null) { |
| stream.unbuffer(); |
| } |
| } |
| } |
| |
| @Override |
| public synchronized void seek(long pos) throws IOException { |
| checkOpen(); |
| if (pos < 0 || pos >= getLength()) { |
| if (pos == 0) { |
| // It is possible for length and pos to be zero in which case |
| // seek should return instead of throwing exception |
| return; |
| } |
| throw new EOFException( |
| "EOF encountered at pos: " + pos + " for block: " |
| + blockInfo.getBlockID()); |
| } |
| position = pos; |
| seeked = true; |
| } |
| |
| @Override |
| public synchronized long getPos() { |
| return position; |
| } |
| |
| protected synchronized void setPos(long pos) { |
| position = pos; |
| } |
| |
| @Override |
| public synchronized boolean seekToNewSource(long l) throws IOException { |
| return false; |
| } |
| } |