/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  The ASF licenses this file to You
 * under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.  For additional information regarding
 * copyright in this work, please see the NOTICE file in the top level
 * directory of this distribution.
 */
package org.apache.usergrid.persistence.queue.impl;


import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.sqs.model.*;
import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.queue.LegacyQueue;
import org.apache.usergrid.persistence.queue.LegacyQueueFig;
import org.apache.usergrid.persistence.queue.LegacyQueueManager;
import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
import org.apache.usergrid.persistence.queue.LegacyQueueScope;
import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;


public class SNSQueueManagerImpl implements LegacyQueueManager {

    private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class );

    private final LegacyQueueScope scope;
    private final LegacyQueueFig fig;
    private final ClusterFig clusterFig;
    private final CassandraConfig cassandraConfig;
    private final ClientConfiguration clientConfiguration;
    private final AmazonSQSClient sqs;
    private final AmazonSNSClient sns;
    private final AmazonSNSAsyncClient snsAsync;
    private final AmazonSQSAsyncClient sqsAsync;


    private static final JsonFactory JSON_FACTORY = new JsonFactory();
    private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
    private static final int MIN_CLIENT_SOCKET_TIMEOUT = 5000; // millis
    private static final int MIN_VISIBILITY_TIMEOUT = 1; //seconds
    private static final String DEAD_LETTER_QUEUE_SUFFIX = "_dead";

    private static final String FAILED_TO_SEND_MESSAGE = "FAILED INDEX REQUEST: Failed to send message to SNS Queue, sending asynchronously. Message:[{}] URL:[{}] Error:[{}]";

    static {

        /**
         * Because of the way SNS escapes all our json, we have to tell jackson to accept it.  See the documentation
         * here for how SNS borks the message body
         *
         *  http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
         */
        mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
    }


    private final LoadingCache<String, String> writeTopicArnMap =
        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, String>() {
            @Override
            public String load( String queueName ) throws Exception {

                return setupTopics( queueName );
            }
        } );

    private final LoadingCache<String, LegacyQueue> readQueueUrlMap =
        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, LegacyQueue>() {
            @Override
            public LegacyQueue load(String queueName ) throws Exception {

                LegacyQueue queue = null;

                try {
                    GetQueueUrlResult result = sqs.getQueueUrl( queueName );
                    queue = new LegacyQueue( result.getQueueUrl() );
                }
                catch ( QueueDoesNotExistException queueDoesNotExistException ) {
                    if (queueName.endsWith(DEAD_LETTER_QUEUE_SUFFIX)) {
                        // don't auto-create dead letter queues
                        logger.error("failed to get dead letter queue from service, won't create", queueDoesNotExistException);
                        throw queueDoesNotExistException;
                    }
                    logger.error( "Queue {} does not exist, will create", queueName );
                }
                catch ( Exception e ) {
                    logger.error( "failed to get queue from service", e );
                    throw e;
                }

                if ( queue == null ) {
                    String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
                    queue = new LegacyQueue( url );
                }

                setupTopics( queueName );

                return queue;
            }
        } );


    @Inject
    public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, ClusterFig clusterFig,
                               CassandraConfig cassandraConfig, LegacyQueueFig queueFig ) {
        this.scope = scope;
        this.fig = fig;
        this.clusterFig = clusterFig;
        this.cassandraConfig = cassandraConfig;


        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
        final ExecutorService executor = TaskExecutorFactory
            .createTaskExecutor( "amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
                TaskExecutorFactory.RejectionAction.CALLERRUNS );


        final Region region = getRegion();

        this.clientConfiguration = new ClientConfiguration()
            .withConnectionTimeout(queueFig.getQueueClientConnectionTimeout())
            // don't let the socket timeout be configured less than 5 sec (network delays do happen)
            .withSocketTimeout(Math.max(MIN_CLIENT_SOCKET_TIMEOUT, queueFig.getQueueClientSocketTimeout()))
            .withGzip(true);

        try {
            sqs = createSQSClient( region );
            sns = createSNSClient( region );
            snsAsync = createAsyncSNSClient( region, executor );
            sqsAsync = createAsyncSQSClient( region, executor );
        }
        catch ( Exception e ) {
            throw new RuntimeException( "Error setting up mapper", e );
        }
    }


    private String setupTopics( final String queueName ) throws Exception {

        logger.info( "Setting up setupTopics SNS/SQS..." );

        String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true );

        if ( logger.isTraceEnabled() ) {
            logger.trace( "SNS/SQS Setup: primaryTopicArn={}", primaryTopicArn );
        }

        String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName );
        String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName );

        if ( logger.isTraceEnabled() ) {
            logger.trace( "SNS/SQS Setup: primaryQueueArn={}", primaryQueueArn );
        }

        if ( primaryQueueArn == null ) {
            if ( logger.isTraceEnabled() ) {
                logger.trace( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." );
            }

            queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
            primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl );

            if ( logger.isTraceEnabled() ) {
                logger.trace( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn );
            }
        }

        try {

            SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn );
            sns.subscribe( primarySubscribeRequest );

            // ensure the SNS primary topic has permission to send to the primary SQS queue
            List<String> primaryTopicArnList = new ArrayList<>();
            primaryTopicArnList.add( primaryTopicArn );
            AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList );
        }
        catch ( AmazonServiceException e ) {
            logger.error(
                "Unable to subscribe PRIMARY queue=[{}] to topic=[{}]", queueUrl, primaryTopicArn, e );
        }

        if ( fig.isMultiRegion() && scope.getRegionImplementation() == LegacyQueueScope.RegionImplementation.ALL ) {

            String multiRegion = fig.getRegionList();

            if ( logger.isTraceEnabled() ) {
                logger.trace( "MultiRegion Setup specified, regions: [{}]", multiRegion );
            }

            String[] regionNames = multiRegion.split( "," );

            final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
            final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );

            arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion());
            topicArns.put(primaryTopicArn, fig.getPrimaryRegion());

            for ( String regionName : regionNames ) {

                regionName = regionName.trim();
                Region region = null;
                try {
                    Regions regions = Regions.fromName(regionName);
                    region = Region.getRegion(regions);
                }
                catch (IllegalArgumentException e) {
                    throw new IllegalArgumentException("INVALID REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LIST + ": " + regionName, e);
                }

                AmazonSQSClient sqsClient = createSQSClient( region );
                AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously

                // getTopicArn will create the SNS topic if it doesn't exist
                String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
                topicArns.put( topicArn, regionName );

                // create the SQS queue if it doesn't exist
                String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
                if ( queueArn == null ) {
                    queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
                    queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
                }

                arrQueueArns.put( queueArn, regionName );
            }

            if (logger.isTraceEnabled()) {
                logger.trace("Creating Subscriptions...");
            }

            for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
                String queueARN = queueArnEntry.getKey();
                String strSqsRegion = queueArnEntry.getValue();

                Regions sqsRegions = Regions.fromName( strSqsRegion );
                Region sqsRegion = Region.getRegion( sqsRegions );

                AmazonSQSClient subscribeSqsClient = createSQSClient( sqsRegion );

                // ensure the URL used to subscribe is for the correct name/region
                String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName );

                // this list used later for adding permissions to queues
                List<String> topicArnList = new ArrayList<>();

                for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) {

                    String topicARN = topicArnEntry.getKey();
                    topicArnList.add( topicARN );

                    String strSnsRegion = topicArnEntry.getValue();
                    Regions snsRegions = Regions.fromName( strSnsRegion );
                    Region snsRegion = Region.getRegion( snsRegions );

                    AmazonSNSClient subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously
                    SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN );

                    try {

                        logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN,
                            strSqsRegion, topicARN, strSnsRegion );

                        SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest );
                        String subscriptionARN = subscribeResult.getSubscriptionArn();
                        if ( logger.isTraceEnabled() ) {
                            logger.trace(
                                "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]",
                                queueARN, topicARN, subscriptionARN );
                        }
                    }
                    catch ( Exception e ) {
                        logger.error( "ERROR Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
                                queueARN, strSqsRegion, topicARN, strSnsRegion , e );
                    }
                }

                if (logger.isTraceEnabled()) {
                    logger.trace("Adding permission to receive messages...");
                }
                // add permission to each queue, providing a list of topics that it's subscribed to
                AmazonNotificationUtils
                    .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList );
            }
        }

        return primaryTopicArn;
    }


    /**
     * The Asynchronous SNS client is used for publishing events to SNS.
     */

    private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) {

        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
        final AmazonSNSAsyncClient sns =
            new AmazonSNSAsyncClient( ugProvider.getCredentials(), clientConfiguration, executor );

        sns.setRegion( region );

        return sns;
    }


    /**
     * Create the async sqs client
     */
    private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) {

        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
        final AmazonSQSAsyncClient sqs =
            new AmazonSQSAsyncClient( ugProvider.getCredentials(),clientConfiguration,  executor );

        sqs.setRegion( region );

        return sqs;
    }


    /**
     * The Synchronous SNS client is used for creating topics and subscribing queues.
     */
    private AmazonSNSClient createSNSClient( final Region region ) {

        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
        final AmazonSNSClient sns =
            new AmazonSNSClient( ugProvider.getCredentials(), clientConfiguration );

        sns.setRegion( region );

        return sns;
    }


    private String getName(final boolean isDeadLetter) {
        String name =
            clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_"
                + scope.getRegionImplementation();
        if (isDeadLetter) {
            name += DEAD_LETTER_QUEUE_SUFFIX;
        }
        name = name.toLowerCase(); //user lower case values
        Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );

        return name;
    }

    private String getName() {
        return getName(false);
    }


    public LegacyQueue getReadQueue() {
        String queueName = getName(scope.isDeadLetterQueue());

        try {
            return readQueueUrlMap.get( queueName );
        }
        catch ( ExecutionException ee ) {
            throw new RuntimeException( ee );
        }
    }


    public String getWriteTopicArn() {
        try {
            return writeTopicArnMap.get( getName() );
        }
        catch ( ExecutionException ee ) {
            throw new RuntimeException( ee );
        }
    }


    @Override
    public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) {

        if ( sqs == null ) {
            logger.error( "SQS is null - was not initialized properly" );
            return new ArrayList<>(0);
        }

        String url = getReadQueue().getUrl();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Getting up to {} messages from {}", limit, url );
        }

        ArrayList<String> requestMessageAttributeNames = new ArrayList<String>(1);
        requestMessageAttributeNames.add("ApproximateReceiveCount");


        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
        receiveMessageRequest.setAttributeNames(requestMessageAttributeNames);
        receiveMessageRequest.setMaxNumberOfMessages( limit );
        receiveMessageRequest.setVisibilityTimeout(
            Math.max( MIN_VISIBILITY_TIMEOUT, fig.getVisibilityTimeout() / 1000 ) );


        int longPollTimeout = Math.min(20000, fig.getQueuePollTimeout()); // 20000 is the SQS maximum

        // ensure the client's socket timeout is not less than the configure long poll timeout
        if( fig.getQueueClientSocketTimeout() < longPollTimeout){

            longPollTimeout = Math.max(0, fig.getQueueClientSocketTimeout() - 1000);

        }

        receiveMessageRequest.setWaitTimeSeconds( longPollTimeout / 1000 ); // convert to seconds

        try {
            ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest );
            List<Message> messages = result.getMessages();

            if ( logger.isTraceEnabled() ) {
                logger.trace( "Received {} messages from {}", messages.size(), url );
            }

            List<LegacyQueueMessage> queueMessages = new ArrayList<>( messages.size() );

            for ( Message message : messages ) {

                Object payload;
                final String originalBody = message.getBody();

                try {
                    final JsonNode bodyNode = mapper.readTree( message.getBody() );

                    /**
                     * When a message originates from SNS it has a "Message"  we have to extract
                     * it and then process it separately
                     */


                    if ( bodyNode.has( "Message" ) ) {
                        final String snsNode = bodyNode.get( "Message" ).asText();

                        payload = deSerializeSQSMessage( snsNode, klass );
                    }
                    else {
                        payload = deSerializeSQSMessage( originalBody, klass );
                    }
                }
                catch ( Exception e ) {
                    logger.error( "failed to deserialize message: {}", message.getBody(), e );
                    throw new RuntimeException( e );
                }

                LegacyQueueMessage queueMessage = new LegacyQueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
                    message.getAttributes().get( "type" ) );
                queueMessage.setStringBody( originalBody );
                int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount"));
                queueMessage.setReceiveCount( receiveCount );
                queueMessages.add( queueMessage );
            }

            return  queueMessages ;
        }
        catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) {
            logger.error( "Queue does not exist! [{}]", url , dne );
        }
        catch ( Exception e ) {
            logger.error( "Programming error getting messages from queue=[{}] exist!", url, e );
        }

        return  new ArrayList<>( 0 ) ;
    }


    /**
     * Take a string, possibly escaped via SNS, and run it through our mapper to create an object)
     */
    private Object deSerializeSQSMessage( final String message, final Class type ) {
        try {
            final Object o = mapper.readValue( message, type );
            return o;
        }
        catch ( Exception e ) {
            throw new RuntimeException( "Unable to deserialize message " + message + " for class " + type, e );
        }
    }


    @Override
    public long getQueueDepth() {
        String key = "ApproximateNumberOfMessages";
        try {
            GetQueueAttributesResult result =
                sqs.getQueueAttributes( getReadQueue().getUrl(), Collections.singletonList( key ) );
            String depthString = result.getAttributes().get( key );
            return depthString != null ? Long.parseLong( depthString ) : 0;
        }
        catch ( Exception e ) {
            logger.error( "Exception getting queue depth", e );
            return -1;
        }
    }

    @Override
    public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException {
        boolean sendAsync = async == null || async.booleanValue();
        if (sendAsync) {
            sendMessageToAllRegionsAsync(body);
        } else {
            sendMessageToAllRegionsSync(body);
        }
    }

    private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException {
        if ( sns == null ) {
            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
            return;
        }

        final String stringBody = toString( body );

        String topicArn = getWriteTopicArn();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn );
        }

        try {
            PublishResult publishResult = sns.publish(topicArn, toString(body));
            if ( logger.isTraceEnabled() ) {
                logger.trace( "Successfully published... messageID=[{}],  arn=[{}]", publishResult.getMessageId(),
                    topicArn );
            }
        } catch (Exception e) {
            if (logger.isErrorEnabled()) {
                logger.error(FAILED_TO_SEND_MESSAGE,  stringBody, topicArn, e);
            }
            sendMessageToAllRegionsAsync(body);
        }



    }


    private <T extends Serializable> void sendMessageToAllRegionsAsync(final T body ) throws IOException {
        if ( snsAsync == null ) {
            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
            return;
        }

        final String stringBody = toString( body );

        final String topicArn = getWriteTopicArn();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn );
        }

        PublishRequest publishRequest = new PublishRequest( topicArn, stringBody );

        snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
            @Override
            public void onError( Exception e ) {
                logger.error( "Error publishing message... {}", e );
                logger.error(FAILED_TO_SEND_MESSAGE, stringBody, topicArn, e);
            }


            @Override
            public void onSuccess( PublishRequest request, PublishResult result ) {
                if ( logger.isTraceEnabled() ) {
                    logger.trace( "Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
                        request.getTopicArn() );
                }
            }
        } );
    }

    @Override
    public void sendMessagesAsync( final List bodies ) throws IOException {
        if ( sqsAsync == null ) {
            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
            return;
        }

        for ( Object body : bodies ) {
            sendMessageToLocalRegionAsync( ( Serializable ) body );
        }
    }


    @Override
    public void sendMessages( final List bodies ) throws IOException {
        QueueIndexingStrategy queueIndexingStrategy = QueueIndexingStrategy.get(fig.getQueueStrategy());
        for ( Object body : bodies ) {
            if (queueIndexingStrategy == QueueIndexingStrategy.ASYNC) {
                sendMessageToLocalRegionAsync((Serializable) body);
            } else {
                sendMessageToLocalRegionSync((Serializable) body);
            }
        }
    }


    @Override
    public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException {

        List<LegacyQueueMessage> successMessages = new ArrayList<>();

        if ( sqs == null ) {
            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
            return successMessages;
        }

        String url = getReadQueue().getUrl();

        List<SendMessageBatchRequestEntry> entries = new ArrayList<>();

        for (LegacyQueueMessage queueMessage : queueMessages) {
            entries.add(new SendMessageBatchRequestEntry(queueMessage.getMessageId(), queueMessage.getStringBody()));
        }

        SendMessageBatchResult result = sqs.sendMessageBatch(url, entries);

        Set<String> successIDs = new HashSet<>();
        logger.debug("sendQueueMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());

        for (SendMessageBatchResultEntry batchResultEntry : result.getSuccessful()) {
            successIDs.add(batchResultEntry.getId());
        }

        for (LegacyQueueMessage queueMessage : queueMessages) {
            if (successIDs.contains(queueMessage.getMessageId())) {
                successMessages.add(queueMessage);
            }
        }

        return successMessages;
    }

    @Override
    public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
        boolean sendAsync = async.booleanValue();
        if (sendAsync) {
            sendMessageToLocalRegionAsync(body);
        } else {
            sendMessageToLocalRegionSync(body);
        }
    }

    private <T extends Serializable> void sendMessageToLocalRegionSync(final T body) throws IOException {

        if ( sqs == null ) {
            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
            return;
        }
        final String stringBody = toString( body );

        if (logger.isDebugEnabled()) {
            logger.debug(" sendMessageToLocalRegion " + stringBody);
        }

        String url = getReadQueue().getUrl();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Publishing Message...{} to url: {}", stringBody, url );
        }

        SendMessageRequest messageRequest = new SendMessageRequest(url, stringBody);
        try {
            SendMessageResult result = sqs.sendMessage(messageRequest);
            if (logger.isTraceEnabled()) {
                logger.trace("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
                    url);
            }
        } catch (Exception e) {
            logger.error(FAILED_TO_SEND_MESSAGE,  messageRequest.getMessageBody(), url, e);
            sendMessageToLocalRegionAsync(body);
        }


    }


    private <T extends Serializable> void sendMessageToLocalRegionAsync(final T body ) throws IOException {

        if ( sqsAsync == null ) {
            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
            return;
        }
        final String stringBody = toString( body );
        String url = getReadQueue().getUrl();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Publishing Message...{} to url: {}", stringBody, url );
        }

        SendMessageRequest request = new SendMessageRequest( url, stringBody );

        sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() {

            @Override
            public void onError( final Exception e ) {
                logger.error(FAILED_TO_SEND_MESSAGE, stringBody, url, e);
            }


            @Override
            public void onSuccess( final SendMessageRequest request, final SendMessageResult sendMessageResult ) {
                if ( logger.isTraceEnabled() ) {
                    logger.trace( "Successfully send... messageBody=[{}],  url=[{}]", request.getMessageBody(),
                        request.getQueueUrl() );
                }
            }
        } );
    }


    @Override
    public void deleteQueue() {
        logger.warn( "Deleting queue: " + getReadQueue().getUrl() );
        sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() ) );
        logger.warn( "Deleting queue: " + getReadQueue().getUrl() + "_dead" );
        sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() + "_dead" ) );
    }


    @Override
    public void commitMessage( final LegacyQueueMessage queueMessage ) {
        String url = getReadQueue().getUrl();
        if ( logger.isTraceEnabled() ) {
            logger.trace( "Commit message {} to queue {}", queueMessage.getMessageId(), url );
        }

        sqs.deleteMessage(
            new DeleteMessageRequest().withQueueUrl( url ).withReceiptHandle( queueMessage.getHandle() ) );
    }


    @Override
    public void commitMessages( final List<LegacyQueueMessage> queueMessages ) {
        String url = getReadQueue().getUrl();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Commit messages {} to queue {}", queueMessages.size(), url );
        }

        List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();

        for ( LegacyQueueMessage message : queueMessages ) {
            entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) );
        }

        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest( url, entries );
        DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );

        boolean successful = result.getFailed().size() <= 0;
        logger.debug("commitMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());

        if ( !successful ) {
            for ( BatchResultErrorEntry failed : result.getFailed() ) {
                logger.error( "Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId() );
            }
        }
    }


    /**
     * Write the object to a Base64 string.
     */
    private String toString( final Object o ) throws IOException {
        return mapper.writeValueAsString( o );
    }


    /**
     * Get the region
     */
    private Region getRegion() {
        String regionName = fig.getPrimaryRegion();
        try {
            Regions regions = Regions.fromName(regionName);
            return Region.getRegion(regions);
        }
        catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("INVALID PRIMARY REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LOCAL + ": " + regionName, e);
        }
    }


    /**
     * Create the SQS client for the specified settings
     */
    private AmazonSQSClient createSQSClient( final Region region ) {

        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
        final AmazonSQSClient sqs =
            new AmazonSQSClient( ugProvider.getCredentials(), clientConfiguration );

        sqs.setRegion( region );

        return sqs;
    }

    @Override
    public void clearQueueNameCache(){
       //no-op
    }
}
