/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.cache.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An {@link OutputStream} that writes an S3 object using multipart uploads.
* This implementation borrows heavily from Kafka's implementation, which in
* turn borrowed the general structure of Hadoop's implementation.
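*
* <p>A minimal usage sketch (the client construction below is illustrative;
* any configured {@link AmazonS3} client works). Bytes are buffered locally
* and uploaded in parts; {@code commit()} completes the upload, while
* {@code close()} without a prior commit aborts it:
* <pre>{@code
* AmazonS3 client = AmazonS3ClientBuilder.defaultClient();
* byte[] data = "example payload".getBytes(StandardCharsets.UTF_8);
* try (S3OutputStream out = new S3OutputStream("my-bucket", "my/key", client)) {
*   out.write(data);
*   out.commit(); // close() without commit() aborts the upload
* }
* }</pre>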
*/
public class S3OutputStream extends OutputStream {
private static final Logger log = LoggerFactory.getLogger(S3OutputStream.class);
private final AmazonS3 s3;
private final String bucket;
private final String key;
private final ProgressListener progressListener;
private final int partSize;
private boolean closed;
private ByteBuffer buffer;
private MultipartUpload multiPartUpload;
/**
* Creates a new S3 output stream for an object.
* @param bucket The bucket.
* @param key The object's key.
* @param s3 An S3 {@link AmazonS3 client}.
*/
public S3OutputStream(String bucket, String key, AmazonS3 s3) {
this.s3 = s3;
this.bucket = bucket;
this.key = key;
this.partSize = 5 * 1024 * 1024; // S3 rejects parts under 5 MiB (except the last part), so buffer at least that much
this.closed = false;
this.buffer = ByteBuffer.allocate(this.partSize);
this.progressListener = new ConnectProgressListener();
this.multiPartUpload = null;
log.debug("Create S3OutputStream for bucket '{}' key '{}'", bucket, key);
}
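// Buffers a single byte; once the buffer fills, the accumulated bytes are
// shipped to S3 as one part of the multipart upload.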
@Override
public void write(int b) throws IOException {
buffer.put((byte) b);
if (!buffer.hasRemaining()) {
uploadPart();
}
}
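// Bulk write: fills the remainder of the buffer, uploads it as a part, then
// recurses on the rest of the input until everything fits in the buffer.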
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || off > b.length || len < 0 || (off + len) > b.length || (off + len) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
if (buffer.remaining() < len) {
int firstPart = buffer.remaining();
buffer.put(b, off, firstPart);
uploadPart();
write(b, off + firstPart, len - firstPart);
} else {
buffer.put(b, off, len);
}
}
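// Uploads a full buffer as one part and resets it for the next part.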
private void uploadPart() throws IOException {
uploadPart(partSize);
buffer.clear();
}
private void uploadPart(int size) throws IOException {
if (multiPartUpload == null) {
log.debug("New multi-part upload for bucket '{}' key '{}'", bucket, key);
multiPartUpload = newMultipartUpload();
}
try {
multiPartUpload.uploadPart(new ByteArrayInputStream(buffer.array(), 0, size), size);
} catch (Exception e) {
multiPartUpload.abort();
multiPartUpload = null;
log.debug("Multipart upload aborted for bucket '{}' key '{}'.", bucket, key);
throw new IOException("Part upload failed", e);
}
}
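/**
* Uploads any buffered bytes as the final part, completes the multipart
* upload, and closes the stream. Closing the stream without calling this
* method aborts the upload and discards the written data.
* @throws IOException if the remaining data cannot be uploaded or the
* upload cannot be completed.
*/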
public void commit() throws IOException {
if (closed) {
log.warn("Tried to commit data for bucket '{}' key '{}' on a closed stream. Ignoring.", bucket, key);
return;
}
try {
// hasRemaining() is true even for an empty buffer, so check position() instead;
// still upload a final (possibly empty) part if no part has gone out yet
if (buffer.position() > 0 || multiPartUpload == null) {
uploadPart(buffer.position());
}
multiPartUpload.complete();
log.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
} catch (Exception e) {
log.error("Multipart upload failed to complete for bucket '{}' key '{}'", bucket, key, e);
if (multiPartUpload != null) {
// abort so the incomplete upload does not linger and accrue storage costs
multiPartUpload.abort();
}
throw new IOException("Multipart upload failed to complete.", e);
} finally {
buffer.clear();
multiPartUpload = null;
close();
}
}
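// Closing without a prior commit() aborts any in-flight multipart upload,
// so callers that never commit leave no partial object behind.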
@Override
public void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (multiPartUpload != null) {
multiPartUpload.abort();
log.debug("Multipart upload aborted for bucket '{}' key '{}'.", bucket, key);
}
super.close();
}
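// Initiates a new multipart upload with S3 and wraps the returned upload id.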
private MultipartUpload newMultipartUpload() throws IOException {
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, key, new ObjectMetadata());
try {
return new MultipartUpload(s3.initiateMultipartUpload(initRequest).getUploadId());
} catch (AmazonClientException e) {
throw new IOException("Unable to initiate MultipartUpload: " + e, e);
}
}
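// Thin wrapper around one in-flight multipart upload: tracks the upload id
// and the ETag of every uploaded part, both of which S3 requires in order
// to complete (or abort) the upload.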
private class MultipartUpload {
private final String uploadId;
private final List<PartETag> partETags;
public MultipartUpload(String uploadId) {
this.uploadId = uploadId;
this.partETags = new ArrayList<>();
log.debug("Initiated multi-part upload for bucket '{}' key '{}' with id '{}'", bucket, key, uploadId);
}
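// S3 part numbers are 1-based and must be unique within an upload, so the
// next part number is derived from how many ETags have been collected.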
public void uploadPart(ByteArrayInputStream inputStream, int partSize) {
int currentPartNumber = partETags.size() + 1;
UploadPartRequest request = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(uploadId)
.withInputStream(inputStream)
.withPartNumber(currentPartNumber)
.withPartSize(partSize)
.withGeneralProgressListener(progressListener);
log.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId);
partETags.add(s3.uploadPart(request).getPartETag());
}
public void complete() {
log.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
s3.completeMultipartUpload(completeRequest);
}
public void abort() {
log.warn("Aborting multi-part upload with id '{}'", uploadId);
try {
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
} catch (Exception e) {
// ignoring failure on abort.
log.warn("Unable to abort multipart upload, you may need to purge uploaded parts: ", e);
}
}
}
// Placeholder listener for now; it just logs progress events.
private static class ConnectProgressListener implements ProgressListener {
@Override
public void progressChanged(ProgressEvent progressEvent) {
log.debug("Progress event: " + progressEvent);
}
}
}