Merge commit 'refs/pull/372/head' of github.com:apache/usergrid into two-dot-o-dev
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties index 6dd3a24..39ae84e 100644 --- a/stack/config/src/main/resources/usergrid-default.properties +++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -405,6 +405,13 @@ # #usergrid.queue.deliveryLimit=5 +# Set the number of async workers used to publish messages to SNS +# +#usergrid.queue.publish.threads=100 + +# Set the queue size for the number of messages that can be queued during async publishing to SNS +# +#usergrid.queue.publish.queuesize=850000
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java index c653458..3c6a750 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,11 +20,10 @@ package org.apache.usergrid.persistence.core.executor; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -33,7 +32,12 @@ */ public class TaskExecutorFactory { + private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class); + public enum RejectionAction { + ABORT, + CALLERRUNS + } /** * Create a task executor * @param schedulerName @@ -42,19 +46,28 @@ * @return */ public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount, - final int maxQueueSize ) { + final int maxQueueSize, RejectionAction rejectionAction ) { final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize ); - final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount ); + if(rejectionAction.equals(RejectionAction.ABORT)){ + return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount ); - return threadPool; + } + else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){ + + return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount ); + + }else{ + //default to the thread pool with ABORT policy + return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount ); + } + } - /** * Create a thread pool that will reject work if our audit tasks become overwhelmed */ @@ -65,6 +78,17 @@ } } + /** + * Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed + */ + private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor { + + public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) { + super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, + new CountingThreadFactory( poolName ), new RejectedHandler(poolName) ); + } + } + /** * Thread factory that will name and count threads for easier debugging @@ -87,9 +111,29 @@ Thread t = new Thread( r, threadName ); //set it to be a daemon thread so it doesn't block shutdown - t.setDaemon( true ); + t.setDaemon(true); return t; } } + + /** + * The handler that will handle rejected executions and signal the interface + */ + private static final class RejectedHandler implements RejectedExecutionHandler { + + private final String poolName; + + private RejectedHandler (final String poolName) {this.poolName = poolName;} + + @Override + public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) { + log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() ); + + //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected + + r.run(); + } + + } }
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index 2194400..c9e389f 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -126,7 +126,7 @@ this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(), - graphFig.getShardAuditWorkerQueueSize() ) ); + graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) ); }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index 2a9f321..66f8af4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -20,7 +20,7 @@ */ @Key( "usergrid.queue.region" ) @Default("us-east-1") - public String getRegion(); + String getRegion(); /** * Flag to determine if Usergrid should use a multi-region Amazon queue @@ -28,7 +28,7 @@ */ @Key( "usergrid.queue.multiregion" ) @Default("false") - public boolean isMultiRegion(); + boolean isMultiRegion(); /** * Comma-separated list of one or more Amazon regions to use if multiregion @@ -36,7 +36,7 @@ */ @Key( "usergrid.queue.regionList" ) @Default("us-east-1") - public String getRegionList(); + String getRegionList(); /** @@ -45,7 +45,7 @@ */ @Key( "usergrid.queue.retention" ) @Default("1209600") - public String getRetentionPeriod(); + String getRetentionPeriod(); /** * Set the amount of time (in minutes) to retain messages in a dead letter queue. @@ -53,16 +53,25 @@ */ @Key( "usergrid.queue.deadletter.retention" ) @Default("1209600") - public String getDeadletterRetentionPeriod(); + String getDeadletterRetentionPeriod(); /** * The maximum number of messages to deliver to a dead letter queue. */ @Key( "usergrid.queue.deliveryLimit" ) @Default("5") - public String getQueueDeliveryLimit(); + String getQueueDeliveryLimit(); @Key("usergrid.use.default.queue") @Default("false") - public boolean overrideQueueForDefault(); + boolean overrideQueueForDefault(); + + @Key("usergrid.queue.publish.threads") + @Default("100") + int getAsyncMaxThreads(); + + // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap) + @Key("usergrid.queue.publish.queuesize") + @Default("850000") + int getAsyncQueueSize(); }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index bcf4499..8fb0f52 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -19,8 +19,10 @@ import com.amazonaws.AmazonServiceException; +import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; +import com.amazonaws.services.sns.AmazonSNSAsyncClient; import com.amazonaws.services.sns.AmazonSNSClient; import com.amazonaws.services.sns.model.*; import com.amazonaws.services.sqs.AmazonSQSClient; @@ -39,15 +41,15 @@ import org.apache.usergrid.persistence.queue.*; import org.apache.usergrid.persistence.queue.Queue; import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; - +import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutionException; -import rx.Observable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; public class SNSQueueManagerImpl implements QueueManager { @@ -57,9 +59,10 @@ private final QueueFig fig; private final ClusterFig clusterFig; private final CassandraFig cassandraFig; + private final QueueFig queueFig; private final AmazonSQSClient sqs; private final AmazonSNSClient sns; - private final RxTaskScheduler rxTaskScheduler; + private final AmazonSNSAsyncClient snsAsync; private final JsonFactory JSON_FACTORY = new JsonFactory(); @@ -67,57 +70,59 @@ private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder() - .maximumSize(1000) - .build(new CacheLoader<String, String>() { - @Override - public String load(String queueName) - throws Exception { + .maximumSize(1000) + .build(new CacheLoader<String, String>() { + @Override + public String load(String queueName) + throws Exception { - return setupTopics(queueName); - } - }); + return setupTopics(queueName); + } + }); private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder() - .maximumSize(1000) - .build(new CacheLoader<String, Queue>() { - @Override - public Queue load(String queueName) throws Exception { + .maximumSize(1000) + .build(new CacheLoader<String, Queue>() { + @Override + public Queue load(String queueName) throws Exception { - Queue queue = null; + Queue queue = null; - try { - GetQueueUrlResult result = sqs.getQueueUrl(queueName); - queue = new Queue(result.getQueueUrl()); - } catch (QueueDoesNotExistException queueDoesNotExistException) { - logger.error("Queue {} does not exist, will create", queueName); - } catch (Exception e) { - logger.error("failed to get queue from service", e); - throw e; - } - - if (queue == null) { - String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig); - queue = new Queue(url); - } - - setupTopics(queueName); - - return queue; + try { + GetQueueUrlResult result = sqs.getQueueUrl(queueName); + queue = new Queue(result.getQueueUrl()); + } catch (QueueDoesNotExistException queueDoesNotExistException) { + logger.error("Queue {} does not exist, will create", queueName); + } catch (Exception e) { + logger.error("failed to get queue from service", e); + throw e; } - }); + + if (queue == null) { + String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig); + queue = new Queue(url); + } + + setupTopics(queueName); + + return queue; + } + }); @Inject - public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig, final RxTaskScheduler rxTaskScheduler) { + public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, + CassandraFig cassandraFig, QueueFig queueFig) { this.scope = scope; this.fig = fig; this.clusterFig = clusterFig; this.cassandraFig = cassandraFig; - this.rxTaskScheduler = rxTaskScheduler; + this.queueFig = queueFig; try { sqs = createSQSClient(getRegion()); sns = createSNSClient(getRegion()); + snsAsync = createAsyncSNSClient(getRegion()); } catch (Exception e) { throw new RuntimeException("Error setting up mapper", e); @@ -125,7 +130,7 @@ } private String setupTopics(final String queueName) - throws Exception { + throws Exception { logger.info("Setting up setupTopics SNS/SQS..."); @@ -247,10 +252,10 @@ } catch (Exception e) { logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]", - queueARN, - strSqsRegion, - topicARN, - strSnsRegion), e); + queueARN, + strSqsRegion, + topicARN, + strSnsRegion), e); } @@ -267,6 +272,27 @@ } /** + * The Asynchronous SNS client is used for publishing events to SNS. + * + */ + + private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) { + final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); + + + // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks + final Executor executor = TaskExecutorFactory + .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(), + TaskExecutorFactory.RejectionAction.CALLERRUNS); + + final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor); + + sns.setRegion(region); + + return sns; + } + + /** * The Synchronous SNS client is used for creating topics and subscribing queues. * */ @@ -380,7 +406,7 @@ @Override public void sendMessages(final List bodies) throws IOException { - if (sns == null) { + if (snsAsync == null) { logger.error("SNS client is null, perhaps it failed to initialize successfully"); return; } @@ -393,40 +419,33 @@ @Override public void sendMessage(final Object body) throws IOException { - Observable.just(body).doOnNext(message->{ - if (sns == null) { - logger.error("SNS client is null, perhaps it failed to initialize successfully"); - return; + if (snsAsync == null) { + logger.error("SNS client is null, perhaps it failed to initialize successfully"); + return; + } + + final String stringBody = toString(body); + + String topicArn = getWriteTopicArn(); + + if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn); + + PublishRequest publishRequest = new PublishRequest(topicArn, stringBody); + + snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() { + @Override + public void onError(Exception e) { + logger.error("Error publishing message... {}", e); } - final String stringBody; - try { + @Override + public void onSuccess(PublishRequest request, PublishResult result) { + if (logger.isDebugEnabled()) + logger.debug("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), request.getTopicArn()); - stringBody = toString(body); - String topicArn = getWriteTopicArn(); - - if (logger.isDebugEnabled()){ - logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn); - } - - PublishRequest publishRequest = new PublishRequest(topicArn, stringBody); - - // publish message to SNS - PublishResult publishResult = sns.publish(publishRequest); - - if(logger.isDebugEnabled()){ - logger.debug("Successfully published... messageID=[{}], arn=[{}]", - publishResult.getMessageId(), publishRequest.getTopicArn()); - } - - } catch (IOException e) { - logger.error("Unable to convert queue object to a string message body", e); } - - }).doOnError(e ->{ - logger.error("Error while publishing SNS message: ", e); - }).subscribeOn(rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + }); } @@ -438,8 +457,8 @@ logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url); sqs.deleteMessage(new DeleteMessageRequest() - .withQueueUrl(url) - .withReceiptHandle(queueMessage.getHandle())); + .withQueueUrl(url) + .withReceiptHandle(queueMessage.getHandle())); } @@ -473,7 +492,7 @@ */ private Object fromString(final String s, final Class klass) - throws IOException, ClassNotFoundException { + throws IOException, ClassNotFoundException { Object o = mapper.readValue(s, klass); return o;