/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.BlockSearchMode;
/**
* Stream object that implements append for Block Blobs in WASB.
*
* The stream object implements hflush/hsync and block compaction. Block
* compaction is the process of replacing a sequence of small blocks with one
* big block. Azure Block Blobs support up to 50,000 blocks and every
* hflush/hsync generates one block. When the number of blocks is above 32,000,
* the process of compaction decreases the total number of blocks, if possible.
* If compaction is disabled, hflush/hsync are no-ops.
*
* The stream object uses background threads for uploading the blocks and the
* block blob list. Blocks can be uploaded concurrently. However, when the block
* list is uploaded, block uploading should stop. If a block is uploaded before
* the block list and the block id is not in the list, the block will be lost.
* If the block is uploaded after the block list and the block id is in the
* list, the block list upload will fail. The exclusive access for the block
* list upload is managed by uploadingSemaphore.
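*
* A hypothetical usage sketch (names and sizes are illustrative only):
* <pre>
*   OutputStream out = new BlockBlobAppendStream(blobWrapper, "key",
*       4 * 1024 * 1024, true, new OperationContext());
*   out.write(data);              // data is buffered locally
*   ((Syncable) out).hflush();    // uploads the block, commits the block list
*   out.close();
* </pre>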
*/
public class BlockBlobAppendStream extends OutputStream implements Syncable,
StreamCapabilities {
/**
* The name of the blob/file.
*/
private final String key;
/**
* This variable tracks whether this is a new blob or an existing one.
*/
private boolean blobExist;
/**
* When the blob exists, we take a lease to prevent concurrent writes.
* Taking a lease is not necessary for new blobs.
*/
private SelfRenewingLease lease = null;
/**
* Support for the block compaction process is optional.
*/
private final boolean compactionEnabled;
/**
* The default number of blocks above which block compaction is triggered.
*/
private static final int DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT = 32000;
/**
* The number of blocks above which block compaction is triggered.
*/
private int activateCompactionBlockCount
= DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT;
/**
* The size of the output buffer. Writes store the data in outBuffer until
* either the size is above maxBlockSize or hflush/hsync is called.
*/
private final AtomicInteger maxBlockSize;
/**
* The current buffer where writes are stored.
*/
private ByteBuffer outBuffer;
/**
* The size of the blob that has been successfully stored in the Azure Blob
* service.
*/
private final AtomicLong committedBlobLength = new AtomicLong(0);
/**
* The length of the blob, including blocks queued for upload that are not
* yet committed.
*/
private volatile long blobLength = 0;
/**
* Number of minutes close() waits for pending uploads before timing out.
*/
private static final int CLOSE_UPLOAD_DELAY = 10;
/**
* Keep-alive time, in seconds, for the thread pool.
*/
private static final int THREADPOOL_KEEP_ALIVE = 30;
/**
* Azure Block Blob used for the stream.
*/
private final CloudBlockBlobWrapper blob;
/**
* Azure Storage operation context.
*/
private final OperationContext opContext;
/**
* Commands sent from client calls to the background thread pool.
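*
* Each command records the blob offset at which it applies; the block list
* commit uses these offsets to wait, as a dependent, on every block upload
* queued before it.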
*/
private abstract class UploadCommand {
// the blob offset for the command
private final long commandBlobOffset;
// command completion latch
private final CountDownLatch completed = new CountDownLatch(1);
UploadCommand(long offset) {
this.commandBlobOffset = offset;
}
long getCommandBlobOffset() {
return commandBlobOffset;
}
void await() throws InterruptedException {
completed.await();
}
void awaitAsDependent() throws InterruptedException {
await();
}
void setCompleted() {
completed.countDown();
}
void execute() throws InterruptedException, IOException {}
void dump() {}
}
/**
* The list of recent commands. Before the block list is committed, all the
* blocks in the list must be uploaded. activeBlockCommands is used for
* enumerating the blocks and waiting on the latch until each block is
* uploaded.
*/
private final ConcurrentLinkedQueue<UploadCommand> activeBlockCommands
= new ConcurrentLinkedQueue<>();
/**
* Variable to track if the stream has been closed.
*/
private volatile boolean closed = false;
/**
* First IOException encountered.
*/
private final AtomicReference<IOException> firstError
= new AtomicReference<>();
/**
* Flag set when the first error has been thrown.
*/
private boolean firstErrorThrown = false;
/**
* Semaphore for serializing block uploads with the block list commit.
*
* The semaphore starts with number of permits equal to the number of block
* upload threads. Each block upload thread needs one permit to start the
* upload. The put block list acquires all the permits before the block list
* is committed.
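*
* For example, with four permits, up to four blocks can be uploaded in
* parallel, while a block list commit (which acquires all four permits)
* runs exclusively.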
*/
private final Semaphore uploadingSemaphore = new Semaphore(
MAX_NUMBER_THREADS_IN_THREAD_POOL,
true);
/**
* Pool of buffers, each sized to an Azure block, kept ready for reuse.
* The pool allows buffers to be reused instead of allocated anew. After
* the data is sent to the service, the buffer is returned to the pool.
*/
private final ElasticByteBufferPool poolReadyByteBuffers
= new ElasticByteBufferPool();
/**
* The blob's block list.
*/
private final List<BlockEntry> blockEntries = new ArrayList<>(
DEFAULT_CAPACITY_BLOCK_ENTRIES);
private static final int DEFAULT_CAPACITY_BLOCK_ENTRIES = 1024;
/**
* The uncommitted blob's block list.
*/
private final ConcurrentLinkedDeque<BlockEntry> uncommittedBlockEntries
= new ConcurrentLinkedDeque<>();
/**
* Counter used to derive the next block id for azure storage blocks.
*/
private static final int UNSET_BLOCKS_COUNT = -1;
private long nextBlockCount = UNSET_BLOCKS_COUNT;
/**
* Variable to hold the block id prefix to be used for azure storage blocks.
*/
private String blockIdPrefix = null;
/**
* Maximum number of threads in block upload thread pool.
*/
private static final int MAX_NUMBER_THREADS_IN_THREAD_POOL = 4;
/**
* Number of times a block upload is retried.
*/
private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
/**
* Wait time between block upload retries in milliseconds.
*/
private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
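// Note: retries back off linearly; after the n-th failed attempt the
// uploading thread sleeps for BLOCK_UPLOAD_RETRY_INTERVAL * (n + 1)
// milliseconds (see writeBlockRequestInternal and
// writeBlockListRequestInternal).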
/**
* Logger.
*/
private static final Logger LOG =
LoggerFactory.getLogger(BlockBlobAppendStream.class);
/**
* The absolute maximum number of blocks for a blob, including committed
* and temporary blocks.
*/
private static final int MAX_BLOCK_COUNT = 100000;
/**
* The upload thread pool executor.
*/
private ThreadPoolExecutor ioThreadPool;
/**
* Azure Storage access conditions for the blob.
*/
private final AccessCondition accessCondition = new AccessCondition();
/**
* Atomic integer providing the sequence number used in uploader thread names.
*/
private final AtomicInteger threadSequenceNumber;
/**
* Prefix to be used for thread names for uploader threads.
*/
private static final String THREAD_ID_PREFIX = "append-blockblob";
/**
* BlockBlobAppendStream constructor.
*
* @param blob
* Azure Block Blob
* @param aKey
* blob's name
* @param bufferSize
* the maximum size of a blob block.
* @param compactionEnabled
* is the compaction process enabled for this blob
* @param opContext
* Azure Store operation context for the blob
* @throws IOException
* if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream cannot be used for append operations
*/
public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
final String aKey,
final int bufferSize,
final boolean compactionEnabled,
final OperationContext opContext)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(aKey));
Preconditions.checkArgument(bufferSize >= 0);
this.blob = blob;
this.opContext = opContext;
this.key = aKey;
this.maxBlockSize = new AtomicInteger(bufferSize);
this.threadSequenceNumber = new AtomicInteger(0);
this.blockIdPrefix = null;
this.compactionEnabled = compactionEnabled;
this.blobExist = true;
this.outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
try {
// download the block list
blockEntries.addAll(
blob.downloadBlockList(
BlockListingFilter.COMMITTED,
new BlobRequestOptions(),
opContext));
blobLength = blob.getProperties().getLength();
committedBlobLength.set(blobLength);
// Acquiring lease on the blob.
lease = new SelfRenewingLease(blob, true);
accessCondition.setLeaseID(lease.getLeaseID());
} catch (StorageException ex) {
if (ex.getErrorCode().equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)) {
blobExist = false;
}
else if (ex.getErrorCode().equals(
StorageErrorCodeStrings.LEASE_ALREADY_PRESENT)) {
throw new AzureException(
"Unable to set Append lease on the Blob: " + ex, ex);
}
else {
LOG.debug(
"Encountered storage exception."
+ " StorageException : {} ErrorCode : {}",
ex,
ex.getErrorCode());
throw new AzureException(ex);
}
}
setBlocksCountAndBlockIdPrefix(blockEntries);
this.ioThreadPool = new ThreadPoolExecutor(
MAX_NUMBER_THREADS_IN_THREAD_POOL,
MAX_NUMBER_THREADS_IN_THREAD_POOL,
THREADPOOL_KEEP_ALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new UploaderThreadFactory());
}
/**
* Set payload size of the stream.
* It is intended to be used for unit testing purposes only.
*/
@VisibleForTesting
synchronized void setMaxBlockSize(int size) {
maxBlockSize.set(size);
// it is for testing only so we can abandon the previously allocated
// payload
this.outBuffer = ByteBuffer.allocate(maxBlockSize.get());
}
/**
* Set compaction parameters.
* It is intended to be used for unit testing purposes only.
*/
@VisibleForTesting
void setCompactionBlockCount(int activationCount) {
activateCompactionBlockCount = activationCount;
}
/**
* Get the list of block entries. It is used for testing purposes only.
* @return List of block entries.
*/
@VisibleForTesting
List<BlockEntry> getBlockList() throws StorageException, IOException {
return blob.downloadBlockList(
BlockListingFilter.COMMITTED,
new BlobRequestOptions(),
opContext);
}
/**
* Writes the specified byte to this output stream. The general contract for
* write is that one byte is written to the output stream. The byte to be
* written is the eight low-order bits of the argument b. The 24 high-order
* bits of b are ignored.
*
* @param byteVal
* the byteValue to write.
* @throws IOException
* if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public void write(final int byteVal) throws IOException {
write(new byte[] { (byte) (byteVal & 0xFF) });
}
/**
* Writes length bytes from the specified byte array starting at offset to
* this output stream.
*
* @param data
* the byte array to write.
* @param offset
* the start offset in the data.
* @param length
* the number of bytes to write.
* @throws IOException
* if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public synchronized void write(final byte[] data, int offset, int length)
throws IOException {
Preconditions.checkArgument(data != null, "null data");
if (offset < 0 || length < 0 || length > data.length - offset) {
throw new IndexOutOfBoundsException();
}
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
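// Writes larger than the space remaining in outBuffer are split into
// block-sized chunks; each filled buffer is queued for upload as one block.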
while (outBuffer.remaining() < length) {
int remaining = outBuffer.remaining();
outBuffer.put(data, offset, remaining);
// upload payload to azure storage
addBlockUploadCommand();
offset += remaining;
length -= remaining;
}
outBuffer.put(data, offset, length);
}
/**
* Flushes this output stream and forces any buffered output bytes to be
* written out. If any data remains in the payload it is committed to the
* service. Data is queued for writing and forced out to the service
* before the call returns.
*/
@Override
public void flush() throws IOException {
if (closed) {
// close() starts with a call to flush(), so flush() can be invoked on an
// already closed stream; return silently in that case
return;
}
addBlockUploadCommand();
if (committedBlobLength.get() < blobLength) {
try {
// wait until the block list is committed
addFlushCommand().await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
/**
* Force all data in the output stream to be written to Azure storage.
* Wait to return until this is complete.
*/
@Override
public void sync() throws IOException {
// when block compaction is disabled, sync is a no-op
if (compactionEnabled) {
flush();
}
}
/**
* Force all data in the output stream to be written to Azure storage.
* Wait to return until this is complete.
*/
@Override
public void hsync() throws IOException {
// when block compaction is disabled, hsync is a no-op
if (compactionEnabled) {
flush();
}
}
/**
* Force all data in the output stream to be written to Azure storage.
* Wait to return until this is complete.
*/
@Override
public void hflush() throws IOException {
// when block compaction is disabled, hflush is a no-op
if (compactionEnabled) {
flush();
}
}
/**
* The synchronization capabilities of this stream depend upon the compaction
* policy.
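*
* For example, a client may probe
* {@code stream.hasCapability(StreamCapabilities.HFLUSH)} before relying on
* hflush for durability.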
* @param capability string to query the stream support for.
* @return true for hsync and hflush when compaction is enabled.
*/
@Override
public boolean hasCapability(String capability) {
if (!compactionEnabled) {
return false;
}
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return true;
default:
return false;
}
}
/**
* Force all data in the output stream to be written to Azure storage.
* Wait to return until this is complete. Close the access to the stream and
* shutdown the upload thread pool.
* If a lease was taken on an existing blob, the lease will be released.
* Any error caught in the upload threads and stored will be rethrown here
* after cleanup.
*/
@Override
public synchronized void close() throws IOException {
LOG.debug("close {} ", key);
if (closed) {
return;
}
// Upload the last block regardless of compactionEnabled flag
flush();
// Initiates an orderly shutdown in which previously submitted tasks are
// executed.
ioThreadPool.shutdown();
try {
// wait up to CLOSE_UPLOAD_DELAY minutes to upload all the blocks
if (!ioThreadPool.awaitTermination(CLOSE_UPLOAD_DELAY, TimeUnit.MINUTES)) {
LOG.error("Time out occured while close() is waiting for IO request to"
+ " finish in append"
+ " for blob : {}",
key);
NativeAzureFileSystemHelper.logAllLiveStackTraces();
throw new AzureException("Timed out waiting for IO requests to finish");
}
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
// release the lease
if (firstError.get() == null && blobExist) {
try {
lease.free();
} catch (StorageException ex) {
LOG.debug("Lease free update blob {} encountered Storage Exception:"
+ " {} Error Code : {}",
key,
ex,
ex.getErrorCode());
maybeSetFirstError(new AzureException(ex));
}
}
closed = true;
// finally, throw the first exception raised if it has not
// been thrown elsewhere.
if (firstError.get() != null && !firstErrorThrown) {
throw firstError.get();
}
}
/**
* Helper method used to generate the blockIDs. The algorithm used is similar
* to the Azure storage SDK.
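*
* Older (2.2.0) block ids are the Base64 encoding of an 8-byte counter,
* while newer (4.2.0) ids are the Base64 encoding of a UUID prefix plus a
* six digit counter, and are therefore longer. The id length of block zero
* is used to detect which format the blob already uses.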
*/
private void setBlocksCountAndBlockIdPrefix(List<BlockEntry> blockEntries) {
if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix == null) {
Random sequenceGenerator = new Random();
String blockZeroBlockId = (!blockEntries.isEmpty())
? blockEntries.get(0).getId()
: "";
String prefix = UUID.randomUUID().toString() + "-";
String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix,
0);
if (!blockEntries.isEmpty()
&& blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
// If the blob was already created with version 2.2.0, append subsequent
// blocks with the older (2.2.0) blockId format: compute nextBlockCount
// the way it was done before, and don't use blockIdPrefix
this.blockIdPrefix = "";
nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ sequenceGenerator.nextInt(
Integer.MAX_VALUE - MAX_BLOCK_COUNT);
nextBlockCount += blockEntries.size();
} else {
// If there are no existing blocks, create the first block with newer
// version (4.2.0) blockId. If blob has already been created with 4.2.0,
// append subsequent blocks with newer version (4.2.0) blockId
this.blockIdPrefix = prefix;
nextBlockCount = blockEntries.size();
}
}
}
/**
* Helper method that generates the next block id for uploading a block to
* azure storage.
* @return String representing the block ID generated.
* @throws IOException if the stream is in invalid state
*/
private String generateBlockId() throws IOException {
if (nextBlockCount == UNSET_BLOCKS_COUNT || blockIdPrefix == null) {
throw new AzureException(
"Append Stream in invalid state. nextBlockCount not set correctly");
}
return (!blockIdPrefix.isEmpty())
? generateNewerVersionBlockId(blockIdPrefix, nextBlockCount++)
: generateOlderVersionBlockId(nextBlockCount++);
}
/**
* Helper method that generates an older (2.2.0) version blockId.
* @return String representing the block ID generated.
*/
private String generateOlderVersionBlockId(long id) {
byte[] blockIdInBytes = new byte[8];
for (int m = 0; m < 8; m++) {
blockIdInBytes[7 - m] = (byte) ((id >> (8 * m)) & 0xFF);
}
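// For example, id == 1 packs to bytes 00 00 00 00 00 00 00 01, which
// Base64-encodes to "AAAAAAAAAAE=".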
return new String(
Base64.encodeBase64(blockIdInBytes),
StandardCharsets.UTF_8);
}
/**
* Helper method that generates a newer (4.2.0) version blockId.
* @return String representing the block ID generated.
*/
private String generateNewerVersionBlockId(String prefix, long id) {
String blockIdSuffix = String.format("%06d", id);
byte[] blockIdInBytes =
(prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
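// For example, a (hypothetical) prefix "550e8400-...-" with id == 1 yields
// "550e8400-...-000001" before Base64 encoding.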
return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
}
/**
* This method is shared between the upload block Runnable and
* CommitBlockList. It encapsulates the retry logic.
* @param blockId block name
* @param dataPayload block content
* @param bufferPoolBuffer whether dataPayload should be returned to the
*                         buffer pool after the upload
*/
private void writeBlockRequestInternal(String blockId,
ByteBuffer dataPayload,
boolean bufferPoolBuffer) {
IOException lastLocalException = null;
int uploadRetryAttempts = 0;
while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
try {
long startTime = System.nanoTime();
blob.uploadBlock(blockId, accessCondition, new ByteArrayInputStream(
dataPayload.array()), dataPayload.position(),
new BlobRequestOptions(), opContext);
LOG.debug("upload block finished for {} ms. block {} ",
TimeUnit.NANOSECONDS.toMillis(
System.nanoTime() - startTime), blockId);
break;
} catch(Exception ioe) {
LOG.debug("Encountered exception during uploading block for Blob {}"
+ " Exception : {}", key, ioe);
uploadRetryAttempts++;
lastLocalException = new AzureException(
"Encountered Exception while uploading block: " + ioe, ioe);
try {
Thread.sleep(
BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
if (bufferPoolBuffer) {
poolReadyByteBuffers.putBuffer(dataPayload);
}
if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
maybeSetFirstError(lastLocalException);
}
}
/**
* Set {@link #firstError} to the exception if it is not already set.
* @param exception exception to save
*/
private void maybeSetFirstError(IOException exception) {
firstError.compareAndSet(null, exception);
}
/**
* Throw the first error caught if it has not been raised already.
* @throws IOException if one is caught and needs to be thrown.
*/
private void maybeThrowFirstError() throws IOException {
if (firstError.get() != null) {
firstErrorThrown = true;
throw firstError.get();
}
}
/**
* Write the block list. The method encapsulates the retry logic.
*/
private void writeBlockListRequestInternal() {
IOException lastLocalException = null;
int uploadRetryAttempts = 0;
while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
try {
long startTime = System.nanoTime();
blob.commitBlockList(blockEntries, accessCondition,
new BlobRequestOptions(), opContext);
LOG.debug("Upload block list took {} ms for blob {} ",
TimeUnit.NANOSECONDS.toMillis(
System.nanoTime() - startTime), key);
break;
} catch(Exception ioe) {
LOG.debug("Encountered exception during uploading block for Blob {}"
+ " Exception : {}", key, ioe);
uploadRetryAttempts++;
lastLocalException = new AzureException(
"Encountered Exception while uploading block: " + ioe, ioe);
try {
Thread.sleep(
BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
maybeSetFirstError(lastLocalException);
}
}
/**
* A ThreadFactory that creates uploader threads with meaningful names,
* which is helpful for debugging.
*/
class UploaderThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(String.format("%s-%d", THREAD_ID_PREFIX,
threadSequenceNumber.getAndIncrement()));
return t;
}
}
/**
* Upload block commands.
*/
private class UploadBlockCommand extends UploadCommand {
// the block content for upload
private final ByteBuffer payload;
// description of the block
private final BlockEntry entry;
UploadBlockCommand(String blockId, ByteBuffer payload) {
super(blobLength);
BlockEntry blockEntry = new BlockEntry(blockId);
blockEntry.setSize(payload.position());
blockEntry.setSearchMode(BlockSearchMode.LATEST);
this.payload = payload;
this.entry = blockEntry;
uncommittedBlockEntries.add(blockEntry);
}
/**
* Execute command.
*/
void execute() throws InterruptedException {
uploadingSemaphore.acquire(1);
writeBlockRequestInternal(entry.getId(), payload, true);
uploadingSemaphore.release(1);
}
void dump() {
LOG.debug("upload block {} size: {} for blob {}",
entry.getId(),
entry.getSize(),
key);
}
}
/**
* Upload blob block list commands.
*/
private class UploadBlockListCommand extends UploadCommand {
private BlockEntry lastBlock = null;
UploadBlockListCommand() {
super(blobLength);
if (!uncommittedBlockEntries.isEmpty()) {
lastBlock = uncommittedBlockEntries.getLast();
}
}
void awaitAsDependent() throws InterruptedException {
// Intentionally empty: a later block list commit does not need to wait
// for previous block list commits.
}
void dump() {
LOG.debug("commit block list with {} blocks for blob {}",
uncommittedBlockEntries.size(), key);
}
/**
* Execute command.
*/
public void execute() throws InterruptedException, IOException {
if (committedBlobLength.get() >= getCommandBlobOffset()) {
LOG.debug("commit already applied for {}", key);
return;
}
if (lastBlock == null) {
LOG.debug("nothing to commit for {}", key);
return;
}
LOG.debug("active commands: {} for {}", activeBlockCommands.size(), key);
for (UploadCommand activeCommand : activeBlockCommands) {
if (activeCommand.getCommandBlobOffset() < getCommandBlobOffset()) {
activeCommand.dump();
activeCommand.awaitAsDependent();
} else {
break;
}
}
// stop all uploads until the block list is committed
uploadingSemaphore.acquire(MAX_NUMBER_THREADS_IN_THREAD_POOL);
BlockEntry uncommittedBlock;
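// move the uncommitted entries, up to and including the last block seen
// when this command was created, into the committed block list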
do {
uncommittedBlock = uncommittedBlockEntries.poll();
blockEntries.add(uncommittedBlock);
} while (uncommittedBlock != lastBlock);
if (blockEntries.size() > activateCompactionBlockCount) {
LOG.debug("Block compaction: activated with {} blocks for {}",
blockEntries.size(), key);
// Block compaction
long startCompaction = System.nanoTime();
blockCompaction();
LOG.debug("Block compaction finished for {} ms with {} blocks for {}",
TimeUnit.NANOSECONDS.toMillis(
System.nanoTime() - startCompaction),
blockEntries.size(), key);
}
writeBlockListRequestInternal();
uploadingSemaphore.release(MAX_NUMBER_THREADS_IN_THREAD_POOL);
// remove the commands for blocks committed at or before this command's offset
for (Iterator<UploadCommand> it = activeBlockCommands.iterator();
it.hasNext();) {
UploadCommand activeCommand = it.next();
if (activeCommand.getCommandBlobOffset() <= getCommandBlobOffset()) {
it.remove();
} else {
break;
}
}
committedBlobLength.set(getCommandBlobOffset());
}
/**
* Internal output stream with read access to the internal buffer.
*/
private class ByteArrayOutputStreamInternal extends ByteArrayOutputStream {
ByteArrayOutputStreamInternal(int size) {
super(size);
}
byte[] getByteArray() {
return buf;
}
}
/**
* Block compaction process.
*
* Block compaction is only enabled when the number of blocks exceeds
* activateCompactionBlockCount. The algorithm searches for the longest
* segment [b..e) where (e - b) > 2 and the total size
* size(b) + size(b+1) + ... + size(e-1) does not exceed maxBlockSize.
* It then downloads the blocks in the sequence, concatenates the data to
* form a single block, uploads this new block, and updates the block
* list to replace the sequence of blocks with the new block.
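*
* For example (with hypothetical sizes), given a maxBlockSize of 4 MB and
* committed blocks of 1, 1, 1 and 3 MB, the longest qualifying run is the
* three 1 MB blocks, which are downloaded and re-uploaded as one 3 MB block.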
*/
private void blockCompaction() throws IOException {
// current segment [segmentBegin, segmentEnd) and file offset/size of the
// current segment
int segmentBegin = 0, segmentEnd = 0;
long segmentOffsetBegin = 0, segmentOffsetEnd = 0;
// longest segment [maxSegmentBegin, maxSegmentEnd) and file offset/size of
// the longest segment
int maxSegmentBegin = 0, maxSegmentEnd = 0;
long maxSegmentOffsetBegin = 0, maxSegmentOffsetEnd = 0;
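// Grow the current segment block by block. Once adding a block pushes the
// segment over maxBlockSize, the blocks before it form a candidate run;
// keep the longest candidate and start a new segment at the current block.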
for (BlockEntry block : blockEntries) {
segmentEnd++;
segmentOffsetEnd += block.getSize();
if (segmentOffsetEnd - segmentOffsetBegin > maxBlockSize.get()) {
if (segmentEnd - segmentBegin > 2) {
if (maxSegmentEnd - maxSegmentBegin < segmentEnd - segmentBegin) {
maxSegmentBegin = segmentBegin;
maxSegmentEnd = segmentEnd;
maxSegmentOffsetBegin = segmentOffsetBegin;
maxSegmentOffsetEnd = segmentOffsetEnd - block.getSize();
}
}
segmentBegin = segmentEnd - 1;
segmentOffsetBegin = segmentOffsetEnd - block.getSize();
}
}
if (maxSegmentEnd - maxSegmentBegin > 1) {
LOG.debug("Block compaction: {} blocks for {}",
maxSegmentEnd - maxSegmentBegin, key);
// synchronously download all the blocks of the segment from azure storage
ByteArrayOutputStreamInternal blockOutputStream
= new ByteArrayOutputStreamInternal(maxBlockSize.get());
try {
long length = maxSegmentOffsetEnd - maxSegmentOffsetBegin;
blob.downloadRange(maxSegmentOffsetBegin, length, blockOutputStream,
new BlobRequestOptions(), opContext);
} catch(StorageException ex) {
LOG.error(
"Storage exception encountered during block compaction phase"
+ " : {} Storage Exception : {} Error Code: {}",
key, ex, ex.getErrorCode());
throw new AzureException(
"Encountered Exception while committing append blocks " + ex, ex);
}
// synchronously upload the new block to azure storage
String blockId = generateBlockId();
ByteBuffer byteBuffer = ByteBuffer.wrap(
blockOutputStream.getByteArray());
byteBuffer.position(blockOutputStream.size());
writeBlockRequestInternal(blockId, byteBuffer, false);
// replace blocks from the longest segment with new block id
blockEntries.subList(maxSegmentBegin + 1, maxSegmentEnd - 1).clear();
BlockEntry newBlock = blockEntries.get(maxSegmentBegin);
newBlock.setId(blockId);
newBlock.setSearchMode(BlockSearchMode.LATEST);
newBlock.setSize(maxSegmentOffsetEnd - maxSegmentOffsetBegin);
}
}
}
/**
* Prepare a block upload command and queue it in the thread pool executor.
*/
private synchronized void addBlockUploadCommand() throws IOException {
maybeThrowFirstError();
if (blobExist && lease.isFreed()) {
throw new AzureException(String.format(
"Attempting to upload a block on blob : %s "
+ " that does not have lease on the Blob. Failing upload", key));
}
int blockSize = outBuffer.position();
if (blockSize > 0) {
UploadCommand command = new UploadBlockCommand(generateBlockId(),
outBuffer);
activeBlockCommands.add(command);
blobLength += blockSize;
outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
outBuffer.clear();
ioThreadPool.execute(new WriteRequest(command));
}
}
/**
* Prepare a block list commit command and queue it in the thread pool
* executor.
*/
private synchronized UploadCommand addFlushCommand() throws IOException {
maybeThrowFirstError();
if (blobExist && lease.isFreed()) {
throw new AzureException(
String.format("Attempting to upload block list on blob : %s"
+ " that does not have lease on the Blob. Failing upload", key));
}
UploadCommand command = new UploadBlockListCommand();
activeBlockCommands.add(command);
ioThreadPool.execute(new WriteRequest(command));
return command;
}
/**
* Runnable instance that uploads the block of data to azure storage.
*/
private class WriteRequest implements Runnable {
private final UploadCommand command;
WriteRequest(UploadCommand command) {
this.command = command;
}
@Override
public void run() {
try {
command.dump();
long startTime = System.nanoTime();
command.execute();
command.setCompleted();
LOG.debug("command finished for {} ms",
TimeUnit.NANOSECONDS.toMillis(
System.nanoTime() - startTime));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (Exception ex) {
LOG.debug(
"Encountered exception during execution of command for Blob :"
+ " {} Exception : {}", key, ex);
firstError.compareAndSet(null, new AzureException(ex));
}
}
}
}