Merge commit 'refs/pull/372/head' of github.com:apache/usergrid into two-dot-o-dev
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 6dd3a24..39ae84e 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -405,6 +405,13 @@
#
#usergrid.queue.deliveryLimit=5
+# Set the number of async workers used to publish messages to SNS
+#
+#usergrid.queue.publish.threads=100
+
+# Set the queue size for the number of messages that can be queued during async publishing to SNS
+#
+#usergrid.queue.publish.queuesize=850000
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index c653458..3c6a750 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,11 +20,10 @@
package org.apache.usergrid.persistence.core.executor;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,7 +32,12 @@
*/
public class TaskExecutorFactory {
+ private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class);
+ public enum RejectionAction {
+ ABORT,
+ CALLERRUNS
+ }
/**
* Create a task executor
* @param schedulerName
@@ -42,19 +46,28 @@
* @return
*/
public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
- final int maxQueueSize ) {
+ final int maxQueueSize, RejectionAction rejectionAction ) {
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
- final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+ if(rejectionAction.equals(RejectionAction.ABORT)){
+ return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
- return threadPool;
+ }
+ else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){
+
+ return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount );
+
+ }else{
+ //default to the thread pool with ABORT policy
+ return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+ }
+
}
-
/**
* Create a thread pool that will reject work if our audit tasks become overwhelmed
*/
@@ -65,6 +78,17 @@
}
}
+ /**
+ * Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed
+ */
+ private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
+ super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
+ new CountingThreadFactory( poolName ), new RejectedHandler(poolName) );
+ }
+ }
+
/**
* Thread factory that will name and count threads for easier debugging
@@ -87,9 +111,29 @@
Thread t = new Thread( r, threadName );
//set it to be a daemon thread so it doesn't block shutdown
- t.setDaemon( true );
+ t.setDaemon(true);
return t;
}
}
+
+ /**
+ * The handler that will handle rejected executions and signal the interface
+ */
+ private static final class RejectedHandler implements RejectedExecutionHandler {
+
+ private final String poolName;
+
+ private RejectedHandler (final String poolName) {this.poolName = poolName;}
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+ log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
+
+ //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
+
+ r.run();
+ }
+
+ }
}
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 2194400..c9e389f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -126,7 +126,7 @@
this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
.createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(),
- graphFig.getShardAuditWorkerQueueSize() ) );
+ graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 2a9f321..66f8af4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -20,7 +20,7 @@
*/
@Key( "usergrid.queue.region" )
@Default("us-east-1")
- public String getRegion();
+ String getRegion();
/**
* Flag to determine if Usergrid should use a multi-region Amazon queue
@@ -28,7 +28,7 @@
*/
@Key( "usergrid.queue.multiregion" )
@Default("false")
- public boolean isMultiRegion();
+ boolean isMultiRegion();
/**
* Comma-separated list of one or more Amazon regions to use if multiregion
@@ -36,7 +36,7 @@
*/
@Key( "usergrid.queue.regionList" )
@Default("us-east-1")
- public String getRegionList();
+ String getRegionList();
/**
@@ -45,7 +45,7 @@
*/
@Key( "usergrid.queue.retention" )
@Default("1209600")
- public String getRetentionPeriod();
+ String getRetentionPeriod();
/**
* Set the amount of time (in minutes) to retain messages in a dead letter queue.
@@ -53,16 +53,25 @@
*/
@Key( "usergrid.queue.deadletter.retention" )
@Default("1209600")
- public String getDeadletterRetentionPeriod();
+ String getDeadletterRetentionPeriod();
/**
* The maximum number of messages to deliver to a dead letter queue.
*/
@Key( "usergrid.queue.deliveryLimit" )
@Default("5")
- public String getQueueDeliveryLimit();
+ String getQueueDeliveryLimit();
@Key("usergrid.use.default.queue")
@Default("false")
- public boolean overrideQueueForDefault();
+ boolean overrideQueueForDefault();
+
+ @Key("usergrid.queue.publish.threads")
+ @Default("100")
+ int getAsyncMaxThreads();
+
+ // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
+ @Key("usergrid.queue.publish.queuesize")
+ @Default("850000")
+ int getAsyncQueueSize();
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bcf4499..8fb0f52 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -19,8 +19,10 @@
import com.amazonaws.AmazonServiceException;
+import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -39,15 +41,15 @@
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.Queue;
import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
-import rx.Observable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
public class SNSQueueManagerImpl implements QueueManager {
@@ -57,9 +59,10 @@
private final QueueFig fig;
private final ClusterFig clusterFig;
private final CassandraFig cassandraFig;
+ private final QueueFig queueFig;
private final AmazonSQSClient sqs;
private final AmazonSNSClient sns;
- private final RxTaskScheduler rxTaskScheduler;
+ private final AmazonSNSAsyncClient snsAsync;
private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -67,57 +70,59 @@
private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, String>() {
- @Override
- public String load(String queueName)
- throws Exception {
+ .maximumSize(1000)
+ .build(new CacheLoader<String, String>() {
+ @Override
+ public String load(String queueName)
+ throws Exception {
- return setupTopics(queueName);
- }
- });
+ return setupTopics(queueName);
+ }
+ });
private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, Queue>() {
- @Override
- public Queue load(String queueName) throws Exception {
+ .maximumSize(1000)
+ .build(new CacheLoader<String, Queue>() {
+ @Override
+ public Queue load(String queueName) throws Exception {
- Queue queue = null;
+ Queue queue = null;
- try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
- queue = new Queue(result.getQueueUrl());
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
- logger.error("Queue {} does not exist, will create", queueName);
- } catch (Exception e) {
- logger.error("failed to get queue from service", e);
- throw e;
- }
-
- if (queue == null) {
- String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
- queue = new Queue(url);
- }
-
- setupTopics(queueName);
-
- return queue;
+ try {
+ GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+ queue = new Queue(result.getQueueUrl());
+ } catch (QueueDoesNotExistException queueDoesNotExistException) {
+ logger.error("Queue {} does not exist, will create", queueName);
+ } catch (Exception e) {
+ logger.error("failed to get queue from service", e);
+ throw e;
}
- });
+
+ if (queue == null) {
+ String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
+ queue = new Queue(url);
+ }
+
+ setupTopics(queueName);
+
+ return queue;
+ }
+ });
@Inject
- public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig, final RxTaskScheduler rxTaskScheduler) {
+ public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
+ CassandraFig cassandraFig, QueueFig queueFig) {
this.scope = scope;
this.fig = fig;
this.clusterFig = clusterFig;
this.cassandraFig = cassandraFig;
- this.rxTaskScheduler = rxTaskScheduler;
+ this.queueFig = queueFig;
try {
sqs = createSQSClient(getRegion());
sns = createSNSClient(getRegion());
+ snsAsync = createAsyncSNSClient(getRegion());
} catch (Exception e) {
throw new RuntimeException("Error setting up mapper", e);
@@ -125,7 +130,7 @@
}
private String setupTopics(final String queueName)
- throws Exception {
+ throws Exception {
logger.info("Setting up setupTopics SNS/SQS...");
@@ -247,10 +252,10 @@
} catch (Exception e) {
logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
- queueARN,
- strSqsRegion,
- topicARN,
- strSnsRegion), e);
+ queueARN,
+ strSqsRegion,
+ topicARN,
+ strSnsRegion), e);
}
@@ -267,6 +272,27 @@
}
/**
+ * The Asynchronous SNS client is used for publishing events to SNS.
+ *
+ */
+
+ private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+
+ // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+ final Executor executor = TaskExecutorFactory
+ .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+ TaskExecutorFactory.RejectionAction.CALLERRUNS);
+
+ final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor);
+
+ sns.setRegion(region);
+
+ return sns;
+ }
+
+ /**
* The Synchronous SNS client is used for creating topics and subscribing queues.
*
*/
@@ -380,7 +406,7 @@
@Override
public void sendMessages(final List bodies) throws IOException {
- if (sns == null) {
+ if (snsAsync == null) {
logger.error("SNS client is null, perhaps it failed to initialize successfully");
return;
}
@@ -393,40 +419,33 @@
@Override
public void sendMessage(final Object body) throws IOException {
- Observable.just(body).doOnNext(message->{
- if (sns == null) {
- logger.error("SNS client is null, perhaps it failed to initialize successfully");
- return;
+ if (snsAsync == null) {
+ logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ return;
+ }
+
+ final String stringBody = toString(body);
+
+ String topicArn = getWriteTopicArn();
+
+ if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+
+ PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+
+ snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ @Override
+ public void onError(Exception e) {
+ logger.error("Error publishing message... {}", e);
}
- final String stringBody;
- try {
+ @Override
+ public void onSuccess(PublishRequest request, PublishResult result) {
+ if (logger.isDebugEnabled())
+ logger.debug("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), request.getTopicArn());
- stringBody = toString(body);
- String topicArn = getWriteTopicArn();
-
- if (logger.isDebugEnabled()){
- logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
- }
-
- PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
-
- // publish message to SNS
- PublishResult publishResult = sns.publish(publishRequest);
-
- if(logger.isDebugEnabled()){
- logger.debug("Successfully published... messageID=[{}], arn=[{}]",
- publishResult.getMessageId(), publishRequest.getTopicArn());
- }
-
- } catch (IOException e) {
- logger.error("Unable to convert queue object to a string message body", e);
}
-
- }).doOnError(e ->{
- logger.error("Error while publishing SNS message: ", e);
- }).subscribeOn(rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+ });
}
@@ -438,8 +457,8 @@
logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(url)
- .withReceiptHandle(queueMessage.getHandle()));
+ .withQueueUrl(url)
+ .withReceiptHandle(queueMessage.getHandle()));
}
@@ -473,7 +492,7 @@
*/
private Object fromString(final String s, final Class klass)
- throws IOException, ClassNotFoundException {
+ throws IOException, ClassNotFoundException {
Object o = mapper.readValue(s, klass);
return o;