Merge commit 'refs/pull/372/head' of github.com:apache/usergrid into two-dot-o-dev
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 6dd3a24..39ae84e 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -405,6 +405,13 @@
 #
 #usergrid.queue.deliveryLimit=5
 
+# Set the number of async workers used to publish messages to SNS
+#
+#usergrid.queue.publish.threads=100
+
+# Set the maximum number of messages that can be held in the in-memory queue while awaiting async publishing to SNS
+#
+#usergrid.queue.publish.queuesize=850000
 
 
 
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index c653458..3c6a750 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,11 +20,10 @@
 package org.apache.usergrid.persistence.core.executor;
 
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -33,7 +32,12 @@
  */
 public class TaskExecutorFactory {
 
+    private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class);
 
+    public enum RejectionAction {
+        ABORT,
+        CALLERRUNS
+    }
     /**
      * Create a task executor
      * @param schedulerName
@@ -42,19 +46,28 @@
      * @return
      */
     public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
-                                                         final int maxQueueSize ) {
+                                                         final int maxQueueSize, RejectionAction rejectionAction ) {
 
 
         final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
 
 
-        final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+        if(rejectionAction.equals(RejectionAction.ABORT)){
 
+            return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
 
-        return threadPool;
+        }
+        else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){
+
+            return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount );
+
+        }else{
+            //default to the thread pool with ABORT policy
+            return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+        }
+
     }
 
-
     /**
      * Create a thread pool that will reject work if our audit tasks become overwhelmed
      */
@@ -65,6 +78,17 @@
         }
     }
 
+    /**
+     * Thread pool that runs a rejected task in the submitting thread (via RejectedHandler) when its queue is full
+     */
+    private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
+            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
+                new CountingThreadFactory( poolName ), new RejectedHandler(poolName) );
+        }
+    }
+
 
     /**
      * Thread factory that will name and count threads for easier debugging
@@ -87,9 +111,29 @@
             Thread t = new Thread( r, threadName );
 
             //set it to be a daemon thread so it doesn't block shutdown
-            t.setDaemon( true );
+            t.setDaemon(true);
 
             return t;
         }
     }
+
+    /**
+     * Handler for rejected executions: logs a warning and runs the rejected task in the calling thread
+     */
+    private static final class RejectedHandler implements RejectedExecutionHandler {
+
+        private final String poolName;
+
+        private RejectedHandler (final String poolName) {this.poolName = poolName;}
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
+
+            //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
+
+            r.run();
+        }
+
+    }
 }
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 2194400..c9e389f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -126,7 +126,7 @@
 
         this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
             .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(),
-                graphFig.getShardAuditWorkerQueueSize() ) );
+                graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
     }
 
 
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 2a9f321..66f8af4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -20,7 +20,7 @@
      */
     @Key( "usergrid.queue.region" )
     @Default("us-east-1")
-    public String getRegion();
+    String getRegion();
 
     /**
      * Flag to determine if Usergrid should use a multi-region Amazon queue
@@ -28,7 +28,7 @@
      */
     @Key( "usergrid.queue.multiregion" )
     @Default("false")
-    public boolean isMultiRegion();
+    boolean isMultiRegion();
 
     /**
      * Comma-separated list of one or more Amazon regions to use if multiregion
@@ -36,7 +36,7 @@
      */
     @Key( "usergrid.queue.regionList" )
     @Default("us-east-1")
-    public String getRegionList();
+    String getRegionList();
 
 
     /**
@@ -45,7 +45,7 @@
      */
     @Key( "usergrid.queue.retention" )
     @Default("1209600")
-    public String getRetentionPeriod();
+    String getRetentionPeriod();
 
     /**
      * Set the amount of time (in minutes) to retain messages in a dead letter queue.
@@ -53,16 +53,25 @@
      */
     @Key( "usergrid.queue.deadletter.retention" )
     @Default("1209600")
-    public String getDeadletterRetentionPeriod();
+    String getDeadletterRetentionPeriod();
 
     /**
      * The maximum number of messages to deliver to a dead letter queue.
      */
     @Key( "usergrid.queue.deliveryLimit" )
     @Default("5")
-    public String getQueueDeliveryLimit();
+    String getQueueDeliveryLimit();
 
     @Key("usergrid.use.default.queue")
     @Default("false")
-    public boolean overrideQueueForDefault();
+    boolean overrideQueueForDefault();
+
+    @Key("usergrid.queue.publish.threads")
+    @Default("100")
+    int getAsyncMaxThreads();
+
+    // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
+    @Key("usergrid.queue.publish.queuesize")
+    @Default("850000")
+    int getAsyncQueueSize();
 }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bcf4499..8fb0f52 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -19,8 +19,10 @@
 
 
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.handlers.AsyncHandler;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sns.AmazonSNSAsyncClient;
 import com.amazonaws.services.sns.AmazonSNSClient;
 import com.amazonaws.services.sns.model.*;
 import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -39,15 +41,15 @@
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.Queue;
 import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
-import rx.Observable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 public class SNSQueueManagerImpl implements QueueManager {
 
@@ -57,9 +59,10 @@
     private final QueueFig fig;
     private final ClusterFig clusterFig;
     private final CassandraFig cassandraFig;
+    private final QueueFig queueFig;
     private final AmazonSQSClient sqs;
     private final AmazonSNSClient sns;
-    private final RxTaskScheduler rxTaskScheduler;
+    private final AmazonSNSAsyncClient snsAsync;
 
 
     private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -67,57 +70,59 @@
 
 
     private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
-            .maximumSize(1000)
-            .build(new CacheLoader<String, String>() {
-                @Override
-                public String load(String queueName)
-                        throws Exception {
+        .maximumSize(1000)
+        .build(new CacheLoader<String, String>() {
+            @Override
+            public String load(String queueName)
+                throws Exception {
 
-                    return setupTopics(queueName);
-                }
-            });
+                return setupTopics(queueName);
+            }
+        });
 
     private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
-            .maximumSize(1000)
-            .build(new CacheLoader<String, Queue>() {
-                @Override
-                public Queue load(String queueName) throws Exception {
+        .maximumSize(1000)
+        .build(new CacheLoader<String, Queue>() {
+            @Override
+            public Queue load(String queueName) throws Exception {
 
-                    Queue queue = null;
+                Queue queue = null;
 
-                    try {
-                        GetQueueUrlResult result = sqs.getQueueUrl(queueName);
-                        queue = new Queue(result.getQueueUrl());
-                    } catch (QueueDoesNotExistException queueDoesNotExistException) {
-                        logger.error("Queue {} does not exist, will create", queueName);
-                    } catch (Exception e) {
-                        logger.error("failed to get queue from service", e);
-                        throw e;
-                    }
-
-                    if (queue == null) {
-                        String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
-                        queue = new Queue(url);
-                    }
-
-                    setupTopics(queueName);
-
-                    return queue;
+                try {
+                    GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+                    queue = new Queue(result.getQueueUrl());
+                } catch (QueueDoesNotExistException queueDoesNotExistException) {
+                    logger.error("Queue {} does not exist, will create", queueName);
+                } catch (Exception e) {
+                    logger.error("failed to get queue from service", e);
+                    throw e;
                 }
-            });
+
+                if (queue == null) {
+                    String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
+                    queue = new Queue(url);
+                }
+
+                setupTopics(queueName);
+
+                return queue;
+            }
+        });
 
 
     @Inject
-    public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig, final RxTaskScheduler rxTaskScheduler) {
+    public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
+                               CassandraFig cassandraFig, QueueFig queueFig) {
         this.scope = scope;
         this.fig = fig;
         this.clusterFig = clusterFig;
         this.cassandraFig = cassandraFig;
-        this.rxTaskScheduler = rxTaskScheduler;
+        this.queueFig = queueFig;
 
         try {
             sqs = createSQSClient(getRegion());
             sns = createSNSClient(getRegion());
+            snsAsync = createAsyncSNSClient(getRegion());
 
         } catch (Exception e) {
             throw new RuntimeException("Error setting up mapper", e);
@@ -125,7 +130,7 @@
     }
 
     private String setupTopics(final String queueName)
-            throws Exception {
+        throws Exception {
 
         logger.info("Setting up setupTopics SNS/SQS...");
 
@@ -247,10 +252,10 @@
 
                     } catch (Exception e) {
                         logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
-                                queueARN,
-                                strSqsRegion,
-                                topicARN,
-                                strSnsRegion), e);
+                            queueARN,
+                            strSqsRegion,
+                            topicARN,
+                            strSnsRegion), e);
 
 
                     }
@@ -267,6 +272,27 @@
     }
 
     /**
+     * The Asynchronous SNS client is used for publishing events to SNS.
+     *
+     */
+
+    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+
+        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+        final Executor executor = TaskExecutorFactory
+            .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+                TaskExecutorFactory.RejectionAction.CALLERRUNS);
+
+        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor);
+
+        sns.setRegion(region);
+
+        return sns;
+    }
+
+    /**
      * The Synchronous SNS client is used for creating topics and subscribing queues.
      *
      */
@@ -380,7 +406,7 @@
     @Override
     public void sendMessages(final List bodies) throws IOException {
 
-        if (sns == null) {
+        if (snsAsync == null) {
             logger.error("SNS client is null, perhaps it failed to initialize successfully");
             return;
         }
@@ -393,40 +419,33 @@
 
     @Override
     public void sendMessage(final Object body) throws IOException {
-        Observable.just(body).doOnNext(message->{
 
-            if (sns == null) {
-                logger.error("SNS client is null, perhaps it failed to initialize successfully");
-                return;
+        if (snsAsync == null) {
+            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+            return;
+        }
+
+        final String stringBody = toString(body);
+
+        String topicArn = getWriteTopicArn();
+
+        if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+
+        PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+
+        snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+            @Override
+            public void onError(Exception e) {
+                logger.error("Error publishing message... {}", e);
             }
 
-            final String stringBody;
-            try {
+            @Override
+            public void onSuccess(PublishRequest request, PublishResult result) {
+                if (logger.isDebugEnabled())
+                    logger.debug("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(), request.getTopicArn());
 
-                stringBody = toString(body);
-                String topicArn = getWriteTopicArn();
-
-                if (logger.isDebugEnabled()){
-                    logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
-                }
-
-                PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
-
-                // publish message to SNS
-                PublishResult publishResult = sns.publish(publishRequest);
-
-                if(logger.isDebugEnabled()){
-                    logger.debug("Successfully published... messageID=[{}],  arn=[{}]",
-                        publishResult.getMessageId(), publishRequest.getTopicArn());
-                }
-
-            } catch (IOException e) {
-                logger.error("Unable to convert queue object to a string message body", e);
             }
-
-        }).doOnError(e ->{
-            logger.error("Error while publishing SNS message: ", e);
-        }).subscribeOn(rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+        });
 
     }
 
@@ -438,8 +457,8 @@
             logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
 
         sqs.deleteMessage(new DeleteMessageRequest()
-                .withQueueUrl(url)
-                .withReceiptHandle(queueMessage.getHandle()));
+            .withQueueUrl(url)
+            .withReceiptHandle(queueMessage.getHandle()));
     }
 
 
@@ -473,7 +492,7 @@
      */
 
     private Object fromString(final String s, final Class klass)
-            throws IOException, ClassNotFoundException {
+        throws IOException, ClassNotFoundException {
 
         Object o = mapper.readValue(s, klass);
         return o;