/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
/**
* The AbfsInputStream for AbfsClient.
*/
public class AbfsInputStream extends FSInputStream {
private final AbfsClient client;
private final Statistics statistics;
private final String path;
private final long contentLength;
private final int bufferSize; // default buffer size
private final int readAheadQueueDepth; // initialized in constructor
private final String eTag; // eTag of the path when the InputStream was created
private final boolean tolerateOobAppends; // whether to tolerate out-of-band (OOB) appends
private final boolean readAheadEnabled; // whether read-ahead is enabled
private byte[] buffer = null; // will be initialized on first use
private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
private long fCursorAfterLastRead = -1;
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
// of valid bytes in buffer)
private boolean closed = false;
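/**
* Constructs an AbfsInputStream over the given path.
*
* @param client the AbfsClient used to issue read requests
* @param statistics file system statistics to update on reads, may be null
* @param path the remote path being read
* @param contentLength the length of the file when the stream was opened
* @param bufferSize the size of the internal read buffer
* @param readAheadQueueDepth the number of read-ahead requests to queue; a negative value
* defaults to the number of available processors
* @param tolerateOobAppends whether to tolerate out-of-band appends by ignoring the eTag on reads
* @param eTag the eTag of the path when the stream was created
*/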
public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
final String path,
final long contentLength,
final int bufferSize,
final int readAheadQueueDepth,
final boolean tolerateOobAppends,
final String eTag) {
this.client = client;
this.statistics = statistics;
this.path = path;
this.contentLength = contentLength;
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.tolerateOobAppends = tolerateOobAppends;
this.eTag = eTag;
this.readAheadEnabled = true;
}
public String getPath() {
return path;
}
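/**
* Reads and returns a single byte of data, or -1 if the end of the stream has been reached.
*/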
@Override
public int read() throws IOException {
byte[] b = new byte[1];
int numberOfBytesRead = read(b, 0, 1);
if (numberOfBytesRead < 0) {
return -1;
} else {
return (b[0] & 0xFF);
}
}
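/**
* Reads up to {@code len} bytes into {@code b} starting at {@code off}, repeatedly
* filling from the internal buffer until the request is satisfied or the end of the
* stream is reached.
*/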
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
currentOff += lastReadBytes;
currentLen -= lastReadBytes;
totalReadBytes += lastReadBytes;
}
if (currentLen <= 0 || currentLen > b.length - currentOff) {
break;
}
} while (lastReadBytes > 0);
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
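/**
* Returns bytes from the internal buffer, refilling it from the service (optionally
* via read-ahead) when it is empty. Returns -1 at end of stream.
*/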
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
if (len == 0) {
return 0;
}
if (this.available() == 0) {
return -1;
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
if (fCursor >= contentLength) {
return -1;
}
long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
bCursor = 0;
limit = 0;
if (buffer == null) {
buffer = new byte[bufferSize];
}
// Use read-ahead for the first read, for sequential reads, and for large reads;
// otherwise read only the requested range directly, bypassing read-ahead
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
}
if (bytesRead == -1) {
return -1;
}
limit += bytesRead;
fCursor += bytesRead;
fCursorAfterLastRead = fCursor;
}
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
return bytesToRead;
}
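/**
* When read-ahead is enabled and not bypassed, queues up to readAheadQueueDepth
* sequential read-aheads starting at the given position and then attempts to satisfy
* the read from those buffers, falling back to a direct remote read if nothing is
* available; otherwise reads directly from the service.
*/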
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
}
int receivedBytes;
// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
}
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
return receivedBytes;
}
// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
return readRemote(position, b, offset, length);
}
}
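/**
* Issues a single read request to the service for the given range and returns the
* number of bytes received, or -1 if the position is at or beyond the end of the file.
*/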
int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
return -1; // Hadoop prefers -1 to EOFException
}
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
if (offset >= b.length) {
throw new IllegalArgumentException("offset greater than length of array");
}
if (length < 0) {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (length > (b.length - offset)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
final AbfsRestOperation op;
try {
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ere.getMessage());
}
}
throw new IOException(ex);
}
long bytesRead = op.getResult().getBytesReceived();
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
return (int) bytesRead;
}
/**
* Seek to given position in stream.
* @param n position to seek to
* @throws IOException if there is an error
* @throws EOFException if attempting to seek past end of file
*/
@Override
public synchronized void seek(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
if (n < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
if (n > contentLength) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
if (n >= fCursor - limit && n <= fCursor) { // within buffer
bCursor = (int) (n - (fCursor - limit));
return;
}
// next read will read from here
fCursor = n;
//invalidate buffer
limit = 0;
bCursor = 0;
}
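/**
* Skips over and discards {@code n} bytes, clamping the resulting position to the
* bounds of the file. Returns the number of bytes actually skipped, which may be
* negative when {@code n} is negative.
* @throws EOFException if the stream is already at end of file and {@code n} is positive
* @throws IOException if the stream is closed
*/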
@Override
public synchronized long skip(long n) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
long currentPos = getPos();
if (currentPos == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
}
long newPos = currentPos + n;
if (newPos < 0) {
newPos = 0;
n = newPos - currentPos;
}
if (newPos > contentLength) {
newPos = contentLength;
n = newPos - currentPos;
}
seek(newPos);
return n;
}
/**
* Return the size of the remaining available bytes
* if the size is less than or equal to {@link Integer#MAX_VALUE},
* otherwise, return {@link Integer#MAX_VALUE}.
*
* This is to match the behavior of DFSInputStream.available(),
* which some clients may rely on (HBase write-ahead log reading in
* particular).
*/
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException(
FSExceptionMessages.STREAM_IS_CLOSED);
}
final long remaining = this.contentLength - this.getPos();
return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE;
}
/**
* Returns the length of the file that this stream refers to. Note that the length returned is the length
* as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
* they won't be reflected in the returned length.
*
* @return length of the file.
* @throws IOException if the stream is closed
*/
public long length() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return contentLength;
}
/**
* Return the current offset from the start of the file.
* @throws IOException if there is an error
*/
@Override
public synchronized long getPos() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return fCursor - limit + bCursor;
}
/**
* Attempts to seek to a different copy of the data. Returns true if
* a new source is found, false otherwise. This implementation always
* returns false.
* @throws IOException if there is an error
*/
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
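/**
* Closes the stream and releases the internal buffer. Subsequent read, seek and
* position queries will fail with an IOException.
*/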
@Override
public synchronized void close() throws IOException {
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
* @param readlimit ignored
*/
@Override
public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
*/
@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
}
/**
* Gets whether mark and reset are supported by {@code AbfsInputStream}. Always returns false.
*
* @return always {@code false}
*/
@Override
public boolean markSupported() {
return false;
}
}