blob: e61a78595e88d4cbebb68885872e14b099f12cfa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.minio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import io.minio.BucketExistsArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.ObjectStat;
import io.minio.SetBucketPolicyArgs;
import io.minio.StatObjectArgs;
import io.minio.errors.InvalidBucketNameException;
import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.minio.client.MinioClientFactory;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.ScheduledPollEndpoint;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.isNotEmpty;
/**
* Store and retrieve objects from Minio Storage Service using Minio SDK.
*/
@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
category = {Category.CLOUD, Category.FILE})
public class MinioEndpoint extends ScheduledPollEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
private MinioClient minioClient;
@UriPath(description = "Bucket name")
@Metadata(required = true)
private String bucketName;
@UriParam
private MinioConfiguration configuration;
@UriParam(label = "consumer", defaultValue = "10")
private int maxMessagesPerPoll = 10;
@UriParam(label = "consumer", defaultValue = "60")
private int maxConnections = 50 + maxMessagesPerPoll;
public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
super(uri, component);
this.configuration = configuration;
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
MinioConsumer minioConsumer = new MinioConsumer(this, processor);
configureConsumer(minioConsumer);
minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
return minioConsumer;
}
@Override
public Producer createProducer() {
return new MinioProducer(this);
}
@Override
public void doStart() throws Exception {
super.doStart();
minioClient = isNotEmpty(getConfiguration().getMinioClient())
? getConfiguration().getMinioClient()
: MinioClientFactory.getClient(getConfiguration()).getMinioClient();
String objectName = getConfiguration().getObjectName();
if (isNotEmpty(objectName)) {
LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
return;
}
String bucketName = getConfiguration().getBucketName();
LOG.trace("Querying whether bucket {} already exists...", bucketName);
if (bucketExists(bucketName)) {
LOG.trace("Bucket {} already exists", bucketName);
} else {
if (getConfiguration().isAutoCreateBucket()) {
LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
makeBucket(bucketName);
LOG.trace("Bucket created");
} else {
throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
}
}
if (isNotEmpty(getConfiguration().getPolicy())) {
LOG.trace("Updating bucket {} with policy {}", bucketName, configuration.getPolicy());
setBucketPolicy(bucketName);
LOG.trace("Bucket policy updated");
}
}
@Override
public void doStop() throws Exception {
super.doStop();
}
public Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
return createExchange(getExchangePattern(), minioObject, objectName);
}
public Exchange createExchange(ExchangePattern pattern,
InputStream minioObject, String objectName) throws Exception {
LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
Exchange exchange = super.createExchange(pattern);
Message message = exchange.getIn();
LOG.trace("Got object!");
getObjectStat(objectName, message);
if (getConfiguration().isIncludeBody()) {
try {
message.setBody(readInputStream(minioObject));
if (getConfiguration().isAutoCloseBody()) {
exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
@Override
public void onDone(Exchange exchange) {
IOHelper.close(minioObject);
}
});
}
} catch (IOException e) {
// TODO Auto-generated catch block
LOG.warn("Error setting message body");
}
} else {
message.setBody(null);
IOHelper.close(minioObject);
}
return exchange;
}
public MinioConfiguration getConfiguration() {
return configuration;
}
public void setConfiguration(MinioConfiguration configuration) {
this.configuration = configuration;
}
public MinioClient getMinioClient() {
return minioClient;
}
public void setMinioClient(MinioClient minioClient) {
this.minioClient = minioClient;
}
public int getMaxMessagesPerPoll() {
return maxMessagesPerPoll;
}
/**
* Gets the maximum number of messages as a limit to poll at each polling.
* <p/>
* Gets the maximum number of messages as a limit to poll at each polling.
* The default value is 10. Use 0 or a negative number to set it as
* unlimited.
*/
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
public int getMaxConnections() {
return maxConnections;
}
/**
* Set the maxConnections parameter in the minio client configuration
*/
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
private String readInputStream(InputStream minioObject) throws IOException {
StringBuilder textBuilder = new StringBuilder();
try (Reader reader = new BufferedReader(new InputStreamReader(minioObject, StandardCharsets.UTF_8))) {
int c;
while ((c = reader.read()) != -1) {
textBuilder.append((char) c);
}
}
return textBuilder.toString();
}
private boolean bucketExists(String bucketName) throws Exception {
return minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
}
private void makeBucket(String bucketName) throws Exception {
MakeBucketArgs.Builder makeBucketRequest = MakeBucketArgs.builder().bucket(bucketName).objectLock(getConfiguration().isObjectLock());
if (isNotEmpty(getConfiguration().getRegion())) {
makeBucketRequest.region(getConfiguration().getRegion());
}
minioClient.makeBucket(makeBucketRequest.build());
}
private void setBucketPolicy(String bucketName) throws Exception {
LOG.trace("Updating bucket {} with policy...", bucketName);
minioClient.setBucketPolicy(
SetBucketPolicyArgs.builder().bucket(bucketName).config(getConfiguration().getPolicy()).build());
LOG.trace("Bucket policy updated");
}
private void getObjectStat(String objectName, Message message) throws Exception {
String bucketName = getConfiguration().getBucketName();
StatObjectArgs.Builder statObjectRequest = StatObjectArgs.builder().bucket(bucketName).object(objectName);
MinioChecks.checkServerSideEncryptionCustomerKeyConfig(getConfiguration(), statObjectRequest::ssec);
MinioChecks.checkOffsetConfig(getConfiguration(), statObjectRequest::offset);
MinioChecks.checkLengthConfig(getConfiguration(), statObjectRequest::length);
MinioChecks.checkVersionIdConfig(getConfiguration(), statObjectRequest::versionId);
MinioChecks.checkMatchETagConfig(getConfiguration(), statObjectRequest::matchETag);
MinioChecks.checkNotMatchETagConfig(getConfiguration(), statObjectRequest::notMatchETag);
MinioChecks.checkModifiedSinceConfig(getConfiguration(), statObjectRequest::modifiedSince);
MinioChecks.checkUnModifiedSinceConfig(getConfiguration(), statObjectRequest::unmodifiedSince);
ObjectStat stat = minioClient.statObject(statObjectRequest.build());
// set all stat as message headers
message.setHeader(MinioConstants.OBJECT_NAME, stat.name());
message.setHeader(MinioConstants.BUCKET_NAME, stat.bucketName());
message.setHeader(MinioConstants.E_TAG, stat.etag());
message.setHeader(MinioConstants.LAST_MODIFIED, stat.httpHeaders().get("last-modified"));
message.setHeader(MinioConstants.VERSION_ID, stat.httpHeaders().get("x-amz-version-id"));
message.setHeader(MinioConstants.CONTENT_TYPE, stat.contentType());
message.setHeader(MinioConstants.CONTENT_LENGTH, stat.length());
message.setHeader(MinioConstants.CONTENT_ENCODING, stat.httpHeaders().get("content-encoding"));
message.setHeader(MinioConstants.CONTENT_DISPOSITION, stat.httpHeaders().get("content-disposition"));
message.setHeader(MinioConstants.CACHE_CONTROL, stat.httpHeaders().get("cache-control"));
message.setHeader(MinioConstants.SERVER_SIDE_ENCRYPTION, stat.httpHeaders().get("x-amz-server-side-encryption"));
message.setHeader(MinioConstants.EXPIRATION_TIME, stat.httpHeaders().get("x-amz-expiration"));
message.setHeader(MinioConstants.REPLICATION_STATUS, stat.httpHeaders().get("x-amz-replication-status"));
message.setHeader(MinioConstants.STORAGE_CLASS, stat.httpHeaders().get("x-amz-storage-class"));
}
}