blob: 40bf6f4ae1a70f1b305c3eb2cfce0a6469452c7f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.PageRange;
/**
* An input stream that reads file data from a page blob stored
* using ASV's custom format.
*/
final class PageBlobInputStream extends InputStream {
private static final Log LOG = LogFactory.getLog(PageBlobInputStream.class);
// The blob we're reading from.
private final CloudPageBlobWrapper blob;
// The operation context to use for storage requests.
private final OperationContext opContext;
// The number of pages remaining to be read from the server.
private long numberOfPagesRemaining;
// The current byte offset to start reading from the server next,
// equivalent to (total number of pages we've read) * (page size).
private long currentOffsetInBlob;
// The buffer holding the current data we last read from the server.
private byte[] currentBuffer;
// The current byte offset we're at in the buffer.
private int currentBufferOffset;
// The current buffer length
private int currentBufferLength;
// Maximum number of pages to get per any one request.
private static final int MAX_PAGES_PER_DOWNLOAD =
4 * 1024 * 1024 / PAGE_SIZE;
// Whether the stream has been closed.
private boolean closed = false;
// Total stream size, or -1 if not initialized.
long pageBlobSize = -1;
// Current position in stream of valid data.
long filePosition = 0;
/**
* Helper method to extract the actual data size of a page blob.
* This typically involves 2 service requests (one for page ranges, another
* for the last page's data).
*
* @param blob The blob to get the size from.
* @param opContext The operation context to use for the requests.
* @return The total data size of the blob in bytes.
* @throws IOException If the format is corrupt.
* @throws StorageException If anything goes wrong in the requests.
*/
public static long getPageBlobDataSize(CloudPageBlobWrapper blob,
OperationContext opContext) throws IOException, StorageException {
// Get the page ranges for the blob. There should be one range starting
// at byte 0, but we tolerate (and ignore) ranges after the first one.
ArrayList<PageRange> pageRanges =
blob.downloadPageRanges(new BlobRequestOptions(), opContext);
if (pageRanges.size() == 0) {
return 0;
}
if (pageRanges.get(0).getStartOffset() != 0) {
// Not expected: we always upload our page blobs as a contiguous range
// starting at byte 0.
throw badStartRangeException(blob, pageRanges.get(0));
}
long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1;
// Get the last page.
long lastPageStart = totalRawBlobSize - PAGE_SIZE;
ByteArrayOutputStream baos =
new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
blob.downloadRange(lastPageStart, PAGE_SIZE, baos,
new BlobRequestOptions(), opContext);
byte[] lastPage = baos.toByteArray();
short lastPageSize = getPageSize(blob, lastPage, 0);
long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE;
return (totalNumberOfPages - 1) * PAGE_DATA_SIZE + lastPageSize;
}
/**
* Constructs a stream over the given page blob.
*/
public PageBlobInputStream(CloudPageBlobWrapper blob,
OperationContext opContext)
throws IOException {
this.blob = blob;
this.opContext = opContext;
ArrayList<PageRange> allRanges;
try {
allRanges =
blob.downloadPageRanges(new BlobRequestOptions(), opContext);
} catch (StorageException e) {
throw new IOException(e);
}
if (allRanges.size() > 0) {
if (allRanges.get(0).getStartOffset() != 0) {
throw badStartRangeException(blob, allRanges.get(0));
}
if (allRanges.size() > 1) {
LOG.warn(String.format(
"Blob %s has %d page ranges beyond the first range. "
+ "Only reading the first range.",
blob.getUri(), allRanges.size() - 1));
}
numberOfPagesRemaining =
(allRanges.get(0).getEndOffset() + 1) / PAGE_SIZE;
} else {
numberOfPagesRemaining = 0;
}
}
/** Return the size of the remaining available bytes
* if the size is less than or equal to {@link Integer#MAX_VALUE},
* otherwise, return {@link Integer#MAX_VALUE}.
*
* This is to match the behavior of DFSInputStream.available(),
* which some clients may rely on (HBase write-ahead log reading in
* particular).
*/
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (pageBlobSize == -1) {
try {
pageBlobSize = getPageBlobDataSize(blob, opContext);
} catch (StorageException e) {
throw new IOException("Unable to get page blob size.", e);
}
}
final long remaining = pageBlobSize - filePosition;
return remaining <= Integer.MAX_VALUE ?
(int) remaining : Integer.MAX_VALUE;
}
@Override
public synchronized void close() throws IOException {
closed = true;
}
private boolean dataAvailableInBuffer() {
return currentBuffer != null
&& currentBufferOffset < currentBufferLength;
}
/**
* Check our buffer and download more from the server if needed.
* If data is not available in the buffer, method downloads maximum
* page blob download size (4MB) or if there is less then 4MB left,
* all remaining pages.
* If we are on the last page, method will return true even if
* we reached the end of stream.
* @return true if there's more data in the buffer, false if buffer is empty
* and we reached the end of the blob.
* @throws IOException
*/
private synchronized boolean ensureDataInBuffer() throws IOException {
if (dataAvailableInBuffer()) {
// We still have some data in our buffer.
return true;
}
currentBuffer = null;
currentBufferOffset = 0;
currentBufferLength = 0;
if (numberOfPagesRemaining == 0) {
// No more data to read.
return false;
}
final long pagesToRead = Math.min(MAX_PAGES_PER_DOWNLOAD,
numberOfPagesRemaining);
final int bufferSize = (int) (pagesToRead * PAGE_SIZE);
// Download page to current buffer.
try {
// Create a byte array output stream to capture the results of the
// download.
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
withMD5Checking(), opContext);
validateDataIntegrity(baos.toByteArray());
} catch (StorageException e) {
throw new IOException(e);
}
numberOfPagesRemaining -= pagesToRead;
currentOffsetInBlob += bufferSize;
return true;
}
private void validateDataIntegrity(byte[] buffer)
throws IOException {
if (buffer.length % PAGE_SIZE != 0) {
throw new AssertionError("Unexpected buffer size: "
+ buffer.length);
}
int bufferLength = 0;
int numberOfPages = buffer.length / PAGE_SIZE;
long totalPagesAfterCurrent = numberOfPagesRemaining;
for (int page = 0; page < numberOfPages; page++) {
// Calculate the number of pages that exist in the blob after this one
totalPagesAfterCurrent--;
short currentPageSize = getPageSize(blob, buffer, page * PAGE_SIZE);
// Only the last page can be partially filled.
if (currentPageSize < PAGE_DATA_SIZE
&& totalPagesAfterCurrent > 0) {
throw fileCorruptException(blob, String.format(
"Page with partial data found in the middle (%d pages from the"
+ " end) that only has %d bytes of data.",
totalPagesAfterCurrent, currentPageSize));
}
bufferLength += currentPageSize + PAGE_HEADER_SIZE;
}
currentBufferOffset = PAGE_HEADER_SIZE;
currentBufferLength = bufferLength;
currentBuffer = buffer;
}
// Reads the page size from the page header at the given offset.
private static short getPageSize(CloudPageBlobWrapper blob,
byte[] data, int offset) throws IOException {
short pageSize = toShort(data[offset], data[offset + 1]);
if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
throw fileCorruptException(blob, String.format(
"Unexpected page size in the header: %d.",
pageSize));
}
return pageSize;
}
@Override
public synchronized int read(byte[] outputBuffer, int offset, int len)
throws IOException {
// If len is zero return 0 per the InputStream contract
if (len == 0) {
return 0;
}
int numberOfBytesRead = 0;
while (len > 0) {
if (!ensureDataInBuffer()) {
break;
}
int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
System.arraycopy(currentBuffer, currentBufferOffset, outputBuffer,
offset, numBytesToRead);
numberOfBytesRead += numBytesToRead;
offset += numBytesToRead;
len -= numBytesToRead;
if (numBytesToRead == bytesRemainingInCurrentPage) {
// We've finished this page, move on to the next.
advancePagesInBuffer(1);
} else {
currentBufferOffset += numBytesToRead;
}
}
// if outputBuffer len is > 0 and zero bytes were read, we reached
// an EOF
if (numberOfBytesRead == 0) {
return -1;
}
filePosition += numberOfBytesRead;
return numberOfBytesRead;
}
@Override
public int read() throws IOException {
byte[] oneByte = new byte[1];
int result = read(oneByte);
if (result < 0) {
return result;
}
return oneByte[0];
}
/**
* Skips over and discards <code>n</code> bytes of data from this input
* stream. The <code>skip</code> method may, for a variety of reasons, end
* up skipping over some smaller number of bytes, possibly <code>0</code>.
* This may result from any of a number of conditions; reaching end of file
* before <code>n</code> bytes have been skipped is only one possibility.
* The actual number of bytes skipped is returned. If {@code n} is
* negative, the {@code skip} method for class {@code InputStream} always
* returns 0, and no bytes are skipped. Subclasses may handle the negative
* value differently.
*
* <p> The <code>skip</code> method of this class creates a
* byte array and then repeatedly reads into it until <code>n</code> bytes
* have been read or the end of the stream has been reached. Subclasses are
* encouraged to provide a more efficient implementation of this method.
* For instance, the implementation may depend on the ability to seek.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
* @exception IOException if the stream does not support seek,
* or if some other I/O error occurs.
*/
@Override
public synchronized long skip(long n) throws IOException {
long skipped = skipImpl(n);
filePosition += skipped; // track the position in the stream
return skipped;
}
private long skipImpl(long n) throws IOException {
if (n == 0) {
return 0;
}
// First skip within the current buffer as much as possible.
long skippedWithinBuffer = skipWithinBuffer(n);
if (skippedWithinBuffer > n) {
// TO CONSIDER: Using a contracts framework such as Google's cofoja for
// these post-conditions.
throw new AssertionError(String.format(
"Bug in skipWithinBuffer: it skipped over %d bytes when asked to "
+ "skip %d bytes.", skippedWithinBuffer, n));
}
n -= skippedWithinBuffer;
long skipped = skippedWithinBuffer;
if (n == 0) {
return skipped;
}
if (numberOfPagesRemaining == 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
} else if (numberOfPagesRemaining > 1) {
// skip over as many pages as we can, but we must read the last
// page as it may not be full
long pagesToSkipOver = Math.min(n / PAGE_DATA_SIZE,
numberOfPagesRemaining - 1);
numberOfPagesRemaining -= pagesToSkipOver;
currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
skipped += pagesToSkipOver * PAGE_DATA_SIZE;
n -= pagesToSkipOver * PAGE_DATA_SIZE;
}
if (n == 0) {
return skipped;
}
// Now read in at the current position, and skip within current buffer.
if (!ensureDataInBuffer()) {
return skipped;
}
return skipped + skipWithinBuffer(n);
}
/**
* Skip over n bytes within the current buffer or just over skip the whole
* buffer if n is greater than the bytes remaining in the buffer.
* @param n The number of data bytes to skip.
* @return The number of bytes actually skipped.
* @throws IOException if data corruption found in the buffer.
*/
private long skipWithinBuffer(long n) throws IOException {
if (!dataAvailableInBuffer()) {
return 0;
}
long skipped = 0;
// First skip within the current page.
skipped = skipWithinCurrentPage(n);
if (skipped > n) {
throw new AssertionError(String.format(
"Bug in skipWithinCurrentPage: it skipped over %d bytes when asked"
+ " to skip %d bytes.", skipped, n));
}
n -= skipped;
if (n == 0 || !dataAvailableInBuffer()) {
return skipped;
}
// Calculate how many whole pages (pages before the possibly partially
// filled last page) remain.
int currentPageIndex = currentBufferOffset / PAGE_SIZE;
int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;
if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
// I'm within one of the whole pages remaining, skip in there.
advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
currentBufferOffset += n % PAGE_DATA_SIZE;
return n + skipped;
}
// Skip over the whole pages.
advancePagesInBuffer(wholePagesRemaining);
skipped += wholePagesRemaining * PAGE_DATA_SIZE;
n -= wholePagesRemaining * PAGE_DATA_SIZE;
// At this point we know we need to skip to somewhere in the last page,
// or just go to the end.
return skipWithinCurrentPage(n) + skipped;
}
/**
* Skip over n bytes within the current page or just over skip the whole
* page if n is greater than the bytes remaining in the page.
* @param n The number of data bytes to skip.
* @return The number of bytes actually skipped.
* @throws IOException if data corruption found in the buffer.
*/
private long skipWithinCurrentPage(long n) throws IOException {
int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
if (n <= remainingBytesInCurrentPage) {
currentBufferOffset += n;
return n;
} else {
advancePagesInBuffer(1);
return remainingBytesInCurrentPage;
}
}
/**
* Gets the number of bytes remaining within the current page in the buffer.
* @return The number of bytes remaining.
* @throws IOException if data corruption found in the buffer.
*/
private int getBytesRemainingInCurrentPage() throws IOException {
if (!dataAvailableInBuffer()) {
return 0;
}
// Calculate our current position relative to the start of the current
// page.
int currentDataOffsetInPage =
(currentBufferOffset % PAGE_SIZE) - PAGE_HEADER_SIZE;
int pageBoundary = getCurrentPageStartInBuffer();
// Get the data size of the current page from the header.
short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
return sizeOfCurrentPage - currentDataOffsetInPage;
}
private static IOException badStartRangeException(CloudPageBlobWrapper blob,
PageRange startRange) {
return fileCorruptException(blob, String.format(
"Page blobs for ASV should always use a page range starting at byte 0. "
+ "This starts at byte %d.",
startRange.getStartOffset()));
}
private void advancePagesInBuffer(int numberOfPages) {
currentBufferOffset =
getCurrentPageStartInBuffer()
+ (numberOfPages * PAGE_SIZE)
+ PAGE_HEADER_SIZE;
}
private int getCurrentPageStartInBuffer() {
return PAGE_SIZE * (currentBufferOffset / PAGE_SIZE);
}
private static IOException fileCorruptException(CloudPageBlobWrapper blob,
String reason) {
return new IOException(String.format(
"The page blob: '%s' is corrupt or has an unexpected format: %s.",
blob.getUri(), reason));
}
}