| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.azure; |
| |
| import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE; |
| import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE; |
| import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE; |
| import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.fromShort; |
| import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.Arrays; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.hadoop.fs.Syncable; |
| import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper; |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.microsoft.azure.storage.OperationContext; |
| import com.microsoft.azure.storage.StorageException; |
| import com.microsoft.azure.storage.blob.BlobRequestOptions; |
| import com.microsoft.azure.storage.blob.CloudPageBlob; |
| |
| |
| /** |
| * An output stream that writes file data to a page blob stored using ASV's |
| * custom format. |
| */ |
| final class PageBlobOutputStream extends OutputStream implements Syncable { |
| /** |
| * The maximum number of raw bytes Azure Storage allows us to upload in a |
| * single request (4 MB). |
| */ |
| private static final int MAX_RAW_BYTES_PER_REQUEST = 4 * 1024 * 1024; |
| /** |
| * The maximum number of pages Azure Storage allows us to upload in a |
| * single request. |
| */ |
| private static final int MAX_PAGES_IN_REQUEST = |
| MAX_RAW_BYTES_PER_REQUEST / PAGE_SIZE; |
| /** |
| * The maximum number of data bytes (header not included) we can upload |
| * in a single request. I'm limiting it to (N - 1) pages to account for |
| * the possibility that we may have to rewrite the previous request's |
| * last page. |
| */ |
| private static final int MAX_DATA_BYTES_PER_REQUEST = |
| PAGE_DATA_SIZE * (MAX_PAGES_IN_REQUEST - 1); |
| |
| private final CloudPageBlobWrapper blob; |
| private final OperationContext opContext; |
| |
| /** |
| * If the IO thread encounters an error, it'll store it here. |
| */ |
| private volatile IOException lastError; |
| |
| /** |
| * Current size of the page blob in bytes. It may be extended if the file |
| * gets full. |
| */ |
| private long currentBlobSize; |
| /** |
| * The current byte offset we're at in the blob (how many bytes we've |
| * uploaded to the server). |
| */ |
| private long currentBlobOffset; |
| /** |
| * The data in the last page that we wrote to the server, in case we have to |
| * overwrite it in the new request. |
| */ |
| private byte[] previousLastPageDataWritten = new byte[0]; |
| /** |
| * The current buffer we're writing to before sending to the server. |
| */ |
| private ByteArrayOutputStream outBuffer; |
| /** |
| * The task queue for writing to the server. |
| */ |
| private final LinkedBlockingQueue<Runnable> ioQueue; |
| /** |
| * The thread pool we're using for writing to the server. Note that the IO |
| * write is NOT designed for parallelism, so there can only be one thread |
| * in that pool (I'm using the thread pool mainly for the lifetime management |
| * capabilities, otherwise I'd have just used a simple Thread). |
| */ |
| private final ThreadPoolExecutor ioThreadPool; |
| |
| // The last task given to the ioThreadPool to execute, to allow |
| // waiting until it's done. |
| private WriteRequest lastQueuedTask; |
| // Whether the stream has been closed. |
| private boolean closed = false; |
| |
| public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class); |
| |
| // Set the minimum page blob file size to 128MB, which is >> the default |
| // block size of 32MB. This default block size is often used as the |
| // hbase.regionserver.hlog.blocksize. |
| // The goal is to have a safe minimum size for HBase log files to allow them |
| // to be filled and rolled without exceeding the minimum size. A larger size |
| // can be used by setting the fs.azure.page.blob.size configuration variable. |
| public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L; |
| |
| // The default and minimum amount to extend a page blob by if it starts |
| // to get full. |
| public static final long |
| PAGE_BLOB_DEFAULT_EXTENSION_SIZE = 128L * 1024L * 1024L; |
| |
| // The configured page blob extension size (either the default, or if greater, |
| // the value configured in fs.azure.page.blob.extension.size |
| private long configuredPageBlobExtensionSize; |
| |
| /** |
| * Constructs an output stream over the given page blob. |
| * |
| * @param blob the blob that this stream is associated with. |
| * @param opContext an object used to track the execution of the operation |
| * @throws StorageException if anything goes wrong creating the blob. |
| */ |
| public PageBlobOutputStream(final CloudPageBlobWrapper blob, |
| final OperationContext opContext, |
| final Configuration conf) throws StorageException { |
| this.blob = blob; |
| this.outBuffer = new ByteArrayOutputStream(); |
| this.opContext = opContext; |
| this.lastQueuedTask = null; |
| this.ioQueue = new LinkedBlockingQueue<Runnable>(); |
| |
| // As explained above: the IO writes are not designed for parallelism, |
| // so we only have one thread in this thread pool. |
| this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, |
| ioQueue); |
| |
| |
| |
| // Make page blob files have a size that is the greater of a |
| // minimum size, or the value of fs.azure.page.blob.size from configuration. |
| long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0); |
| LOG.debug("Read value of fs.azure.page.blob.size as " + pageBlobConfigSize |
| + " from configuration (0 if not present)."); |
| long pageBlobSize = Math.max(PAGE_BLOB_MIN_SIZE, pageBlobConfigSize); |
| |
| // Ensure that the pageBlobSize is a multiple of page size. |
| if (pageBlobSize % PAGE_SIZE != 0) { |
| pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE; |
| } |
| blob.create(pageBlobSize, new BlobRequestOptions(), opContext); |
| currentBlobSize = pageBlobSize; |
| |
| // Set the page blob extension size. It must be a minimum of the default |
| // value. |
| configuredPageBlobExtensionSize = |
| conf.getLong("fs.azure.page.blob.extension.size", 0); |
| if (configuredPageBlobExtensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) { |
| configuredPageBlobExtensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE; |
| } |
| |
| // make sure it is a multiple of the page size |
| if (configuredPageBlobExtensionSize % PAGE_SIZE != 0) { |
| configuredPageBlobExtensionSize += |
| PAGE_SIZE - configuredPageBlobExtensionSize % PAGE_SIZE; |
| } |
| } |
| |
| private void checkStreamState() throws IOException { |
| if (lastError != null) { |
| throw lastError; |
| } |
| } |
| |
| /** |
| * Closes this output stream and releases any system resources associated with |
| * this stream. If any data remains in the buffer it is committed to the |
| * service. |
| */ |
| @Override |
| public synchronized void close() throws IOException { |
| if (closed) { |
| return; |
| } |
| |
| LOG.debug("Closing page blob output stream."); |
| flush(); |
| checkStreamState(); |
| ioThreadPool.shutdown(); |
| try { |
| LOG.debug(ioThreadPool.toString()); |
| if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) { |
| LOG.debug("Timed out after 10 minutes waiting for IO requests to finish"); |
| NativeAzureFileSystemHelper.logAllLiveStackTraces(); |
| LOG.debug(ioThreadPool.toString()); |
| throw new IOException("Timed out waiting for IO requests to finish"); |
| } |
| } catch (InterruptedException e) { |
| LOG.debug("Caught InterruptedException"); |
| |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| } |
| |
| closed = true; |
| } |
| |
| |
| |
| /** |
| * A single write request for data to write to Azure storage. |
| */ |
| private class WriteRequest implements Runnable { |
| private final byte[] dataPayload; |
| private final CountDownLatch doneSignal = new CountDownLatch(1); |
| |
| public WriteRequest(byte[] dataPayload) { |
| this.dataPayload = dataPayload; |
| } |
| |
| public void waitTillDone() throws InterruptedException { |
| doneSignal.await(); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| LOG.debug("before runInternal()"); |
| runInternal(); |
| LOG.debug("after runInternal()"); |
| } finally { |
| doneSignal.countDown(); |
| } |
| } |
| |
| private void runInternal() { |
| if (lastError != null) { |
| // We're already in an error state, no point doing anything. |
| return; |
| } |
| if (dataPayload.length == 0) { |
| // Nothing to do. |
| return; |
| } |
| |
| // Since we have to rewrite the last request's last page's data |
| // (may be empty), total data size is our data plus whatever was |
| // left from there. |
| final int totalDataBytes = dataPayload.length |
| + previousLastPageDataWritten.length; |
| // Calculate the total number of pages we're writing to the server. |
| final int numberOfPages = (totalDataBytes / PAGE_DATA_SIZE) |
| + (totalDataBytes % PAGE_DATA_SIZE == 0 ? 0 : 1); |
| // Fill up the raw bytes we're writing. |
| byte[] rawPayload = new byte[numberOfPages * PAGE_SIZE]; |
| // Keep track of the size of the last page we uploaded. |
| int currentLastPageDataSize = -1; |
| for (int page = 0; page < numberOfPages; page++) { |
| // Our current byte offset in the data. |
| int dataOffset = page * PAGE_DATA_SIZE; |
| // Our current byte offset in the raw buffer. |
| int rawOffset = page * PAGE_SIZE; |
| // The size of the data in the current page. |
| final short currentPageDataSize = (short) Math.min(PAGE_DATA_SIZE, |
| totalDataBytes - dataOffset); |
| // Save off this page's size as the potential last page's size. |
| currentLastPageDataSize = currentPageDataSize; |
| |
| // Write out the page size in the header. |
| final byte[] header = fromShort(currentPageDataSize); |
| System.arraycopy(header, 0, rawPayload, rawOffset, header.length); |
| rawOffset += header.length; |
| |
| int bytesToCopyFromDataPayload = currentPageDataSize; |
| if (dataOffset < previousLastPageDataWritten.length) { |
| // First write out the last page's data. |
| final int bytesToCopyFromLastPage = Math.min(currentPageDataSize, |
| previousLastPageDataWritten.length - dataOffset); |
| System.arraycopy(previousLastPageDataWritten, dataOffset, |
| rawPayload, rawOffset, bytesToCopyFromLastPage); |
| bytesToCopyFromDataPayload -= bytesToCopyFromLastPage; |
| rawOffset += bytesToCopyFromLastPage; |
| dataOffset += bytesToCopyFromLastPage; |
| } |
| |
| if (dataOffset >= previousLastPageDataWritten.length) { |
| // Then write the current payload's data. |
| System.arraycopy(dataPayload, |
| dataOffset - previousLastPageDataWritten.length, |
| rawPayload, rawOffset, bytesToCopyFromDataPayload); |
| } |
| } |
| |
| // Raw payload constructed, ship it off to the server. |
| writePayloadToServer(rawPayload); |
| |
| // Post-send bookkeeping. |
| currentBlobOffset += rawPayload.length; |
| if (currentLastPageDataSize < PAGE_DATA_SIZE) { |
| // Partial page, save it off so it's overwritten in the next request. |
| final int startOffset = (numberOfPages - 1) * PAGE_SIZE + PAGE_HEADER_SIZE; |
| previousLastPageDataWritten = Arrays.copyOfRange(rawPayload, |
| startOffset, |
| startOffset + currentLastPageDataSize); |
| // Since we're rewriting this page, set our current offset in the server |
| // to that page's beginning. |
| currentBlobOffset -= PAGE_SIZE; |
| } else { |
| // It wasn't a partial page, we won't need to rewrite it. |
| previousLastPageDataWritten = new byte[0]; |
| } |
| |
| // Extend the file if we need more room in the file. This typically takes |
| // less than 200 milliseconds if it has to actually be done, |
| // so it is okay to include it in a write and won't cause a long pause. |
| // Other writes can be queued behind this write in any case. |
| conditionalExtendFile(); |
| } |
| |
| /** |
| * Writes the given raw payload to Azure Storage at the current blob |
| * offset. |
| */ |
| private void writePayloadToServer(byte[] rawPayload) { |
| final ByteArrayInputStream wrapperStream = |
| new ByteArrayInputStream(rawPayload); |
| LOG.debug("writing payload of " + rawPayload.length + " bytes to Azure page blob"); |
| try { |
| long start = System.currentTimeMillis(); |
| blob.uploadPages(wrapperStream, currentBlobOffset, rawPayload.length, |
| withMD5Checking(), PageBlobOutputStream.this.opContext); |
| long end = System.currentTimeMillis(); |
| LOG.trace("Azure uploadPages time for " + rawPayload.length + " bytes = " + (end - start)); |
| } catch (IOException ex) { |
| LOG.debug(ExceptionUtils.getStackTrace(ex)); |
| lastError = ex; |
| } catch (StorageException ex) { |
| LOG.debug(ExceptionUtils.getStackTrace(ex)); |
| lastError = new IOException(ex); |
| } |
| if (lastError != null) { |
| LOG.debug("Caught error in PageBlobOutputStream#writePayloadToServer()"); |
| } |
| } |
| } |
| |
| private synchronized void flushIOBuffers() { |
| if (outBuffer.size() == 0) { |
| return; |
| } |
| lastQueuedTask = new WriteRequest(outBuffer.toByteArray()); |
| ioThreadPool.execute(lastQueuedTask); |
| outBuffer = new ByteArrayOutputStream(); |
| } |
| |
| @VisibleForTesting |
| synchronized void waitForLastFlushCompletion() throws IOException { |
| try { |
| if (lastQueuedTask != null) { |
| lastQueuedTask.waitTillDone(); |
| } |
| } catch (InterruptedException e1) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** |
| * Extend the page blob file if we are close to the end. |
| */ |
| private void conditionalExtendFile() { |
| |
| // maximum allowed size of an Azure page blob (1 terabyte) |
| final long MAX_PAGE_BLOB_SIZE = 1024L * 1024L * 1024L * 1024L; |
| |
| // If blob is already at the maximum size, then don't try to extend it. |
| if (currentBlobSize == MAX_PAGE_BLOB_SIZE) { |
| return; |
| } |
| |
| // If we are within the maximum write size of the end of the file, |
| if (currentBlobSize - currentBlobOffset <= MAX_RAW_BYTES_PER_REQUEST) { |
| |
| // Extend the file. Retry up to 3 times with back-off. |
| CloudPageBlob cloudPageBlob = (CloudPageBlob) blob.getBlob(); |
| long newSize = currentBlobSize + configuredPageBlobExtensionSize; |
| |
| // Make sure we don't exceed maximum blob size. |
| if (newSize > MAX_PAGE_BLOB_SIZE) { |
| newSize = MAX_PAGE_BLOB_SIZE; |
| } |
| final int MAX_RETRIES = 3; |
| int retries = 1; |
| boolean resizeDone = false; |
| while(!resizeDone && retries <= MAX_RETRIES) { |
| try { |
| cloudPageBlob.resize(newSize); |
| resizeDone = true; |
| currentBlobSize = newSize; |
| } catch (StorageException e) { |
| LOG.warn("Failed to extend size of " + cloudPageBlob.getUri()); |
| try { |
| |
| // sleep 2, 8, 18 seconds for up to 3 retries |
| Thread.sleep(2000 * retries * retries); |
| } catch (InterruptedException e1) { |
| |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| } |
| } finally { |
| retries++; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Flushes this output stream and forces any buffered output bytes to be |
| * written out. If any data remains in the buffer it is committed to the |
| * service. Data is queued for writing but not forced out to the service |
| * before the call returns. |
| */ |
| @Override |
| public void flush() throws IOException { |
| checkStreamState(); |
| flushIOBuffers(); |
| } |
| |
| /** |
| * Writes b.length bytes from the specified byte array to this output stream. |
| * |
| * @param data |
| * the byte array to write. |
| * |
| * @throws IOException |
| * if an I/O error occurs. In particular, an IOException may be |
| * thrown if the output stream has been closed. |
| */ |
| @Override |
| public void write(final byte[] data) throws IOException { |
| write(data, 0, data.length); |
| } |
| |
| /** |
| * Writes length bytes from the specified byte array starting at offset to |
| * this output stream. |
| * |
| * @param data |
| * the byte array to write. |
| * @param offset |
| * the start offset in the data. |
| * @param length |
| * the number of bytes to write. |
| * @throws IOException |
| * if an I/O error occurs. In particular, an IOException may be |
| * thrown if the output stream has been closed. |
| */ |
| @Override |
| public void write(final byte[] data, final int offset, final int length) |
| throws IOException { |
| if (offset < 0 || length < 0 || length > data.length - offset) { |
| throw new IndexOutOfBoundsException(); |
| } |
| |
| writeInternal(data, offset, length); |
| } |
| |
| /** |
| * Writes the specified byte to this output stream. The general contract for |
| * write is that one byte is written to the output stream. The byte to be |
| * written is the eight low-order bits of the argument b. The 24 high-order |
| * bits of b are ignored. |
| * |
| * @param byteVal |
| * the byteValue to write. |
| * @throws IOException |
| * if an I/O error occurs. In particular, an IOException may be |
| * thrown if the output stream has been closed. |
| */ |
| @Override |
| public void write(final int byteVal) throws IOException { |
| write(new byte[] { (byte) (byteVal & 0xFF) }); |
| } |
| |
| /** |
| * Writes the data to the buffer and triggers writes to the service as needed. |
| * |
| * @param data |
| * the byte array to write. |
| * @param offset |
| * the start offset in the data. |
| * @param length |
| * the number of bytes to write. |
| * @throws IOException |
| * if an I/O error occurs. In particular, an IOException may be |
| * thrown if the output stream has been closed. |
| */ |
| private synchronized void writeInternal(final byte[] data, int offset, |
| int length) throws IOException { |
| while (length > 0) { |
| checkStreamState(); |
| final int availableBufferBytes = MAX_DATA_BYTES_PER_REQUEST |
| - this.outBuffer.size(); |
| final int nextWrite = Math.min(availableBufferBytes, length); |
| |
| outBuffer.write(data, offset, nextWrite); |
| offset += nextWrite; |
| length -= nextWrite; |
| |
| if (outBuffer.size() > MAX_DATA_BYTES_PER_REQUEST) { |
| throw new RuntimeException("Internal error: maximum write size " + |
| Integer.toString(MAX_DATA_BYTES_PER_REQUEST) + "exceeded."); |
| } |
| |
| if (outBuffer.size() == MAX_DATA_BYTES_PER_REQUEST) { |
| flushIOBuffers(); |
| } |
| } |
| } |
| |
| /** |
| * Force all data in the output stream to be written to Azure storage. |
| * Wait to return until this is complete. |
| */ |
| @Override |
| public synchronized void hsync() throws IOException { |
| LOG.debug("Entering PageBlobOutputStream#hsync()."); |
| long start = System.currentTimeMillis(); |
| flush(); |
| LOG.debug(ioThreadPool.toString()); |
| try { |
| if (lastQueuedTask != null) { |
| lastQueuedTask.waitTillDone(); |
| } |
| } catch (InterruptedException e1) { |
| |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| } |
| LOG.debug("Leaving PageBlobOutputStream#hsync(). Total hsync duration = " |
| + (System.currentTimeMillis() - start) + " msec."); |
| } |
| |
| @Override |
| public void hflush() throws IOException { |
| |
| // hflush is required to force data to storage, so call hsync, |
| // which does that. |
| hsync(); |
| } |
| |
| @Deprecated |
| public void sync() throws IOException { |
| |
| // Sync has been deprecated in favor of hflush. |
| hflush(); |
| } |
| |
| // For unit testing purposes: kill the IO threads. |
| @VisibleForTesting |
| void killIoThreads() { |
| ioThreadPool.shutdownNow(); |
| } |
| } |