/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.aws.sqs;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
import org.apache.camel.support.ScheduledPollEndpoint;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ObjectHelper;

| /** |
| * The aws-sqs component is used for sending and receiving messages to Amazon's |
| * SQS service. |
| */ |
| @UriEndpoint(firstVersion = "2.6.0", scheme = "aws-sqs", title = "AWS Simple Queue Service", syntax = "aws-sqs:queueNameOrArn", label = "cloud,messaging") |
| public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterStrategyAware { |
| |
| private AmazonSQS client; |
| private String queueUrl; |
| |
| @UriPath(description = "Queue name or ARN") |
| @Metadata(required = true) |
| private String queueNameOrArn; // to support component docs |
| @UriParam |
| private SqsConfiguration configuration; |
| @UriParam(label = "consumer") |
| private int maxMessagesPerPoll; |
| @UriParam |
| private HeaderFilterStrategy headerFilterStrategy; |
| |
| public SqsEndpoint(String uri, SqsComponent component, SqsConfiguration configuration) { |
| super(uri, component); |
| this.configuration = configuration; |
| } |
| |
| @Override |
| public HeaderFilterStrategy getHeaderFilterStrategy() { |
| return headerFilterStrategy; |
| } |
| |
| /** |
| * To use a custom HeaderFilterStrategy to map headers to/from Camel. |
| */ |
| @Override |
| public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { |
| this.headerFilterStrategy = strategy; |
| } |
| |
| @Override |
| public Producer createProducer() throws Exception { |
| return new SqsProducer(this); |
| } |
| |
| @Override |
| public Consumer createConsumer(Processor processor) throws Exception { |
| SqsConsumer sqsConsumer = new SqsConsumer(this, processor); |
| configureConsumer(sqsConsumer); |
| sqsConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll); |
| DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(); |
| scheduler.setConcurrentTasks(configuration.getConcurrentConsumers()); |
| sqsConsumer.setScheduler(scheduler); |
| return sqsConsumer; |
| } |
| |
| /* |
| If using a different AWS host, do not assume specific parts of the AWS host |
| and, instead, just return whatever is provided as the host. |
| */ |
| private String getFullyQualifiedAWSHost() { |
| String host = configuration.getAmazonAWSHost(); |
| host = FileUtil.stripTrailingSeparator(host); |
| |
| if (host.equals("amazonaws.com")) { |
| return "sqs." + Regions.valueOf(configuration.getRegion()).getName() + "." + host; |
| } |
| |
| return host; |
| } |
| |
| @Override |
| protected void doInit() throws Exception { |
| super.doInit(); |
| client = getConfiguration().getAmazonSQSClient() != null ? getConfiguration().getAmazonSQSClient() : getClient(); |
| |
| // check the setting the headerFilterStrategy |
| if (headerFilterStrategy == null) { |
| headerFilterStrategy = new SqsHeaderFilterStrategy(); |
| } |
| |
| if (configuration.getQueueUrl() != null) { |
| queueUrl = configuration.getQueueUrl(); |
| } else { |
| // If both region and Account ID is provided the queue URL can be |
| // built manually. |
| // This allows accessing queues where you don't have permission to |
| // list queues or query queues |
| if (configuration.getRegion() != null && configuration.getQueueOwnerAWSAccountId() != null) { |
| String protocol = configuration.getProtocol(); |
| |
| queueUrl = protocol + "://" + getFullyQualifiedAWSHost() + "/" + configuration.getQueueOwnerAWSAccountId() + "/" |
| + configuration.getQueueName(); |
| } else if (configuration.getQueueOwnerAWSAccountId() != null) { |
| GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest(); |
| getQueueUrlRequest.setQueueName(configuration.getQueueName()); |
| getQueueUrlRequest.setQueueOwnerAWSAccountId(configuration.getQueueOwnerAWSAccountId()); |
| GetQueueUrlResult getQueueUrlResult = client.getQueueUrl(getQueueUrlRequest); |
| queueUrl = getQueueUrlResult.getQueueUrl(); |
| } else { |
| // check whether the queue already exists |
| ListQueuesResult listQueuesResult = client.listQueues(); |
| for (String url : listQueuesResult.getQueueUrls()) { |
| if (url.endsWith("/" + configuration.getQueueName())) { |
| queueUrl = url; |
| log.trace("Queue available at '{}'.", queueUrl); |
| break; |
| } |
| } |
| } |
| } |
| |
| if (queueUrl == null && configuration.isAutoCreateQueue()) { |
| createQueue(client); |
| } else { |
| log.debug("Using Amazon SQS queue url: {}", queueUrl); |
| updateQueueAttributes(client); |
| } |
| } |
| |
| protected void createQueue(AmazonSQS client) { |
| log.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName()); |
| |
| // creates a new queue, or returns the URL of an existing one |
| CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName()); |
| if (getConfiguration().isFifoQueue()) { |
| request.getAttributes().put(QueueAttributeName.FifoQueue.name(), String.valueOf(true)); |
| boolean useContentBasedDeduplication = getConfiguration().getMessageDeduplicationIdStrategy() instanceof NullMessageDeduplicationIdStrategy; |
| request.getAttributes().put(QueueAttributeName.ContentBasedDeduplication.name(), String.valueOf(useContentBasedDeduplication)); |
| } |
| if (getConfiguration().getDefaultVisibilityTimeout() != null) { |
| request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); |
| } |
| if (getConfiguration().getMaximumMessageSize() != null) { |
| request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize())); |
| } |
| if (getConfiguration().getMessageRetentionPeriod() != null) { |
| request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod())); |
| } |
| if (getConfiguration().getPolicy() != null) { |
| request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy())); |
| } |
| if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) { |
| request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds())); |
| } |
| if (getConfiguration().getDelaySeconds() != null && getConfiguration().isDelayQueue()) { |
| request.getAttributes().put(QueueAttributeName.DelaySeconds.name(), String.valueOf(getConfiguration().getDelaySeconds())); |
| } |
| if (getConfiguration().getRedrivePolicy() != null) { |
| request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy()); |
| } |
| if (getConfiguration().isServerSideEncryptionEnabled()) { |
| if (getConfiguration().getKmsMasterKeyId() != null) { |
| request.getAttributes().put(QueueAttributeName.KmsMasterKeyId.name(), getConfiguration().getKmsMasterKeyId()); |
| } |
| if (getConfiguration().getKmsDataKeyReusePeriodSeconds() != null) { |
| request.getAttributes().put(QueueAttributeName.KmsDataKeyReusePeriodSeconds.name(), String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds())); |
| } |
| } |
| log.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); |
| |
| CreateQueueResult queueResult = client.createQueue(request); |
| queueUrl = queueResult.getQueueUrl(); |
| |
| log.trace("Queue created and available at: {}", queueUrl); |
| } |
| |
| private void updateQueueAttributes(AmazonSQS client) { |
| SetQueueAttributesRequest request = new SetQueueAttributesRequest(); |
| request.setQueueUrl(queueUrl); |
| if (getConfiguration().getDefaultVisibilityTimeout() != null) { |
| request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); |
| } |
| if (getConfiguration().getMaximumMessageSize() != null) { |
| request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize())); |
| } |
| if (getConfiguration().getMessageRetentionPeriod() != null) { |
| request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod())); |
| } |
| if (getConfiguration().getPolicy() != null) { |
| request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy())); |
| } |
| if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) { |
| request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds())); |
| } |
| if (getConfiguration().getDelaySeconds() != null && getConfiguration().isDelayQueue()) { |
| request.getAttributes().put(QueueAttributeName.DelaySeconds.name(), String.valueOf(getConfiguration().getDelaySeconds())); |
| } |
| if (getConfiguration().getRedrivePolicy() != null) { |
| request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy()); |
| } |
| if (getConfiguration().isServerSideEncryptionEnabled()) { |
| if (getConfiguration().getKmsMasterKeyId() != null) { |
| request.getAttributes().put(QueueAttributeName.KmsMasterKeyId.name(), getConfiguration().getKmsMasterKeyId()); |
| } |
| if (getConfiguration().getKmsDataKeyReusePeriodSeconds() != null) { |
| request.getAttributes().put(QueueAttributeName.KmsDataKeyReusePeriodSeconds.name(), String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds())); |
| } |
| } |
| if (!request.getAttributes().isEmpty()) { |
| log.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName()); |
| client.setQueueAttributes(request); |
| log.trace("Queue '{}' updated and available at {}'", configuration.getQueueName(), queueUrl); |
| } |
| } |
| |
| @Override |
| public void doStop() throws Exception { |
| if (ObjectHelper.isEmpty(configuration.getAmazonSQSClient())) { |
| if (client != null) { |
| client.shutdown(); |
| } |
| } |
| super.doStop(); |
| } |
| |
| public Exchange createExchange(com.amazonaws.services.sqs.model.Message msg) { |
| return createExchange(getExchangePattern(), msg); |
| } |
| |
| private Exchange createExchange(ExchangePattern pattern, com.amazonaws.services.sqs.model.Message msg) { |
| Exchange exchange = super.createExchange(pattern); |
| Message message = exchange.getIn(); |
| message.setBody(msg.getBody()); |
| message.setHeaders(new HashMap<>(msg.getAttributes())); |
| message.setHeader(SqsConstants.MESSAGE_ID, msg.getMessageId()); |
| message.setHeader(SqsConstants.MD5_OF_BODY, msg.getMD5OfBody()); |
| message.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle()); |
| message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes()); |
| message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes()); |
| |
| // Need to apply the SqsHeaderFilterStrategy this time |
| HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy(); |
| // add all sqs message attributes as camel message headers so that |
| // knowledge of |
| // the Sqs class MessageAttributeValue will not leak to the client |
| for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) { |
| String header = entry.getKey(); |
| Object value = translateValue(entry.getValue()); |
| if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) { |
| message.setHeader(header, value); |
| } |
| } |
| return exchange; |
| } |
| |
| public SqsConfiguration getConfiguration() { |
| return configuration; |
| } |
| |
| public void setConfiguration(SqsConfiguration configuration) { |
| this.configuration = configuration; |
| } |
| |
| public AmazonSQS getClient() { |
| if (client == null) { |
| client = createClient(); |
| } |
| return client; |
| } |
| |
| public void setClient(AmazonSQS client) { |
| this.client = client; |
| } |
| |
| /** |
| * Provide the possibility to override this method for an mock |
| * implementation |
| * |
| * @return AmazonSQSClient |
| */ |
| AmazonSQS createClient() { |
| AmazonSQS client; |
| AmazonSQSClientBuilder clientBuilder; |
| ClientConfiguration clientConfiguration = null; |
| boolean isClientConfigFound = false; |
| if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { |
| clientConfiguration = new ClientConfiguration(); |
| clientConfiguration.setProxyProtocol(configuration.getProxyProtocol()); |
| clientConfiguration.setProxyHost(configuration.getProxyHost()); |
| clientConfiguration.setProxyPort(configuration.getProxyPort()); |
| isClientConfigFound = true; |
| } |
| |
| final String protocol = configuration.getProtocol(); |
| |
| if (protocol.equals("http")) { |
| log.trace("Configuring AWS-SQS for HTTP protocol"); |
| if (isClientConfigFound) { |
| clientConfiguration = clientConfiguration.withProtocol(Protocol.HTTP); |
| } else { |
| clientConfiguration = new ClientConfiguration().withProtocol(Protocol.HTTP); |
| isClientConfigFound = true; |
| } |
| } |
| |
| if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) { |
| AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey()); |
| AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); |
| if (isClientConfigFound) { |
| clientBuilder = AmazonSQSClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider); |
| } else { |
| clientBuilder = AmazonSQSClientBuilder.standard().withCredentials(credentialsProvider); |
| } |
| } else { |
| if (isClientConfigFound) { |
| clientBuilder = AmazonSQSClientBuilder.standard().withClientConfiguration(clientConfiguration); |
| } else { |
| clientBuilder = AmazonSQSClientBuilder.standard(); |
| } |
| } |
| |
| final String host = getFullyQualifiedAWSHost(); |
| final String region = Regions.valueOf(configuration.getRegion()).getName(); |
| |
| log.debug("Creating endpoint for host {} on region {}", host, region); |
| clientBuilder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(host, region)); |
| |
| client = clientBuilder.build(); |
| return client; |
| } |
| |
| protected String getQueueUrl() { |
| return queueUrl; |
| } |
| |
| public int getMaxMessagesPerPoll() { |
| return maxMessagesPerPoll; |
| } |
| |
| /** |
| * Gets the maximum number of messages as a limit to poll at each polling. |
| * <p/> |
| * Is default unlimited, but use 0 or negative number to disable it as |
| * unlimited. |
| */ |
| public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { |
| this.maxMessagesPerPoll = maxMessagesPerPoll; |
| } |
| |
| private Object translateValue(MessageAttributeValue mav) { |
| Object result = null; |
| if (mav.getStringValue() != null) { |
| result = mav.getStringValue(); |
| } else if (mav.getBinaryValue() != null) { |
| result = mav.getBinaryValue(); |
| } |
| return result; |
| } |
| } |