blob: 4bc144f3bd75c47ea769260ab17cd259798dbe1c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
/**
 * A stream for accessing multipart streams.
 * <p>
 * The key's data is the concatenation of the given part streams in list
 * order. Reads advance through the parts sequentially, and
 * {@link #seek(long)} maps a key-level position onto the part holding it.
 */
public class MultipartInputStream extends ExtendedInputStream {

  private final String key;
  private final long length;

  // List of PartInputStream, one for each part of the key
  private final List<? extends PartInputStream> partStreams;

  // partOffsets[i] stores the index of the first data byte in
  // partStream w.r.t the whole key data.
  // For example, let's say the part size is 200 bytes and part[0] stores
  // data from indices 0 - 199, part[1] from indices 200 - 399 and so on.
  // Then, partOffsets[0] = 0 (the offset of the first byte of data in
  // part[0]), partOffsets[1] = 200 and so on.
  // Note: zero-length parts produce repeated offsets.
  private final long[] partOffsets;

  // Set by close(). Only accessed from synchronized methods (checkOpen is
  // private and only called from synchronized methods), so no volatile.
  private boolean closed;

  // Index of the partStream corresponding to the current position of this
  // MultipartInputStream.
  private int partIndex;

  // Tracks the partIndex corresponding to the last seeked position so that it
  // can be reset if a new position is seeked.
  private int prevPartIndex;

  /**
   * Creates a stream that exposes the given parts as one contiguous stream.
   *
   * @param keyName name of the key; only used in exception messages
   * @param inputStreams the part streams in key order; must not be null.
   *     The list is stored as given, not copied.
   */
  public MultipartInputStream(String keyName,
      List<? extends PartInputStream> inputStreams) {
    Preconditions.checkNotNull(inputStreams);
    this.key = keyName;
    this.partStreams = inputStreams;

    // Calculate and update the partOffsets
    this.partOffsets = new long[inputStreams.size()];
    int i = 0;
    long streamLength = 0L;
    for (PartInputStream partInputStream : inputStreams) {
      this.partOffsets[i++] = streamLength;
      streamLength += partInputStream.getLength();
    }
    this.length = streamLength;
  }

  /**
   * Fills {@code strategy} from the current part onwards, moving to the next
   * part whenever the current one has no data left.
   *
   * @param strategy destination of the read; must not be null
   * @return the number of bytes read, or {@code EOF} if nothing could be read
   *     because the last part is exhausted
   * @throws EOFException if a part hits end-of-stream although it still
   *     reports remaining bytes
   * @throws IOException if the stream is closed or a part read fails
   */
  @Override
  protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
      throws IOException {
    Preconditions.checkArgument(strategy != null);
    checkOpen();

    int totalReadLen = 0;
    while (strategy.getTargetLength() > 0) {
      // Never read from a part with nothing left (e.g. a zero-length part,
      // possibly reached via seek). Such a read returns EOF (-1), and that
      // value must not be counted as data.
      skipExhaustedParts();
      if (partStreams.isEmpty()
          || partStreams.get(partIndex).getRemaining() <= 0) {
        // The last part is exhausted: end of the key.
        return totalReadLen == 0 ? EOF : totalReadLen;
      }

      // Get the current partStream and read data from it
      PartInputStream current = partStreams.get(partIndex);
      int numBytesToRead = getNumBytesToRead(strategy, current);
      int numBytesRead = strategy
          .readFromBlock((InputStream) current, numBytesToRead);
      checkPartBytesRead(numBytesToRead, numBytesRead, current);
      if (numBytesRead < 0) {
        // The part reported remaining data but returned end-of-stream.
        // Adding -1 would corrupt the count, and re-reading the same part
        // would make no progress, so fail instead.
        throw new EOFException("Unexpected EOF in part " + partIndex
            + " (remaining: " + current.getRemaining() + ") for key: " + key);
      }
      totalReadLen += numBytesRead;

      if (current.getRemaining() <= 0 &&
          partIndex + 1 < partStreams.size()) {
        partIndex += 1;
      }
    }
    return totalReadLen;
  }

  /**
   * Advances {@code partIndex} past parts with no remaining data. Never moves
   * beyond the last part.
   */
  private void skipExhaustedParts() throws IOException {
    while (partIndex + 1 < partStreams.size()
        && partStreams.get(partIndex).getRemaining() <= 0) {
      partIndex++;
    }
  }

  /**
   * Returns how many bytes to request from {@code current} in one read.
   * The default requests everything the strategy still wants. Subclasses may
   * override this to limit the request.
   */
  protected int getNumBytesToRead(ByteReaderStrategy strategy,
      PartInputStream current) throws IOException {
    return strategy.getTargetLength();
  }

  /**
   * Hook called after every part read so that subclasses can validate the
   * number of bytes returned. The default implementation accepts anything.
   */
  protected void checkPartBytesRead(int numBytesToRead, int numBytesRead,
      PartInputStream stream) throws IOException {
  }

  /**
   * Seeks the InputStream to the specified position. This involves 2 steps:
   * 1. Updating the partIndex to the partStream corresponding to the
   * seeked position.
   * 2. Seeking the corresponding partStream to the adjusted position.
   * <p>
   * For example, let's say the part sizes are 200 bytes and part[0] stores
   * data from indices 0 - 199, part[1] from indices 200 - 399 and so on.
   * Let's say we seek to position 240. In the first step, the partIndex
   * would be updated to 1 as indices 200 - 399 reside in partStream[1]. In
   * the second step, the partStream[1] would be seeked to position 40 (=
   * 240 - partOffsets[1] (= 200)).
   *
   * @param pos key-level position; may be equal to the key length
   * @throws EOFException if {@code pos} is negative or beyond the key length
   * @throws IOException if the stream is closed or a part seek fails
   */
  @Override
  public synchronized void seek(long pos) throws IOException {
    checkOpen();
    if (pos == 0 && length == 0) {
      // It is possible for length and pos to be zero in which case
      // seek should return instead of throwing exception
      return;
    }
    if (pos < 0 || pos > length) {
      throw new EOFException(
          "EOF encountered at pos: " + pos + " for key: " + key);
    }

    // 1. Update the partIndex. Only search the range on the correct side of
    // the current part; keep partIndex if pos is already inside it.
    if (partIndex >= partStreams.size()) {
      partIndex = Arrays.binarySearch(partOffsets, pos);
    } else if (pos < partOffsets[partIndex]) {
      partIndex =
          Arrays.binarySearch(partOffsets, 0, partIndex, pos);
    } else if (pos >= partOffsets[partIndex] + partStreams
        .get(partIndex).getLength()) {
      partIndex = Arrays.binarySearch(partOffsets, partIndex + 1,
          partStreams.size(), pos);
    }
    if (partIndex < 0) {
      // Binary search returns -insertionPoint - 1 if element is not present
      // in the array. insertionPoint is the point at which element would be
      // inserted in the sorted array. We need to adjust the partIndex
      // accordingly so that partIndex = insertionPoint - 1
      partIndex = -partIndex - 2;
    }

    // Reset the previous partStream's position
    partStreams.get(prevPartIndex).seek(0);

    // Reset all the partStreams above the partIndex. We do this to reset
    // any previous reads which might have updated the higher part
    // streams position.
    for (int index = partIndex + 1; index < partStreams.size(); index++) {
      partStreams.get(index).seek(0);
    }
    // 2. Seek the partStream to the adjusted position
    partStreams.get(partIndex).seek(pos - partOffsets[partIndex]);
    prevPartIndex = partIndex;
  }

  /**
   * Returns the current key-level position: the current part's offset plus
   * that part's own position. Returns 0 for an empty key.
   */
  @Override
  public synchronized long getPos() throws IOException {
    return length == 0 ? 0 :
        partOffsets[partIndex] + partStreams.get(partIndex).getPos();
  }

  /**
   * Returns the bytes left until the end of the key, capped at
   * {@link Integer#MAX_VALUE}.
   */
  @Override
  public synchronized int available() throws IOException {
    checkOpen();
    long remaining = length - getPos();
    return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
  }

  /** Calls {@code unbuffer()} on every part stream. */
  @Override
  public synchronized void unbuffer() {
    for (PartInputStream stream : partStreams) {
      stream.unbuffer();
    }
  }

  /**
   * Skips up to {@code n} bytes without going past the end of the key.
   *
   * @return the number of bytes skipped; 0 if {@code n <= 0}
   * @throws IOException if the stream is closed or seeking fails
   */
  @Override
  public synchronized long skip(long n) throws IOException {
    if (n <= 0) {
      return 0;
    }
    // Check up front so a closed stream fails the same way as available().
    checkOpen();
    long currentPos = getPos();
    long toSkip = Math.min(n, length - currentPos);
    seek(currentPos + toSkip);
    return toSkip;
  }

  /**
   * Marks this stream closed and closes every part stream. All parts are
   * closed even if one of them fails. The first failure is rethrown, and any
   * later failures are attached to it as suppressed exceptions.
   */
  @Override
  public synchronized void close() throws IOException {
    closed = true;
    IOException firstFailure = null;
    for (PartInputStream stream : partStreams) {
      try {
        stream.close();
      } catch (IOException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * Verify that the input stream is open. Non blocking; reads the
   * {@link #closed} flag, which is set by {@link #close()}.
   *
   * @throws IOException if the stream is closed.
   */
  private void checkOpen() throws IOException {
    if (closed) {
      throw new IOException(
          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
    }
  }

  /** Returns the total length of the key: the sum of all part lengths. */
  public long getLength() {
    return length;
  }

  @VisibleForTesting
  public synchronized int getCurrentStreamIndex() {
    return partIndex;
  }

  @VisibleForTesting
  public long getRemainingOfIndex(int index) throws IOException {
    return partStreams.get(index).getRemaining();
  }

  @VisibleForTesting
  public List<? extends PartInputStream> getPartStreams() {
    return partStreams;
  }
}