| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. The ASF licenses this file to You |
| * under the Apache License, Version 2.0 (the "License"); you may not |
| * use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. For additional information regarding |
| * copyright in this work, please see the NOTICE file in the top level |
| * directory of this distribution. |
| */ |
| package org.apache.usergrid.persistence.queue.impl; |
| |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.*; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| |
| import com.amazonaws.ClientConfiguration; |
| import com.amazonaws.services.sqs.model.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.usergrid.persistence.core.CassandraConfig; |
| import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; |
| import org.apache.usergrid.persistence.core.guicyfig.ClusterFig; |
| import org.apache.usergrid.persistence.queue.LegacyQueue; |
| import org.apache.usergrid.persistence.queue.LegacyQueueFig; |
| import org.apache.usergrid.persistence.queue.LegacyQueueManager; |
| import org.apache.usergrid.persistence.queue.LegacyQueueMessage; |
| import org.apache.usergrid.persistence.queue.LegacyQueueScope; |
| import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; |
| |
| import com.amazonaws.AmazonServiceException; |
| import com.amazonaws.handlers.AsyncHandler; |
| import com.amazonaws.regions.Region; |
| import com.amazonaws.regions.Regions; |
| import com.amazonaws.services.sns.AmazonSNSAsyncClient; |
| import com.amazonaws.services.sns.AmazonSNSClient; |
| import com.amazonaws.services.sns.model.PublishRequest; |
| import com.amazonaws.services.sns.model.PublishResult; |
| import com.amazonaws.services.sns.model.SubscribeRequest; |
| import com.amazonaws.services.sns.model.SubscribeResult; |
| import com.amazonaws.services.sqs.AmazonSQSAsyncClient; |
| import com.amazonaws.services.sqs.AmazonSQSClient; |
| import com.fasterxml.jackson.core.JsonFactory; |
| import com.fasterxml.jackson.core.JsonParser; |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Preconditions; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.inject.Inject; |
| import com.google.inject.assistedinject.Assisted; |
| |
| |
| public class SNSQueueManagerImpl implements LegacyQueueManager { |
| |
| private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class ); |
| |
| private final LegacyQueueScope scope; |
| private final LegacyQueueFig fig; |
| private final ClusterFig clusterFig; |
| private final CassandraConfig cassandraConfig; |
| private final ClientConfiguration clientConfiguration; |
| private final AmazonSQSClient sqs; |
| private final AmazonSNSClient sns; |
| private final AmazonSNSAsyncClient snsAsync; |
| private final AmazonSQSAsyncClient sqsAsync; |
| |
| |
| private static final JsonFactory JSON_FACTORY = new JsonFactory(); |
| private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY ); |
| private static final int MIN_CLIENT_SOCKET_TIMEOUT = 5000; // millis |
| private static final int MIN_VISIBILITY_TIMEOUT = 1; //seconds |
| private static final String DEAD_LETTER_QUEUE_SUFFIX = "_dead"; |
| |
| private static final String FAILED_TO_SEND_MESSAGE = "FAILED INDEX REQUEST: Failed to send message to SNS Queue, sending asynchronously. Message:[{}] URL:[{}] Error:[{}]"; |
| |
| static { |
| |
| /** |
| * Because of the way SNS escapes all our json, we have to tell jackson to accept it. See the documentation |
| * here for how SNS borks the message body |
| * |
| * http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html |
| */ |
| mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true ); |
| } |
| |
| |
| private final LoadingCache<String, String> writeTopicArnMap = |
| CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, String>() { |
| @Override |
| public String load( String queueName ) throws Exception { |
| |
| return setupTopics( queueName ); |
| } |
| } ); |
| |
| private final LoadingCache<String, LegacyQueue> readQueueUrlMap = |
| CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, LegacyQueue>() { |
| @Override |
| public LegacyQueue load(String queueName ) throws Exception { |
| |
| LegacyQueue queue = null; |
| |
| try { |
| GetQueueUrlResult result = sqs.getQueueUrl( queueName ); |
| queue = new LegacyQueue( result.getQueueUrl() ); |
| } |
| catch ( QueueDoesNotExistException queueDoesNotExistException ) { |
| if (queueName.endsWith(DEAD_LETTER_QUEUE_SUFFIX)) { |
| // don't auto-create dead letter queues |
| logger.error("failed to get dead letter queue from service, won't create", queueDoesNotExistException); |
| throw queueDoesNotExistException; |
| } |
| logger.error( "Queue {} does not exist, will create", queueName ); |
| } |
| catch ( Exception e ) { |
| logger.error( "failed to get queue from service", e ); |
| throw e; |
| } |
| |
| if ( queue == null ) { |
| String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig ); |
| queue = new LegacyQueue( url ); |
| } |
| |
| setupTopics( queueName ); |
| |
| return queue; |
| } |
| } ); |
| |
| |
| @Inject |
| public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, ClusterFig clusterFig, |
| CassandraConfig cassandraConfig, LegacyQueueFig queueFig ) { |
| this.scope = scope; |
| this.fig = fig; |
| this.clusterFig = clusterFig; |
| this.cassandraConfig = cassandraConfig; |
| |
| |
| // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks |
| final ExecutorService executor = TaskExecutorFactory |
| .createTaskExecutor( "amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(), |
| TaskExecutorFactory.RejectionAction.CALLERRUNS ); |
| |
| |
| final Region region = getRegion(); |
| |
| this.clientConfiguration = new ClientConfiguration() |
| .withConnectionTimeout(queueFig.getQueueClientConnectionTimeout()) |
| // don't let the socket timeout be configured less than 5 sec (network delays do happen) |
| .withSocketTimeout(Math.max(MIN_CLIENT_SOCKET_TIMEOUT, queueFig.getQueueClientSocketTimeout())) |
| .withGzip(true); |
| |
| try { |
| sqs = createSQSClient( region ); |
| sns = createSNSClient( region ); |
| snsAsync = createAsyncSNSClient( region, executor ); |
| sqsAsync = createAsyncSQSClient( region, executor ); |
| } |
| catch ( Exception e ) { |
| throw new RuntimeException( "Error setting up mapper", e ); |
| } |
| } |
| |
| |
| private String setupTopics( final String queueName ) throws Exception { |
| |
| logger.info( "Setting up setupTopics SNS/SQS..." ); |
| |
| String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true ); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "SNS/SQS Setup: primaryTopicArn={}", primaryTopicArn ); |
| } |
| |
| String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName ); |
| String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName ); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "SNS/SQS Setup: primaryQueueArn={}", primaryQueueArn ); |
| } |
| |
| if ( primaryQueueArn == null ) { |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." ); |
| } |
| |
| queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig ); |
| primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl ); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn ); |
| } |
| } |
| |
| try { |
| |
| SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn ); |
| sns.subscribe( primarySubscribeRequest ); |
| |
| // ensure the SNS primary topic has permission to send to the primary SQS queue |
| List<String> primaryTopicArnList = new ArrayList<>(); |
| primaryTopicArnList.add( primaryTopicArn ); |
| AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList ); |
| } |
| catch ( AmazonServiceException e ) { |
| logger.error( |
| "Unable to subscribe PRIMARY queue=[{}] to topic=[{}]", queueUrl, primaryTopicArn, e ); |
| } |
| |
| if ( fig.isMultiRegion() && scope.getRegionImplementation() == LegacyQueueScope.RegionImplementation.ALL ) { |
| |
| String multiRegion = fig.getRegionList(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "MultiRegion Setup specified, regions: [{}]", multiRegion ); |
| } |
| |
| String[] regionNames = multiRegion.split( "," ); |
| |
| final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 ); |
| final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 ); |
| |
| arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion()); |
| topicArns.put(primaryTopicArn, fig.getPrimaryRegion()); |
| |
| for ( String regionName : regionNames ) { |
| |
| regionName = regionName.trim(); |
| Region region = null; |
| try { |
| Regions regions = Regions.fromName(regionName); |
| region = Region.getRegion(regions); |
| } |
| catch (IllegalArgumentException e) { |
| throw new IllegalArgumentException("INVALID REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_QUEUE_REGION_LIST + ": " + regionName, e); |
| } |
| |
| AmazonSQSClient sqsClient = createSQSClient( region ); |
| AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously |
| |
| // getTopicArn will create the SNS topic if it doesn't exist |
| String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true ); |
| topicArns.put( topicArn, regionName ); |
| |
| // create the SQS queue if it doesn't exist |
| String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName ); |
| if ( queueArn == null ) { |
| queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig ); |
| queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl ); |
| } |
| |
| arrQueueArns.put( queueArn, regionName ); |
| } |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("Creating Subscriptions..."); |
| } |
| |
| for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) { |
| String queueARN = queueArnEntry.getKey(); |
| String strSqsRegion = queueArnEntry.getValue(); |
| |
| Regions sqsRegions = Regions.fromName( strSqsRegion ); |
| Region sqsRegion = Region.getRegion( sqsRegions ); |
| |
| AmazonSQSClient subscribeSqsClient = createSQSClient( sqsRegion ); |
| |
| // ensure the URL used to subscribe is for the correct name/region |
| String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName ); |
| |
| // this list used later for adding permissions to queues |
| List<String> topicArnList = new ArrayList<>(); |
| |
| for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) { |
| |
| String topicARN = topicArnEntry.getKey(); |
| topicArnList.add( topicARN ); |
| |
| String strSnsRegion = topicArnEntry.getValue(); |
| Regions snsRegions = Regions.fromName( strSnsRegion ); |
| Region snsRegion = Region.getRegion( snsRegions ); |
| |
| AmazonSNSClient subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously |
| SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN ); |
| |
| try { |
| |
| logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN, |
| strSqsRegion, topicARN, strSnsRegion ); |
| |
| SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest ); |
| String subscriptionARN = subscribeResult.getSubscriptionArn(); |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( |
| "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", |
| queueARN, topicARN, subscriptionARN ); |
| } |
| } |
| catch ( Exception e ) { |
| logger.error( "ERROR Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", |
| queueARN, strSqsRegion, topicARN, strSnsRegion , e ); |
| } |
| } |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("Adding permission to receive messages..."); |
| } |
| // add permission to each queue, providing a list of topics that it's subscribed to |
| AmazonNotificationUtils |
| .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList ); |
| } |
| } |
| |
| return primaryTopicArn; |
| } |
| |
| |
| /** |
| * The Asynchronous SNS client is used for publishing events to SNS. |
| */ |
| |
| private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) { |
| |
| final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); |
| final AmazonSNSAsyncClient sns = |
| new AmazonSNSAsyncClient( ugProvider.getCredentials(), clientConfiguration, executor ); |
| |
| sns.setRegion( region ); |
| |
| return sns; |
| } |
| |
| |
| /** |
| * Create the async sqs client |
| */ |
| private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) { |
| |
| final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); |
| final AmazonSQSAsyncClient sqs = |
| new AmazonSQSAsyncClient( ugProvider.getCredentials(),clientConfiguration, executor ); |
| |
| sqs.setRegion( region ); |
| |
| return sqs; |
| } |
| |
| |
| /** |
| * The Synchronous SNS client is used for creating topics and subscribing queues. |
| */ |
| private AmazonSNSClient createSNSClient( final Region region ) { |
| |
| final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); |
| final AmazonSNSClient sns = |
| new AmazonSNSClient( ugProvider.getCredentials(), clientConfiguration ); |
| |
| sns.setRegion( region ); |
| |
| return sns; |
| } |
| |
| |
| private String getName(final boolean isDeadLetter) { |
| String name = |
| clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_" |
| + scope.getRegionImplementation(); |
| if (isDeadLetter) { |
| name += DEAD_LETTER_QUEUE_SUFFIX; |
| } |
| name = name.toLowerCase(); //user lower case values |
| Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" ); |
| |
| return name; |
| } |
| |
| private String getName() { |
| return getName(false); |
| } |
| |
| |
| public LegacyQueue getReadQueue() { |
| String queueName = getName(scope.isDeadLetterQueue()); |
| |
| try { |
| return readQueueUrlMap.get( queueName ); |
| } |
| catch ( ExecutionException ee ) { |
| throw new RuntimeException( ee ); |
| } |
| } |
| |
| |
| public String getWriteTopicArn() { |
| try { |
| return writeTopicArnMap.get( getName() ); |
| } |
| catch ( ExecutionException ee ) { |
| throw new RuntimeException( ee ); |
| } |
| } |
| |
| |
| @Override |
| public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) { |
| |
| if ( sqs == null ) { |
| logger.error( "SQS is null - was not initialized properly" ); |
| return new ArrayList<>(0); |
| } |
| |
| String url = getReadQueue().getUrl(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Getting up to {} messages from {}", limit, url ); |
| } |
| |
| ArrayList<String> requestMessageAttributeNames = new ArrayList<String>(1); |
| requestMessageAttributeNames.add("ApproximateReceiveCount"); |
| |
| |
| ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url ); |
| receiveMessageRequest.setAttributeNames(requestMessageAttributeNames); |
| receiveMessageRequest.setMaxNumberOfMessages( limit ); |
| receiveMessageRequest.setVisibilityTimeout( |
| Math.max( MIN_VISIBILITY_TIMEOUT, fig.getVisibilityTimeout() / 1000 ) ); |
| |
| |
| int longPollTimeout = Math.min(20000, fig.getQueuePollTimeout()); // 20000 is the SQS maximum |
| |
| // ensure the client's socket timeout is not less than the configure long poll timeout |
| if( fig.getQueueClientSocketTimeout() < longPollTimeout){ |
| |
| longPollTimeout = Math.max(0, fig.getQueueClientSocketTimeout() - 1000); |
| |
| } |
| |
| receiveMessageRequest.setWaitTimeSeconds( longPollTimeout / 1000 ); // convert to seconds |
| |
| try { |
| ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest ); |
| List<Message> messages = result.getMessages(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Received {} messages from {}", messages.size(), url ); |
| } |
| |
| List<LegacyQueueMessage> queueMessages = new ArrayList<>( messages.size() ); |
| |
| for ( Message message : messages ) { |
| |
| Object payload; |
| final String originalBody = message.getBody(); |
| |
| try { |
| final JsonNode bodyNode = mapper.readTree( message.getBody() ); |
| |
| /** |
| * When a message originates from SNS it has a "Message" we have to extract |
| * it and then process it separately |
| */ |
| |
| |
| if ( bodyNode.has( "Message" ) ) { |
| final String snsNode = bodyNode.get( "Message" ).asText(); |
| |
| payload = deSerializeSQSMessage( snsNode, klass ); |
| } |
| else { |
| payload = deSerializeSQSMessage( originalBody, klass ); |
| } |
| } |
| catch ( Exception e ) { |
| logger.error( "failed to deserialize message: {}", message.getBody(), e ); |
| throw new RuntimeException( e ); |
| } |
| |
| LegacyQueueMessage queueMessage = new LegacyQueueMessage( message.getMessageId(), message.getReceiptHandle(), payload, |
| message.getAttributes().get( "type" ) ); |
| queueMessage.setStringBody( originalBody ); |
| int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount")); |
| queueMessage.setReceiveCount( receiveCount ); |
| queueMessages.add( queueMessage ); |
| } |
| |
| return queueMessages ; |
| } |
| catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) { |
| logger.error( "Queue does not exist! [{}]", url , dne ); |
| } |
| catch ( Exception e ) { |
| logger.error( "Programming error getting messages from queue=[{}] exist!", url, e ); |
| } |
| |
| return new ArrayList<>( 0 ) ; |
| } |
| |
| |
| /** |
| * Take a string, possibly escaped via SNS, and run it through our mapper to create an object) |
| */ |
| private Object deSerializeSQSMessage( final String message, final Class type ) { |
| try { |
| final Object o = mapper.readValue( message, type ); |
| return o; |
| } |
| catch ( Exception e ) { |
| throw new RuntimeException( "Unable to deserialize message " + message + " for class " + type, e ); |
| } |
| } |
| |
| |
| @Override |
| public long getQueueDepth() { |
| String key = "ApproximateNumberOfMessages"; |
| try { |
| GetQueueAttributesResult result = |
| sqs.getQueueAttributes( getReadQueue().getUrl(), Collections.singletonList( key ) ); |
| String depthString = result.getAttributes().get( key ); |
| return depthString != null ? Long.parseLong( depthString ) : 0; |
| } |
| catch ( Exception e ) { |
| logger.error( "Exception getting queue depth", e ); |
| return -1; |
| } |
| } |
| |
| |
| @Override |
| public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException { |
| boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue(); |
| if (sendAsync) { |
| sendMessageToAllRegionsAsync(body); |
| } else { |
| sendMessageToAllRegionsSync(body); |
| } |
| } |
| |
| |
| private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException { |
| if ( sns == null ) { |
| logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); |
| return; |
| } |
| |
| final String stringBody = toString( body ); |
| |
| String topicArn = getWriteTopicArn(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn ); |
| } |
| |
| try { |
| PublishResult publishResult = sns.publish(topicArn, toString(body)); |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Successfully published... messageID=[{}], arn=[{}]", publishResult.getMessageId(), |
| topicArn ); |
| } |
| } catch (Exception e) { |
| if (logger.isErrorEnabled()) { |
| logger.error(FAILED_TO_SEND_MESSAGE, stringBody, topicArn, e); |
| } |
| sendMessageToAllRegionsAsync(body); |
| } |
| |
| |
| |
| } |
| |
| |
| private <T extends Serializable> void sendMessageToAllRegionsAsync(final T body ) throws IOException { |
| if ( snsAsync == null ) { |
| logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); |
| return; |
| } |
| |
| final String stringBody = toString( body ); |
| |
| final String topicArn = getWriteTopicArn(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn ); |
| } |
| |
| PublishRequest publishRequest = new PublishRequest( topicArn, stringBody ); |
| |
| snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() { |
| @Override |
| public void onError( Exception e ) { |
| logger.error( "Error publishing message... {}", e ); |
| logger.error(FAILED_TO_SEND_MESSAGE, stringBody, topicArn, e); |
| } |
| |
| |
| @Override |
| public void onSuccess( PublishRequest request, PublishResult result ) { |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), |
| request.getTopicArn() ); |
| } |
| } |
| } ); |
| } |
| |
| @Override |
| public void sendMessagesAsync( final List bodies ) throws IOException { |
| if ( sqsAsync == null ) { |
| logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); |
| return; |
| } |
| |
| for ( Object body : bodies ) { |
| sendMessageToLocalRegionAsync( ( Serializable ) body ); |
| } |
| } |
| |
| |
| @Override |
| public void sendMessages( final List bodies ) throws IOException { |
| for ( Object body : bodies ) { |
| if (fig.isAsyncQueue()) { |
| sendMessageToLocalRegionAsync((Serializable) body); |
| } else { |
| sendMessageToLocalRegionSync((Serializable) body); |
| } |
| } |
| } |
| |
| |
| @Override |
| public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException { |
| |
| List<LegacyQueueMessage> successMessages = new ArrayList<>(); |
| |
| if ( sqs == null ) { |
| logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); |
| return successMessages; |
| } |
| |
| String url = getReadQueue().getUrl(); |
| |
| List<SendMessageBatchRequestEntry> entries = new ArrayList<>(); |
| |
| for (LegacyQueueMessage queueMessage : queueMessages) { |
| entries.add(new SendMessageBatchRequestEntry(queueMessage.getMessageId(), queueMessage.getStringBody())); |
| } |
| |
| SendMessageBatchResult result = sqs.sendMessageBatch(url, entries); |
| |
| Set<String> successIDs = new HashSet<>(); |
| logger.debug("sendQueueMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size()); |
| |
| for (SendMessageBatchResultEntry batchResultEntry : result.getSuccessful()) { |
| successIDs.add(batchResultEntry.getId()); |
| } |
| |
| for (LegacyQueueMessage queueMessage : queueMessages) { |
| if (successIDs.contains(queueMessage.getMessageId())) { |
| successMessages.add(queueMessage); |
| } |
| } |
| |
| return successMessages; |
| } |
| |
| @Override |
| public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException { |
| boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue(); |
| if (sendAsync) { |
| sendMessageToLocalRegionAsync(body); |
| } else { |
| sendMessageToLocalRegionSync(body); |
| } |
| } |
| |
| private <T extends Serializable> void sendMessageToLocalRegionSync(final T body) throws IOException { |
| |
| if ( sqs == null ) { |
| logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); |
| return; |
| } |
| final String stringBody = toString( body ); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug(" sendMessageToLocalRegion " + stringBody); |
| } |
| |
| String url = getReadQueue().getUrl(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Publishing Message...{} to url: {}", stringBody, url ); |
| } |
| |
| SendMessageRequest messageRequest = new SendMessageRequest(url, stringBody); |
| try { |
| SendMessageResult result = sqs.sendMessage(messageRequest); |
| if (logger.isTraceEnabled()) { |
| logger.trace("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), |
| url); |
| } |
| } catch (Exception e) { |
| logger.error(FAILED_TO_SEND_MESSAGE, messageRequest.getMessageBody(), url, e); |
| sendMessageToLocalRegionAsync(body); |
| } |
| |
| |
| } |
| |
| |
| private <T extends Serializable> void sendMessageToLocalRegionAsync(final T body ) throws IOException { |
| |
| if ( sqsAsync == null ) { |
| logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); |
| return; |
| } |
| final String stringBody = toString( body ); |
| String url = getReadQueue().getUrl(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Publishing Message...{} to url: {}", stringBody, url ); |
| } |
| |
| SendMessageRequest request = new SendMessageRequest( url, stringBody ); |
| |
| sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() { |
| |
| @Override |
| public void onError( final Exception e ) { |
| logger.error(FAILED_TO_SEND_MESSAGE, stringBody, url, e); |
| } |
| |
| |
| @Override |
| public void onSuccess( final SendMessageRequest request, final SendMessageResult sendMessageResult ) { |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Successfully send... messageBody=[{}], url=[{}]", request.getMessageBody(), |
| request.getQueueUrl() ); |
| } |
| } |
| } ); |
| } |
| |
| |
| @Override |
| public void deleteQueue() { |
| logger.warn( "Deleting queue: " + getReadQueue().getUrl() ); |
| sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() ) ); |
| logger.warn( "Deleting queue: " + getReadQueue().getUrl() + "_dead" ); |
| sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() + "_dead" ) ); |
| } |
| |
| |
| @Override |
| public void commitMessage( final LegacyQueueMessage queueMessage ) { |
| String url = getReadQueue().getUrl(); |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Commit message {} to queue {}", queueMessage.getMessageId(), url ); |
| } |
| |
| sqs.deleteMessage( |
| new DeleteMessageRequest().withQueueUrl( url ).withReceiptHandle( queueMessage.getHandle() ) ); |
| } |
| |
| |
| @Override |
| public void commitMessages( final List<LegacyQueueMessage> queueMessages ) { |
| String url = getReadQueue().getUrl(); |
| |
| if ( logger.isTraceEnabled() ) { |
| logger.trace( "Commit messages {} to queue {}", queueMessages.size(), url ); |
| } |
| |
| List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); |
| |
| for ( LegacyQueueMessage message : queueMessages ) { |
| entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) ); |
| } |
| |
| DeleteMessageBatchRequest request = new DeleteMessageBatchRequest( url, entries ); |
| DeleteMessageBatchResult result = sqs.deleteMessageBatch( request ); |
| |
| boolean successful = result.getFailed().size() <= 0; |
| logger.debug("commitMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size()); |
| |
| if ( !successful ) { |
| for ( BatchResultErrorEntry failed : result.getFailed() ) { |
| logger.error( "Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId() ); |
| } |
| } |
| } |
| |
| |
| /** |
| * Write the object to a Base64 string. |
| */ |
| private String toString( final Object o ) throws IOException { |
| return mapper.writeValueAsString( o ); |
| } |
| |
| |
| /** |
| * Get the region |
| */ |
| private Region getRegion() { |
| String regionName = fig.getPrimaryRegion(); |
| try { |
| Regions regions = Regions.fromName(regionName); |
| return Region.getRegion(regions); |
| } |
| catch (IllegalArgumentException e) { |
| throw new IllegalArgumentException("INVALID PRIMARY REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_QUEUE_REGION_LOCAL + ": " + regionName, e); |
| } |
| } |
| |
| |
| /** |
| * Create the SQS client for the specified settings |
| */ |
| private AmazonSQSClient createSQSClient( final Region region ) { |
| |
| final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); |
| final AmazonSQSClient sqs = |
| new AmazonSQSClient( ugProvider.getCredentials(), clientConfiguration ); |
| |
| sqs.setRegion( region ); |
| |
| return sqs; |
| } |
| |
| @Override |
| public void clearQueueNameCache(){ |
| //no-op |
| } |
| } |