blob: be2ab6715c18c5f25dd3a59015bc519892149615 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
/**
* The BlobFsOutputStream for Rest AbfsClient.
*/
public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {
private final AbfsClient client;
private final String path;
private long position;
private boolean closed;
private boolean supportFlush;
private boolean disableOutputStreamFlush;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private final int bufferSize;
private byte[] buffer;
private int bufferIndex;
private final int maxConcurrentRequestCount;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
/**
* Queue storing buffers with the size of the Azure block ready for
* reuse. The pool allows reusing the blocks instead of allocating new
* blocks. After the data is sent to the service, the buffer is returned
* back to the queue
*/
private final ElasticByteBufferPool byteBufferPool
= new ElasticByteBufferPool();
public AbfsOutputStream(
final AbfsClient client,
final String path,
final long position,
final int bufferSize,
final boolean supportFlush,
final boolean disableOutputStreamFlush) {
this.client = client;
this.path = path;
this.position = position;
this.closed = false;
this.supportFlush = supportFlush;
this.disableOutputStreamFlush = disableOutputStreamFlush;
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
}
/**
* Query the stream for a specific capability.
*
* @param capability string to query the stream support for.
* @return true for hsync and hflush.
*/
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return supportFlush;
default:
return false;
}
}
/**
* Writes the specified byte to this output stream. The general contract for
* write is that one byte is written to the output stream. The byte to be
* written is the eight low-order bits of the argument b. The 24 high-order
* bits of b are ignored.
*
* @param byteVal the byteValue to write.
* @throws IOException if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public void write(final int byteVal) throws IOException {
write(new byte[]{(byte) (byteVal & 0xFF)});
}
/**
* Writes length bytes from the specified byte array starting at off to
* this output stream.
*
* @param data the byte array to write.
* @param off the start off in the data.
* @param length the number of bytes to write.
* @throws IOException if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public synchronized void write(final byte[] data, final int off, final int length)
throws IOException {
maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
}
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes;
writeCurrentBufferToService();
currentOffset += writableBytes;
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else {
System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
bufferIndex += numberOfBytesToWrite;
numberOfBytesToWrite = 0;
}
writableBytes = bufferSize - bufferIndex;
}
}
/**
* Throw the last error recorded if not null.
* After the stream is closed, this is always set to
* an exception, so acts as a guard against method invocation once
* closed.
* @throws IOException if lastError is set
*/
private void maybeThrowLastError() throws IOException {
if (lastError != null) {
throw lastError;
}
}
/**
* Flushes this output stream and forces any buffered output bytes to be
* written out. If any data remains in the payload it is committed to the
* service. Data is queued for writing and forced out to the service
* before the call returns.
*/
@Override
public void flush() throws IOException {
if (!disableOutputStreamFlush) {
flushInternalAsync();
}
}
/** Similar to posix fsync, flush out the data in client's user buffer
* all the way to the disk device (but the disk may have it in its cache).
* @throws IOException if error occurs
*/
@Override
public void hsync() throws IOException {
if (supportFlush) {
flushInternal(false);
}
}
/** Flush out the data in client's user buffer. After the return of
* this call, new readers will see the data.
* @throws IOException if any error occurs
*/
@Override
public void hflush() throws IOException {
if (supportFlush) {
flushInternal(false);
}
}
/**
* Force all data in the output stream to be written to Azure storage.
* Wait to return until this is complete. Close the access to the stream and
* shutdown the upload thread pool.
* If the blob was created, its lease will be released.
* Any error encountered caught in threads and stored will be rethrown here
* after cleanup.
*/
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
try {
flushInternal(true);
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
buffer = null;
bufferIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
threadExecutor.shutdownNow();
}
}
}
@Override
public void sync() throws IOException {
throw new UnsupportedOperationException();
}
private synchronized void flushInternal(boolean isClose) throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToService(isClose);
}
private synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToServiceAsync();
}
private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) {
return;
}
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
position += bytesLength;
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
client.append(path, offset, bytes, 0,
bytesLength);
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
return null;
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
} catch (Exception ex) {
if (ex.getCause() instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
if (ex.getCause() instanceof AzureBlobFileSystemException) {
ex = (AzureBlobFileSystemException) ex.getCause();
}
lastError = new IOException(ex);
throw lastError;
}
}
flushWrittenBytesToServiceInternal(position, false, isClose);
}
private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
shrinkWriteOperationQueue();
if (this.lastTotalAppendOffset > this.lastFlushOffset) {
this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
false/*Async flush on close not permitted*/);
}
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset,
final boolean retainUncommitedData, final boolean isClose) throws IOException {
try {
client.flush(path, offset, retainUncommitedData, isClose);
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
throw new IOException(ex);
}
this.lastFlushOffset = offset;
}
/**
* Try to remove the completed write operations from the beginning of write
* operation FIFO queue.
*/
private synchronized void shrinkWriteOperationQueue() throws IOException {
try {
while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
writeOperations.peek().task.get();
lastTotalAppendOffset += writeOperations.peek().length;
writeOperations.remove();
}
} catch (Exception e) {
if (e.getCause() instanceof AzureBlobFileSystemException) {
lastError = (AzureBlobFileSystemException) e.getCause();
} else {
lastError = new IOException(e);
}
throw lastError;
}
}
private void waitForTaskToComplete() throws IOException {
boolean completed;
for (completed = false; completionService.poll() != null; completed = true) {
// keep polling until there is no data
}
if (!completed) {
try {
completionService.take();
} catch (InterruptedException e) {
lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
throw lastError;
}
}
}
private static class WriteOperation {
private final Future<Void> task;
private final long startOffset;
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
Preconditions.checkNotNull(task, "task");
Preconditions.checkArgument(startOffset >= 0, "startOffset");
Preconditions.checkArgument(length >= 0, "length");
this.task = task;
this.startOffset = startOffset;
this.length = length;
}
}
@VisibleForTesting
public synchronized void waitForPendingUploads() throws IOException {
waitForTaskToComplete();
}
}