blob: 37896ce9cd897fcbaa19d69f60b2d9988d5041c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.minio;
import java.io.IOException;
import java.io.InputStream;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import io.minio.BucketExistsArgs;
import io.minio.CopyObjectArgs;
import io.minio.CopySource;
import io.minio.GetObjectArgs;
import io.minio.ListObjectsArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.RemoveObjectArgs;
import io.minio.Result;
import io.minio.errors.InvalidBucketNameException;
import io.minio.errors.MinioException;
import io.minio.messages.Item;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.isEmpty;
import static org.apache.camel.util.ObjectHelper.isNotEmpty;
import static org.apache.camel.util.ObjectHelper.cast;
/**
* A Consumer of messages from the Minio Storage Service.
*/
public class MinioConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
int totalCounter;
private String continuationToken;
private transient String minioConsumerToString;
public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@Override
protected void doStart() throws Exception {
super.doStart();
if (getConfiguration().isMoveAfterRead()) {
String destinationBucketName = getConfiguration().getDestinationBucketName();
if (isNotEmpty(destinationBucketName)) {
if (bucketExists(destinationBucketName)) {
LOG.trace("Bucket {} already exists", destinationBucketName);
} else {
LOG.trace("Destination Bucket {} doesn't exist yet", destinationBucketName);
if (getConfiguration().isAutoCreateBucket()) {
// creates the new bucket because it doesn't exist yet
LOG.trace("Creating Destination bucket {}...", destinationBucketName);
makeBucket(destinationBucketName);
LOG.trace("Destination Bucket created");
} else {
throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", destinationBucketName);
}
}
} else {
LOG.warn("invalid destinationBucketName found: {}", destinationBucketName);
}
}
}
private boolean bucketExists(String bucketName) throws Exception {
return getMinioClient().bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
}
private void makeBucket(String bucketName) throws Exception {
MakeBucketArgs.Builder makeBucketRequest = MakeBucketArgs.builder().bucket(bucketName).objectLock(getConfiguration().isObjectLock());
if (isNotEmpty(getConfiguration().getRegion())) {
makeBucketRequest.region(getConfiguration().getRegion());
}
getMinioClient().makeBucket(makeBucketRequest.build());
}
@Override
protected int poll() throws Exception {
// must reset for each poll
shutdownRunningTask = null;
pendingExchanges = 0;
String bucketName = getConfiguration().getBucketName();
String objectName = getConfiguration().getObjectName();
MinioClient minioClient = getMinioClient();
Queue<Exchange> exchanges;
if (isNotEmpty(objectName)) {
LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
InputStream minioObject = getObject(bucketName, minioClient, objectName);
exchanges = createExchanges(minioObject, objectName);
return processBatch(CastUtils.cast(exchanges));
} else {
LOG.trace("Queueing objects in bucket {}...", bucketName);
ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
.bucket(bucketName)
.includeUserMetadata(getConfiguration().isIncludeUserMetadata())
.includeVersions(getConfiguration().isIncludeVersions())
.recursive(getConfiguration().isRecursive())
.useApiVersion1(getConfiguration().isUseVersion1());
if (isNotEmpty(getConfiguration().getDelimiter())) {
listObjectRequest.delimiter(getConfiguration().getDelimiter());
}
if (maxMessagesPerPoll > 0) {
listObjectRequest.maxKeys(maxMessagesPerPoll);
}
if (isNotEmpty(getConfiguration().getPrefix())) {
listObjectRequest.prefix(getConfiguration().getPrefix());
}
if (isNotEmpty(getConfiguration().getStartAfter())) {
listObjectRequest.startAfter(getConfiguration().getStartAfter());
continuationToken = null;
}
// if there was a marker from previous poll then use that to
// continue from where we left last time
if (isNotEmpty(continuationToken)) {
LOG.trace("Resuming from marker: {}", continuationToken);
listObjectRequest.startAfter(continuationToken);
}
Iterator<Result<Item>> listObjects = getMinioClient().listObjects(listObjectRequest.build()).iterator();
if (listObjects.hasNext()) {
exchanges = createExchanges(listObjects);
if (LOG.isTraceEnabled()) {
LOG.trace("Found {} objects in bucket {}...", totalCounter, bucketName);
}
return processBatch(CastUtils.cast(exchanges));
} else {
// no more data so clear marker
continuationToken = null;
return 0;
}
}
}
protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
Queue<Exchange> answer = new LinkedList<>();
Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
answer.add(exchange);
IOHelper.close(objectStream);
return answer;
}
protected Queue<Exchange> createExchanges(Iterator<Result<Item>> minioObjectSummaries) throws Exception {
int messageCounter = 0;
String bucketName = getConfiguration().getBucketName();
Collection<InputStream> minioObjects = new ArrayList<>();
Queue<Exchange> answer = new LinkedList<>();
try {
if (getConfiguration().isIncludeFolders()) {
do {
messageCounter++;
Item minioObjectSummary = minioObjectSummaries.next().get();
InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
minioObjects.add(minioObject);
Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
answer.add(exchange);
continuationToken = minioObjectSummary.objectName();
} while (minioObjectSummaries.hasNext());
} else {
do {
messageCounter++;
Item minioObjectSummary = minioObjectSummaries.next().get();
// ignore if directory
if (!minioObjectSummary.isDir()) {
InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
minioObjects.add(minioObject);
Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
answer.add(exchange);
continuationToken = minioObjectSummary.objectName();
}
} while (minioObjectSummaries.hasNext());
}
if (LOG.isTraceEnabled()) {
LOG.trace("Received {} messages in this poll", messageCounter);
totalCounter += messageCounter;
}
} catch (Throwable e) {
LOG.warn("Error getting MinioObject due: {}", e.getMessage());
throw e;
} finally {
// ensure all previous gathered minio objects are closed
// if there was an exception creating the exchanges in this batch
minioObjects.forEach(IOHelper::close);
}
return answer;
}
private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
MinioChecks.checkServerSideEncryptionCustomerKeyConfig(getConfiguration(), getObjectRequest::ssec);
MinioChecks.checkOffsetConfig(getConfiguration(), getObjectRequest::offset);
MinioChecks.checkLengthConfig(getConfiguration(), getObjectRequest::length);
MinioChecks.checkVersionIdConfig(getConfiguration(), getObjectRequest::versionId);
MinioChecks.checkMatchETagConfig(getConfiguration(), getObjectRequest::matchETag);
MinioChecks.checkNotMatchETagConfig(getConfiguration(), getObjectRequest::notMatchETag);
MinioChecks.checkModifiedSinceConfig(getConfiguration(), getObjectRequest::modifiedSince);
MinioChecks.checkUnModifiedSinceConfig(getConfiguration(), getObjectRequest::unmodifiedSince);
return minioClient.getObject(getObjectRequest.build());
}
@Override
public int processBatch(Queue<Object> exchanges) {
int total = exchanges.size();
for (int index = 0; index < total && isBatchAllowed(); index++) {
// only loop if we are started (allowed to run)
final Exchange exchange = cast(Exchange.class, exchanges.poll());
// add current index and total as properties
exchange.setProperty(Exchange.BATCH_INDEX, index);
exchange.setProperty(Exchange.BATCH_SIZE, total);
exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
// update pending number of exchanges
pendingExchanges = total - index - 1;
// add on completion to handle after work when the exchange is done
exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
public void onComplete(Exchange exchange) {
processCommit(exchange);
}
public void onFailure(Exchange exchange) {
processRollback(exchange);
}
@Override
public String toString() {
return "MinioConsumerOnCompletion";
}
});
LOG.trace("Processing exchange ...");
getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
}
return total;
}
/**
* Strategy to delete the message after being processed.
*
* @param exchange the exchange
*/
protected void processCommit(Exchange exchange) {
try {
String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
if (getConfiguration().isMoveAfterRead()) {
copyObject(srcBucketName, srcObjectName);
LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
}
LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
removeObject(srcBucketName, srcObjectName);
LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
}
} catch (MinioException | NoSuchAlgorithmException | InvalidKeyException | IOException e) {
getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
exchange, e);
}
}
private void removeObject(String srcBucketName, String srcObjectName) throws MinioException, IOException, InvalidKeyException, NoSuchAlgorithmException {
RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
.bucket(srcBucketName)
.object(srcObjectName)
.bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
if (isNotEmpty(getConfiguration().getVersionId())) {
removeObjectRequest.versionId(getConfiguration().getVersionId());
}
getMinioClient().removeObject(removeObjectRequest.build());
}
private void copyObject(String srcBucketName, String srcObjectName) throws MinioException, IOException, InvalidKeyException, NoSuchAlgorithmException {
String destinationBucketName = getConfiguration().getDestinationBucketName();
if (isEmpty(destinationBucketName)) {
throw new IllegalArgumentException("Destination Bucket name must be specified to copy operation");
}
// set destination object name as source object name, if not specified
String destinationObjectName = (isNotEmpty(getConfiguration().getDestinationObjectName()))
? getConfiguration().getDestinationObjectName()
: srcObjectName;
LOG.trace("Copying object from bucket {} with objectName {} to bucket {}...",
srcBucketName, srcObjectName, destinationBucketName);
CopySource.Builder copySourceBuilder = CopySource.builder().bucket(srcBucketName).object(srcObjectName);
MinioChecks.checkServerSideEncryptionCustomerKeyConfig(getConfiguration(), copySourceBuilder::ssec);
MinioChecks.checkOffsetConfig(getConfiguration(), copySourceBuilder::offset);
MinioChecks.checkLengthConfig(getConfiguration(), copySourceBuilder::length);
MinioChecks.checkVersionIdConfig(getConfiguration(), copySourceBuilder::versionId);
MinioChecks.checkMatchETagConfig(getConfiguration(), copySourceBuilder::matchETag);
MinioChecks.checkNotMatchETagConfig(getConfiguration(), copySourceBuilder::notMatchETag);
MinioChecks.checkModifiedSinceConfig(getConfiguration(), copySourceBuilder::modifiedSince);
MinioChecks.checkUnModifiedSinceConfig(getConfiguration(), copySourceBuilder::unmodifiedSince);
CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
.source(copySourceBuilder.build())
.bucket(getConfiguration().getDestinationBucketName())
.object(destinationObjectName);
MinioChecks.checkServerSideEncryptionConfig(getConfiguration(), copyObjectRequest::sse);
getMinioClient().copyObject(copyObjectRequest.build());
}
/**
* Strategy when processing the exchange failed.
*
* @param exchange the exchange
*/
protected void processRollback(Exchange exchange) {
Exception cause = exchange.getException();
if (isNotEmpty(cause)) {
LOG.warn("Exchange failed, so rolling back message status: {}", exchange, cause);
} else {
LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
}
}
protected MinioConfiguration getConfiguration() {
return getEndpoint().getConfiguration();
}
protected MinioClient getMinioClient() {
return getEndpoint().getMinioClient();
}
@Override
public MinioEndpoint getEndpoint() {
return (MinioEndpoint) super.getEndpoint();
}
@Override
public String toString() {
if (isEmpty(minioConsumerToString)) {
minioConsumerToString = "MinioConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
}
return minioConsumerToString;
}
}