// blob: c37b2bec6ecd73a8f20cac2e423ac8432e1621d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
/**
* Encapsulates the BlobInputStream used by block blobs and adds support for
* random access and seek. Random access performance is improved by several
* orders of magnitude.
*/
final class BlockBlobInputStream extends InputStream implements Seekable {
// block blob being read; used to open the sequential stream and to issue
// ranged downloads
private final CloudBlockBlobWrapper blob;
// request options passed along with every storage call made by this stream
private final BlobRequestOptions options;
// operation context passed along with every storage call made by this stream
private final OperationContext opContext;
// sequential stream opened by the constructor; closed and set to null by a
// reverse seek, after which reads are served by ranged downloads
private InputStream blobInputStream = null;
// value of blob.getStreamMinimumReadSizeInBytes(); upper bound for the size
// of a single ranged download and of the internal stream buffer
private int minimumReadSizeInBytes = 0;
// value of streamPosition right after the most recent ranged download;
// compared with streamPosition to detect sequential reading (-1: none yet)
private long streamPositionAfterLastRead = -1;
// position of next network read within stream
private long streamPosition = 0;
// length of stream
private long streamLength = 0;
// set by close(); once true, checkState() rejects further operations
private boolean closed = false;
// internal buffer, re-used for performance optimization
private byte[] streamBuffer;
// zero-based offset within streamBuffer of current read position
private int streamBufferPosition;
// length of data written to streamBuffer, streamBuffer may be larger
private int streamBufferLength;
/**
* Creates a seek-able stream for reading from block blobs.
* The underlying blob stream is opened immediately, and the stream length is
* taken from the blob properties once, at construction time.
* @param blob a block blob reference.
* @param options the blob request options.
* @param opContext the blob operation context.
* @throws IOException if the underlying blob stream cannot be opened; the
* originating StorageException is attached as the cause.
*/
BlockBlobInputStream(CloudBlockBlobWrapper blob,
BlobRequestOptions options,
OperationContext opContext) throws IOException {
this.blob = blob;
this.options = options;
this.opContext = opContext;
this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
try {
this.blobInputStream = blob.openInputStream(options, opContext);
} catch (StorageException e) {
// surface storage failures as IOException, keeping the original as cause
throw new IOException(e);
}
// NOTE(review): the length is read only after the stream has been opened;
// presumably openInputStream() is what populates the blob properties, so
// this statement should stay below the open call -- confirm before moving.
this.streamLength = blob.getProperties().getLength();
}
/**
 * Verifies that this stream has not been closed.
 * @throws IOException if {@link #close()} has already been called.
 */
private void checkState() throws IOException {
  if (!closed) {
    return;
  }
  throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
/**
 * Marks the internal stream buffer as empty without freeing it, so the
 * same (potentially large) array can be refilled by a later read instead
 * of being reallocated.
 */
private void resetStreamBuffer() {
  streamBufferLength = 0;
  streamBufferPosition = 0;
}
/**
 * Reports the current read position of the stream.
 * @return the zero-based byte offset of the next byte a read would return.
 * @throws IOException if the stream is closed.
 */
@Override
public synchronized long getPos() throws IOException {
  checkState();
  if (streamBuffer == null) {
    // nothing is buffered, so the network position is the read position
    return streamPosition;
  }
  // streamPosition points past the buffered data; step back over the part
  // of the buffer that has not been consumed yet
  int unconsumed = streamBufferLength - streamBufferPosition;
  return streamPosition - unconsumed;
}
/**
 * Moves the read position of the stream.
 * @param pos a zero-based byte offset in the stream.
 * @throws EOFException if {@code pos} is negative or beyond the end of the
 * stream, or if a forward seek could not skip the full distance.
 * @throws IOException if the stream is closed or the underlying stream
 * fails.
 */
@Override
public synchronized void seek(long pos) throws IOException {
  checkState();
  if (pos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
  }
  if (pos > streamLength) {
    throw new EOFException(
        FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
  }

  // signed distance from the current position to the target
  final long delta = pos - getPos();
  if (delta == 0) {
    // no-op, no state change
    return;
  }

  if (delta > 0) {
    // forward seek: skipping is enough, the data never has to be read
    final long skipped = skip(delta);
    if (skipped != delta) {
      throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
    }
    return;
  }

  // reverse seek: delta is negative from here on
  final boolean targetIsBuffered =
      streamBuffer != null && streamBufferPosition + delta >= 0;
  if (targetIsBuffered) {
    // the target still lies inside the stream buffer, just step back in it
    streamBufferPosition += delta;
  } else {
    // the target is not buffered: drop any buffered data and let the next
    // network read start at the target
    if (streamBuffer != null) {
      resetStreamBuffer();
    }
    streamPosition = pos;
  }

  // the BlobInputStream cannot seek, so it is closed once a reverse seek
  // has happened
  closeBlobInputStream();
}
/**
* Seeks a secondary copy of the data. This operation is not supported by
* this stream, so the call changes nothing and always reports failure.
* @param targetPos a zero-based byte offset in the stream (ignored).
* @return false, always.
* @throws IOException declared by the overridden method; never thrown here.
*/
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
/**
 * Gets the number of bytes that can be read (or skipped over) without
 * performing a network operation.
 * @return the value reported by the underlying blob stream while it is
 * open; otherwise the number of unconsumed bytes in the internal stream
 * buffer, or 0 when there is no buffer.
 * @throws IOException if the stream is closed or the underlying stream
 * fails.
 */
@Override
public synchronized int available() throws IOException {
  checkState();
  if (blobInputStream != null) {
    return blobInputStream.available();
  }
  if (streamBuffer == null) {
    return 0;
  }
  return streamBufferLength - streamBufferPosition;
}
/**
 * Closes the underlying blob stream, if one is open, and forgets the
 * reference to it even when closing fails.
 * @throws IOException if closing the underlying stream fails.
 */
private void closeBlobInputStream() throws IOException {
  if (blobInputStream == null) {
    return;
  }
  try {
    blobInputStream.close();
  } finally {
    // drop the reference regardless of the outcome of close()
    blobInputStream = null;
  }
}
/**
 * Closes this stream and releases any system resources associated with it.
 * The stream is marked closed and the internal buffer is released even if
 * closing the underlying blob stream fails.
 * @throws IOException if closing the underlying blob stream fails.
 */
@Override
public synchronized void close() throws IOException {
  closed = true;
  try {
    closeBlobInputStream();
  } finally {
    // release the buffer even when the underlying close() throws, so a
    // failed close does not keep the buffer array reachable
    streamBuffer = null;
    streamBufferPosition = 0;
    streamBufferLength = 0;
  }
}
/**
 * Downloads the next range of the blob, starting at the current stream
 * position, and hands up to {@code len} bytes of it to the caller.
 * When the read continues exactly where the previous network read ended,
 * the range is downloaded into the internal stream buffer and then copied
 * to the caller; otherwise it is downloaded straight into the caller's
 * array.
 * @param buffer the caller's destination array.
 * @param offset start offset in {@code buffer} for the returned data.
 * @param len the maximum number of bytes to return to the caller.
 * @return the number of bytes placed into {@code buffer}.
 * @throws EOFException if the download produced no data.
 * @throws IOException if the storage call fails.
 */
private int doNetworkRead(byte[] buffer, int offset, int len)
    throws IOException {
  // sequential access: this read starts where the last network read ended
  final boolean viaStreamBuffer =
      (streamPositionAfterLastRead == streamPosition);

  final MemoryOutputStream sink;
  if (viaStreamBuffer) {
    if (streamBuffer == null) {
      long bufferSize = Math.min(minimumReadSizeInBytes, streamLength);
      streamBuffer = new byte[(int) bufferSize];
    }
    resetStreamBuffer();
    sink = new MemoryOutputStream(streamBuffer, streamBufferPosition,
        streamBuffer.length);
  } else {
    sink = new MemoryOutputStream(buffer, offset, len);
  }

  // never request more than the minimum read size, the room in the sink,
  // or what is left of the stream
  final long remainingInStream = streamLength - streamPosition;
  final long bytesToRead = Math.min(
      minimumReadSizeInBytes,
      Math.min(sink.capacity(), remainingInStream));
  try {
    blob.downloadRange(streamPosition, bytesToRead, sink, options,
        opContext);
  } catch (StorageException e) {
    throw new IOException(e);
  }

  final int downloaded = sink.size();
  if (downloaded <= 0) {
    // This may happen if the blob was modified after the length was obtained.
    throw new EOFException("End of stream reached unexpectedly.");
  }

  streamPosition += downloaded;
  streamPositionAfterLastRead = streamPosition;
  final int delivered = Math.min(downloaded, len);
  if (viaStreamBuffer) {
    // the data landed in the stream buffer; pass the requested part on and
    // keep the rest buffered for later reads
    streamBufferLength = downloaded;
    System.arraycopy(streamBuffer, streamBufferPosition, buffer, offset,
        delivered);
    streamBufferPosition += delivered;
  }
  return delivered;
}
/**
 * Reads up to <code>len</code> bytes of data from the input stream into an
 * array of bytes.
 * While the underlying blob stream is open the read is delegated to it;
 * afterwards data is served from the internal stream buffer first and then
 * by a ranged download.
 * @param b a buffer into which the data is written.
 * @param offset a start offset into {@code b} where the data is written.
 * @param len the maximum number of bytes to be read.
 * @return the number of bytes written into {@code b}, or -1 if the end of
 * the stream has been reached.
 * @throws IndexOutOfBoundsException if {@code offset} and {@code len} do
 * not describe a valid range of {@code b}.
 * @throws IOException if the stream is closed or the read fails.
 */
@Override
public synchronized int read(byte[] b, int offset, int len)
    throws IOException {
  checkState();
  NativeAzureFileSystemHelper.validateReadArgs(b, offset, len);
  if (blobInputStream != null) {
    int numberOfBytesRead = blobInputStream.read(b, offset, len);
    // InputStream.read signals end of stream with -1; only a positive
    // count may advance the position, otherwise every read at EOF would
    // move the position backwards and corrupt getPos() and later seeks
    if (numberOfBytesRead > 0) {
      streamPosition += numberOfBytesRead;
    }
    return numberOfBytesRead;
  }

  if (offset < 0 || len < 0 || len > b.length - offset) {
    throw new IndexOutOfBoundsException("read arguments out of range");
  }
  if (len == 0) {
    return 0;
  }

  // first hand out whatever is still sitting in the stream buffer
  int bytesRead = 0;
  int available = available();
  if (available > 0) {
    bytesRead = Math.min(available, len);
    System.arraycopy(streamBuffer, streamBufferPosition, b, offset,
        bytesRead);
    streamBufferPosition += bytesRead;
  }
  if (len == bytesRead) {
    return len;
  }
  if (streamPosition >= streamLength) {
    // nothing left to download: report the buffered bytes, or EOF
    return (bytesRead > 0) ? bytesRead : -1;
  }

  // fetch the remainder over the network
  offset += bytesRead;
  len -= bytesRead;
  return bytesRead + doNetworkRead(b, offset, len);
}
/**
 * Reads the next byte of data from the stream.
 * @return the next byte of data as an unsigned value in the range 0 to 255,
 * or -1 if the end of the stream has been reached.
 * @throws IOException IO failure
 */
@Override
public int read() throws IOException {
  byte[] buffer = new byte[1];
  int numberOfBytesRead = read(buffer, 0, 1);
  // mask to an unsigned value: returning the signed byte would turn 0xFF
  // into -1 (a false end-of-stream) and 0x80..0xFE into negative numbers
  return (numberOfBytesRead < 1) ? -1 : (buffer[0] & 0xFF);
}
/**
 * Skips over and discards n bytes of data from this input stream.
 * While the underlying blob stream is open the call is delegated to it and
 * its result is returned; otherwise the skip is performed by adjusting the
 * buffer and stream positions, without any network access.
 * @param n the number of bytes to be skipped.
 * @return the actual number of bytes skipped.
 * @throws IOException if the stream is closed or the underlying stream
 * fails.
 * @throws IndexOutOfBoundsException when the underlying blob stream is no
 * longer open and n is negative or the sum of n and the current value of
 * getPos() is greater than the length of the stream.
 */
@Override
public synchronized long skip(long n) throws IOException {
  checkState();

  if (blobInputStream != null) {
    // the blob stream is open, so let it do the work and track how far it
    // actually moved
    final long skipped = blobInputStream.skip(n);
    streamPosition += skipped;
    return skipped;
  }

  // no blob stream: the skip is implemented here
  final long remainingInStream = streamLength - getPos();
  if (n < 0 || n > remainingInStream) {
    throw new IndexOutOfBoundsException("skip range");
  }

  if (streamBuffer == null) {
    // nothing buffered; the next network read simply starts further on
    streamPosition += n;
    return n;
  }

  final int bufferedBytes = streamBufferLength - streamBufferPosition;
  if (n < bufferedBytes) {
    // the target is still inside the buffer, only the buffer position moves
    streamBufferPosition += (int) n;
  } else {
    // the target is beyond the buffered data: move the stream position to
    // the new value and empty the buffer, ready for the next read()
    streamPosition = getPos() + n;
    resetStreamBuffer();
  }
  return n;
}
/**
 * An <code>OutputStream</code> backed by a user-supplied buffer.
 * Writes go into the window of the buffer that starts at the offset given
 * at construction and is at most the given length long; a write that does
 * not fit into the remaining part of that window is rejected.
 */
static class MemoryOutputStream extends OutputStream {
  private final byte[] buffer;
  private final int offset;
  private final int length;
  private int writePosition;

  /**
   * Creates a <code>MemoryOutputStream</code> from a user-supplied buffer.
   * @param buffer an array of bytes.
   * @param offset a starting offset in <code>buffer</code> where the data
   * will be written.
   * @param length the maximum number of bytes to be written to the stream.
   * @throws NullPointerException if <code>buffer</code> is null.
   * @throws IndexOutOfBoundsException if <code>offset</code> and
   * <code>length</code> do not describe a valid range of
   * <code>buffer</code>.
   */
  MemoryOutputStream(byte[] buffer, int offset, int length) {
    if (buffer == null) {
      throw new NullPointerException("buffer");
    }
    if (offset < 0 || length < 0 || length > buffer.length - offset) {
      throw new IndexOutOfBoundsException("offset out of range of buffer");
    }
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
    this.writePosition = offset;
  }

  /**
   * Gets the current size of the stream.
   * @return the number of bytes written so far.
   */
  public synchronized int size() {
    return writePosition - offset;
  }

  /**
   * Gets the current capacity of the stream.
   * @return the maximum number of bytes the stream accepts in total.
   */
  public synchronized int capacity() {
    return length;
  }

  /**
   * Writes the next byte to the stream.
   * @param b the byte to be written.
   * @throws IOException if the stream is already full.
   */
  @Override
  public synchronized void write(int b) throws IOException {
    if (size() > length - 1) {
      throw new IOException("No space for more writes");
    }
    buffer[writePosition++] = (byte) b;
  }

  /**
   * Writes a range of bytes to the stream.
   * @param b a byte array.
   * @param off the start offset in <code>b</code> from which the data
   * is read.
   * @param length the number of bytes to be written.
   * @throws NullPointerException if <code>b</code> is null.
   * @throws IndexOutOfBoundsException if <code>off</code> and
   * <code>length</code> do not describe a valid range of <code>b</code>.
   * @throws IOException if the bytes do not fit into the remaining
   * capacity; nothing is written in that case.
   */
  @Override
  public synchronized void write(byte[] b, int off, int length)
      throws IOException {
    if (b == null) {
      throw new NullPointerException("Null buffer argument");
    }
    if (off < 0 || length < 0 || length > b.length - off) {
      throw new IndexOutOfBoundsException("array write offset");
    }
    // same capacity rule as write(int): never write past the window that
    // was handed to the constructor, even if the backing array is larger
    if (length > this.length - size()) {
      throw new IOException("No space for more writes");
    }
    System.arraycopy(b, off, buffer, writePosition, length);
    writePosition += length;
  }
}
}