Merge commit 'refs/pull/573/head' of github.com:apache/usergrid
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 0d1a193..d505a96 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -305,10 +305,15 @@
#
#elasticsearch.refresh_search_max=10
-# Set the amount of time to wait when Elasticsearch rejects a requests before
+# Set the amount of time to wait when indexing or utility queue rejects a request before
# retrying. This provides simple backpressure. (in milliseconds)
#
-#elasticsearch.rejected_retry_wait
+#elasticsearch.rejected_retry_wait=1000
+
+# Set the amount of time to wait when indexing or utility dead letter queue rejects a request before
+# retrying. This provides simple backpressure. (in milliseconds)
+#
+#elasticsearch.deadletter.rejected_retry_wait=2000
@@ -332,18 +337,29 @@
#
#usergrid.use.default.queue=false
-# The number of worker threads used to read index write requests from the queue.
+# The number of worker threads used to read index write requests from the indexing queue.
#
#elasticsearch.worker_count=8
+# The number of worker threads used to read index write requests from the utility queue.
+#
+#elasticsearch.worker_count_utility=2
+
+# The number of worker threads used to read dead letter messages from the indexing dead letter queue.
+#
+#elasticsearch.worker_count_deadletter=1
+
+# The number of worker threads used to read dead letter messages from the utility dead letter queue.
+#
+#elasticsearch.worker_count_utility_deadletter=1
+
# Set the number of worker threads used for processing index write requests to
# Elasticsearch from the buffer.
#
#index.flush.workers=10
# Set the implementation to use for queuing in Usergrid.
-# Valid values: TEST, LOCAL, SQS, SNS
-# NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
+# Valid values: LOCAL, DISTRIBUTED, DISTRIBUTED_SNS
#
#elasticsearch.queue_impl=LOCAL
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 75d2ce0..530cf7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -74,6 +74,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.commons.lang.StringUtils.indexOf;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
@@ -103,10 +104,13 @@
public int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+ public static final String DEAD_LETTER_SUFFIX = "_dead";
- private final LegacyQueueManager queue;
+ private final LegacyQueueManager indexQueue;
private final LegacyQueueManager utilityQueue;
+ private final LegacyQueueManager indexQueueDead;
+ private final LegacyQueueManager utilityQueueDead;
private final IndexProcessorFig indexProcessorFig;
private final LegacyQueueFig queueFig;
private final IndexProducer indexProducer;
@@ -128,6 +132,8 @@
private final Counter indexErrorCounter;
private final AtomicLong counter = new AtomicLong();
private final AtomicLong counterUtility = new AtomicLong();
+ private final AtomicLong counterIndexDead = new AtomicLong();
+ private final AtomicLong counterUtilityDead = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle;
private final MapManager esMapPersistence;
@@ -162,14 +168,22 @@
this.rxTaskScheduler = rxTaskScheduler;
- LegacyQueueScope queueScope =
+ LegacyQueueScope indexQueueScope =
new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
LegacyQueueScope utilityQueueScope =
new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
- this.queue = queueManagerFactory.getQueueManager(queueScope);
+ LegacyQueueScope indexQueueDeadScope =
+ new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
+
+ LegacyQueueScope utilityQueueDeadScope =
+ new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
+
+ this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+ this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
+ this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
@@ -201,7 +215,7 @@
try {
//signal to SQS
- this.queue.sendMessageToLocalRegion( operation );
+ this.indexQueue.sendMessageToLocalRegion( operation );
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -218,7 +232,7 @@
if (forUtilityQueue) {
this.utilityQueue.sendMessageToAllRegions(operation);
} else {
- this.queue.sendMessageToAllRegions(operation);
+ this.indexQueue.sendMessageToAllRegions(operation);
}
}
catch ( IOException e ) {
@@ -237,7 +251,7 @@
if( forUtilityQueue ){
this.utilityQueue.sendMessages(operations);
}else{
- this.queue.sendMessages(operations);
+ this.indexQueue.sendMessages(operations);
}
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
@@ -264,7 +278,7 @@
final Timer.Context timer = this.readTimer.time();
try {
- return queue.getMessages(MAX_TAKE, AsyncEvent.class);
+ return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
}
finally {
//stop our timer
@@ -288,6 +302,38 @@
}
}
+ /**
+ * Take message from index dead letter queue
+ */
+ private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
+
+ final Timer.Context timer = this.readTimer.time();
+
+ try {
+ return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+ }
+ finally {
+ //stop our timer
+ timer.stop();
+ }
+ }
+
+ /**
+ * Take message from SQS utility dead letter queue
+ */
+ private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
+
+ final Timer.Context timer = this.readTimer.time();
+
+ try {
+ return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+ }
+ finally {
+ //stop our timer
+ timer.stop();
+ }
+ }
+
/**
* Ack message
@@ -300,7 +346,7 @@
for ( LegacyQueueMessage legacyQueueMessage : messages ) {
try {
- queue.commitMessage( legacyQueueMessage );
+ indexQueue.commitMessage( legacyQueueMessage );
inFlight.decrementAndGet();
} catch ( Throwable t ) {
@@ -331,6 +377,28 @@
}
/**
+ * ack messages in index dead letter queue
+ */
+ public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
+ try{
+ indexQueueDead.commitMessages( messages );
+ }catch(Exception e){
+ throw new RuntimeException("Unable to ack messages", e);
+ }
+ }
+
+ /**
+ * ack messages in utility dead letter queue
+ */
+ public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
+ try{
+ utilityQueueDead.commitMessages( messages );
+ }catch(Exception e){
+ throw new RuntimeException("Unable to ack messages", e);
+ }
+ }
+
+ /**
* calls the event handlers and returns a result with information on whether
* it needs to be ack'd and whether it needs to be indexed
* @param messages
@@ -656,10 +724,13 @@
indexOperationMessage =
ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
+ } else if (System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getMapMessageTimeout()) {
+ // if esMapPersistence message hasn't been received yet, log and return (will be acked)
+ logger.error("ES map message never received, removing message from queue. indexBatchId={}", messageId);
+ return;
} else {
-
- throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
-
+ logger.warn("ES map message not received yet. indexBatchId={} elapsedTimeMsec={}", messageId, System.currentTimeMillis() - elasticsearchIndexEvent.getCreationTime());
+ throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
}
} else {
@@ -744,7 +815,7 @@
@Override
public long getQueueDepth() {
- return queue.getQueueDepth();
+ return indexQueue.getQueueDepth();
}
@Override
@@ -806,16 +877,26 @@
* Loop through and start the workers
*/
public void start() {
- final int count = indexProcessorFig.getWorkerCount();
+ final int indexCount = indexProcessorFig.getWorkerCount();
final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+ final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
+ final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < indexCount; i++) {
startWorker(QUEUE_NAME);
}
for (int i = 0; i < utilityCount; i++) {
startWorker(QUEUE_NAME_UTILITY);
}
+
+ for (int i = 0; i < indexDeadCount; i++) {
+ startDeadQueueWorker(QUEUE_NAME);
+ }
+
+ for (int i = 0; i < utilityDeadCount; i++) {
+ startDeadQueueWorker(QUEUE_NAME_UTILITY);
+ }
}
@@ -840,47 +921,166 @@
boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
Observable<List<LegacyQueueMessage>> consumer =
+ Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
+ @Override
+ public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
+
+ //name our thread so it's easy to see
+ long threadNum = isUtilityQueue ?
+ counterUtility.incrementAndGet() : counter.incrementAndGet();
+ Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+
+ List<LegacyQueueMessage> drainList = null;
+
+ do {
+ try {
+ if ( isUtilityQueue ){
+ drainList = takeFromUtilityQueue();
+ }else{
+ drainList = take();
+
+ }
+ //emit our list in it's entity to hand off to a worker pool
+ subscriber.onNext(drainList);
+
+ //take since we're in flight
+ inFlight.addAndGet( drainList.size() );
+
+ } catch ( Throwable t ) {
+
+ final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+ // there might be an error here during tests, just clean the cache
+ indexQueue.clearQueueNameCache();
+
+ if ( t instanceof InvalidQueryException ) {
+
+ // don't fill up log with exceptions when keyspace and column
+ // families are not ready during bootstrap/setup
+ logger.warn( "Failed to dequeue due to '{}'. Sleeping for {} ms",
+ t.getMessage(), sleepTime );
+
+ } else {
+ logger.error( "Failed to dequeue. Sleeping for {} ms", sleepTime, t);
+ }
+
+ if ( drainList != null ) {
+ inFlight.addAndGet( -1 * drainList.size() );
+ }
+
+ try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
+
+ indexErrorCounter.inc();
+ }
+ }
+ while ( true );
+ }
+ } ) //this won't block our read loop, just reads and proceeds
+ .flatMap( sqsMessages -> {
+
+ //do this on a different schedule, and introduce concurrency
+ // with flatmap for faster processing
+ return Observable.just( sqsMessages )
+
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
+ // no messages came from the queue, move on
+ return null;
+ }
+
+ try {
+ // process the messages
+ List<IndexEventResult> indexEventResults =
+ callEventHandlers( messages );
+
+ // submit the processed messages to index producer
+ List<LegacyQueueMessage> messagesToAck =
+ submitToIndex( indexEventResults, isUtilityQueue );
+
+ if ( messagesToAck.size() < messages.size() ) {
+ logger.warn(
+ "Missing {} message(s) from index processing",
+ messages.size() - messagesToAck.size() );
+ }
+
+ // ack each message if making it to this point
+ if( messagesToAck.size() > 0 ){
+
+ if ( isUtilityQueue ){
+ ackUtilityQueue( messagesToAck );
+ }else{
+ ack( messagesToAck );
+ }
+ }
+
+ return messagesToAck;
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to ack messages", e );
+ return null;
+ //do not rethrow so we can process all of them
+ }
+ } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+
+ //end flatMap
+ }, indexProcessorFig.getEventConcurrencyFactor() );
+
+ //start in the background
+
+ final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
+
+ subscriptions.add(subscription);
+ }
+ }
+
+
+ private void startDeadQueueWorker(final String type) {
+ Preconditions.checkNotNull(type, "Worker type required");
+ synchronized (mutex) {
+
+ boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+
+ Observable<List<LegacyQueueMessage>> consumer =
Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@Override
public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
- long threadNum = isUtilityQueue ?
- counterUtility.incrementAndGet() : counter.incrementAndGet();
- Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+ long threadNum = isUtilityDeadQueue ?
+ counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
+ Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum );
List<LegacyQueueMessage> drainList = null;
do {
try {
- if ( isUtilityQueue ){
- drainList = takeFromUtilityQueue();
+ if ( isUtilityDeadQueue ){
+ drainList = takeFromUtilityDeadQueue();
}else{
- drainList = take();
-
+ drainList = takeFromIndexDeadQueue();
}
//emit our list in it's entity to hand off to a worker pool
- subscriber.onNext(drainList);
+ subscriber.onNext(drainList);
//take since we're in flight
inFlight.addAndGet( drainList.size() );
} catch ( Throwable t ) {
- final long sleepTime = indexProcessorFig.getFailureRetryTime();
+ final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
// there might be an error here during tests, just clean the cache
- queue.clearQueueNameCache();
+ indexQueueDead.clearQueueNameCache();
if ( t instanceof InvalidQueryException ) {
// don't fill up log with exceptions when keyspace and column
// families are not ready during bootstrap/setup
- logger.warn( "Failed to dequeue due to '{}'. Sleeping for {} ms",
+ logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
t.getMessage(), sleepTime );
} else {
- logger.error( "Failed to dequeue. Sleeping for {} ms", sleepTime, t);
+ logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
}
if ( drainList != null ) {
@@ -888,8 +1088,6 @@
}
try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
-
- indexErrorCounter.inc();
}
}
while ( true );
@@ -908,31 +1106,47 @@
}
try {
- // process the messages
- List<IndexEventResult> indexEventResults =
- callEventHandlers( messages );
-
- // submit the processed messages to index producer
- List<LegacyQueueMessage> messagesToAck =
- submitToIndex( indexEventResults, isUtilityQueue );
-
- if ( messagesToAck.size() < messages.size() ) {
- logger.warn(
- "Missing {} message(s) from index processing",
- messages.size() - messagesToAck.size() );
+ // put the dead letter messages back in the appropriate queue
+ LegacyQueueManager returnQueue = null;
+ String queueType;
+ if (isUtilityDeadQueue) {
+ returnQueue = utilityQueue;
+ queueType = "utility";
+ } else {
+ returnQueue = indexQueue;
+ queueType = "index";
}
-
- // ack each message if making it to this point
- if( messagesToAck.size() > 0 ){
-
- if ( isUtilityQueue ){
- ackUtilityQueue( messagesToAck );
- }else{
- ack( messagesToAck );
+ List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
+ for (LegacyQueueMessage msg : successMessages) {
+ logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+ }
+ int unsuccessfulMessagesSize = messages.size() - successMessages.size();
+ if (unsuccessfulMessagesSize > 0) {
+ // some messages couldn't be sent to originating queue, log
+ Set<String> successMessageIds = new HashSet<>();
+ for (LegacyQueueMessage msg : successMessages) {
+ String messageId = msg.getMessageId();
+ if (successMessageIds.contains(messageId)) {
+ logger.warn("Found duplicate messageId in returned messages: {}", messageId);
+ } else {
+ successMessageIds.add(messageId);
+ }
+ }
+ for (LegacyQueueMessage msg : messages) {
+ String messageId = msg.getMessageId();
+ if (!successMessageIds.contains(messageId)) {
+ logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
+ }
}
}
- return messagesToAck;
+ if (isUtilityDeadQueue) {
+ ackUtilityDeadQueue(successMessages);
+ } else {
+ ackIndexDeadQueue(successMessages);
+ }
+
+ return messages;
}
catch ( Exception e ) {
logger.error( "Failed to ack messages", e );
@@ -1042,7 +1256,7 @@
public String getQueueManagerClass() {
- return queue.getClass().getSimpleName();
+ return indexQueue.getClass().getSimpleName();
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 45dff1c..7eecf04 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -34,10 +34,16 @@
String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+ String DLQ_FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.deadletter.rejected_retry_wait";
+
String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
+ String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
+
+ String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
+
String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -50,13 +56,21 @@
/**
- * Set the amount of time to wait when Elasticsearch rejects a requests before
+ * Set the amount of time to wait when indexing or utility queue rejects a request before
* retrying. This provides simple back pressure. (in milliseconds)
*/
@Default("1000")
@Key(FAILURE_REJECTED_RETRY_WAIT_TIME)
long getFailureRetryTime();
+ /**
+ * Set the amount of time to wait when indexing or utility dead letter queue rejects a request before
+ * retrying. This provides simple back pressure. (in milliseconds)
+ */
+ @Default("2000")
+ @Key(DLQ_FAILURE_REJECTED_RETRY_WAIT_TIME)
+ long getDeadLetterFailureRetryTime();
+
/**
* Set the visibility timeout for messages received from the queue. (in milliseconds).
@@ -91,6 +105,20 @@
int getWorkerCountUtility();
/**
+ * The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue.
+ */
+ @Default("1")
+ @Key(ELASTICSEARCH_WORKER_COUNT_DEADLETTER)
+ int getWorkerCountDeadLetter();
+
+ /**
+ * The number of worker threads used to read dead messages from the utility dead letter queue and reload them into the utility queue.
+ */
+ @Default("1")
+ @Key(ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER)
+ int getWorkerCountUtilityDeadLetter();
+
+ /**
* Set the implementation to use for queuing.
* Valid values: TEST, LOCAL, SQS, SNS
* NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 40f8eea..0ebcc7b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -14,11 +14,14 @@
* http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html*
*/
+ String USERGRID_CLUSTER_REGION_LIST = "usergrid.cluster.region.list";
+ String USERGRID_CLUSTER_REGION_LOCAL = "usergrid.cluster.region.local";
+
/**
* Primary region to use for Amazon queues.
*/
- @Key( "usergrid.cluster.region.local" )
+ @Key( USERGRID_CLUSTER_REGION_LOCAL )
@Default("us-east-1")
String getPrimaryRegion();
@@ -34,7 +37,7 @@
* Comma-separated list of one or more Amazon regions to use if multiregion
* is set to true.
*/
- @Key( "usergrid.cluster.region.list" )
+ @Key( USERGRID_CLUSTER_REGION_LIST )
@Default("us-east-1")
String getRegionList();
@@ -103,4 +106,7 @@
@Default("false") // 30 seconds
boolean getQuorumFallback();
+ @Key("usergrid.queue.map.message.timeout")
+ @Default("900000") // 15 minutes
+ int getMapMessageTimeout();
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
index 117ce1c..f153610 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import java.util.Set;
/**ctor
* Manages queues for usergrid. Current implementation is sqs based.
@@ -68,6 +69,14 @@
void sendMessages(List bodies) throws IOException;
/**
+ * send messages to queue
+ * @param queueMessages
+ * @throws IOException
+ * @return set of receipt handles for successfully sent messages
+ */
+ List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException;
+
+ /**
* send a message to queue
* @param body
* @throws IOException
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
index 3856738..6882718 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
@@ -40,4 +40,9 @@
* Get the Usergrid region enum
*/
RegionImplementation getRegionImplementation();
+
+ /**
+ * Is this for the dead letter queue?
+ */
+ boolean isDeadLetterQueue();
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 7a793b4..cbba0b1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -27,9 +27,7 @@
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -92,6 +90,22 @@
}
}
+ @Override
+ public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException {
+ List<LegacyQueueMessage> successMessages = new ArrayList<>();
+ for(LegacyQueueMessage queueMessage : queueMessages){
+ String uuid = UUID.randomUUID().toString();
+ try {
+ LegacyQueueMessage msg = new LegacyQueueMessage(uuid, "handle_" + uuid, queueMessage.getBody(), "put type here");
+ queue.put(msg);
+ successMessages.add(queueMessage);
+ }catch (InterruptedException ie){
+ throw new RuntimeException(ie);
+ }
+ }
+ return successMessages;
+ }
+
@Override
public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index b38eeb8..a485f55 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -27,18 +27,22 @@
import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
import org.safehaus.guicyfig.GuicyFigModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Simple module for wiring our collection api
*/
public class QueueModule extends AbstractModule {
+ private static final Logger logger = LoggerFactory.getLogger( QueueModule.class );
private LegacyQueueManager.Implementation implementation;
public QueueModule( String queueManagerType ) {
+ logger.info("QueueManagerType={}", queueManagerType);
if ( "DISTRIBUTED_SNS".equals( queueManagerType ) ) {
this.implementation = LegacyQueueManager.Implementation.DISTRIBUTED_SNS;
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
index 9dd0421..a3e87e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
@@ -23,10 +23,18 @@
private final String name;
private final RegionImplementation regionImpl;
+ private final boolean isDeadLetterQueue;
public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl) {
this.name = name;
this.regionImpl = regionImpl;
+ this.isDeadLetterQueue = false;
+ }
+
+ public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl, final boolean isDeadLetterQueue) {
+ this.name = name;
+ this.regionImpl = regionImpl;
+ this.isDeadLetterQueue = isDeadLetterQueue;
}
@Override
@@ -38,6 +46,9 @@
public RegionImplementation getRegionImplementation() {return regionImpl;}
@Override
+ public boolean isDeadLetterQueue() {return isDeadLetterQueue;}
+
+ @Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;
@@ -52,6 +63,13 @@
return false;
}
+ if ( regionImpl != queueScope.getRegionImplementation() ) {
+ return false;
+ }
+
+ if ( isDeadLetterQueue != queueScope.isDeadLetterQueue ) {
+ return false;
+ }
return true;
}
@@ -59,6 +77,11 @@
@Override
public int hashCode() {
- return name.hashCode();
+ String deadLetter = "REGULAR";
+ if (isDeadLetterQueue) {
+ deadLetter = "DEADLETTER";
+ }
+ String hashString = name + "|" + regionImpl.name() + "|" + deadLetter;
+ return hashString.hashCode();
}
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index b6ca429..e7fa47b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -195,6 +195,19 @@
@Override
+ public List<LegacyQueueMessage> sendQueueMessages( List<LegacyQueueMessage> queueMessages ) throws IOException {
+
+ List<LegacyQueueMessage> successMessages = new ArrayList<>();
+ for ( LegacyQueueMessage queueMessage : queueMessages ) {
+ sendMessageToLocalRegion( (Serializable)queueMessage.getBody() );
+ successMessages.add(queueMessage);
+ }
+
+ return successMessages;
+ }
+
+
+ @Override
public void deleteQueue() {
queueManager.deleteQueue( scope.getName() );
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index b2a7680..5b49bc7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -20,15 +20,12 @@
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.sqs.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,20 +51,6 @@
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.DeleteMessageRequest;
-import com.amazonaws.services.sqs.model.DeleteQueueRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
-import com.amazonaws.services.sqs.model.SendMessageResult;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
@@ -99,6 +82,7 @@
private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
private static final int MIN_CLIENT_SOCKET_TIMEOUT = 5000; // millis
private static final int MIN_VISIBILITY_TIMEOUT = 1; //seconds
+ private static final String DEAD_LETTER_QUEUE_SUFFIX = "_dead";
static {
@@ -133,6 +117,11 @@
queue = new LegacyQueue( result.getQueueUrl() );
}
catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+ if (queueName.endsWith(DEAD_LETTER_QUEUE_SUFFIX)) {
+ // don't auto-create dead letter queues
+ logger.error("failed to get dead letter queue from service, won't create", queueDoesNotExistException);
+ throw queueDoesNotExistException;
+ }
logger.error( "Queue {} does not exist, will create", queueName );
}
catch ( Exception e ) {
@@ -251,8 +240,14 @@
for ( String regionName : regionNames ) {
regionName = regionName.trim();
- Regions regions = Regions.fromName( regionName );
- Region region = Region.getRegion( regions );
+ Region region = null;
+ try {
+ Regions regions = Regions.fromName(regionName);
+ region = Region.getRegion(regions);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("INVALID REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LIST + ": " + regionName, e);
+ }
AmazonSQSClient sqsClient = createSQSClient( region );
AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
@@ -380,19 +375,26 @@
}
- private String getName() {
+ private String getName(final boolean isDeadLetter) {
String name =
clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+ scope.getRegionImplementation();
+ if (isDeadLetter) {
+ name += DEAD_LETTER_QUEUE_SUFFIX;
+ }
name = name.toLowerCase(); //user lower case values
Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
return name;
}
+ private String getName() {
+ return getName(false);
+ }
+
public LegacyQueue getReadQueue() {
- String queueName = getName();
+ String queueName = getName(scope.isDeadLetterQueue());
try {
return readQueueUrlMap.get( queueName );
@@ -588,6 +590,43 @@
@Override
+ public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException {
+
+ List<LegacyQueueMessage> successMessages = new ArrayList<>();
+
+ if ( sqs == null ) {
+ logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
+ return successMessages;
+ }
+
+ String url = getReadQueue().getUrl();
+
+ List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
+
+ for (LegacyQueueMessage queueMessage : queueMessages) {
+ entries.add(new SendMessageBatchRequestEntry(queueMessage.getMessageId(), queueMessage.getStringBody()));
+ }
+
+ SendMessageBatchResult result = sqs.sendMessageBatch(url, entries);
+
+ Set<String> successIDs = new HashSet<>();
+ logger.debug("sendQueueMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());
+
+ for (SendMessageBatchResultEntry batchResultEntry : result.getSuccessful()) {
+ successIDs.add(batchResultEntry.getId());
+ }
+
+ for (LegacyQueueMessage queueMessage : queueMessages) {
+ if (successIDs.contains(queueMessage.getMessageId())) {
+ successMessages.add(queueMessage);
+ }
+ }
+
+ return successMessages;
+ }
+
+
+ @Override
public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
if ( sqsAsync == null ) {
@@ -663,6 +702,7 @@
DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );
boolean successful = result.getFailed().size() <= 0;
+ logger.debug("commitMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());
if ( !successful ) {
for ( BatchResultErrorEntry failed : result.getFailed() ) {
@@ -684,8 +724,14 @@
* Get the region
*/
private Region getRegion() {
- Regions regions = Regions.fromName(fig.getPrimaryRegion());
- return Region.getRegion(regions);
+ String regionName = fig.getPrimaryRegion();
+ try {
+ Regions regions = Regions.fromName(regionName);
+ return Region.getRegion(regions);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("INVALID PRIMARY REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LOCAL + ": " + regionName, e);
+ }
}
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
index 6d99250..6cdad3b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
@@ -19,6 +19,7 @@
import java.util.*;
+import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -442,6 +443,14 @@
item = em.createItemInCollection( context.getOwner(), context.getCollectionName(), getEntityType(),
p );
}
+ catch (DuplicateUniquePropertyExistsException e) {
+ // this is not an error (caller tried to create entity with a duplicate unique value)
+ logger.info("Entity [{}] unable to be created in collection [{}] due to [{} - {}]", p, context.getCollectionName(),
+ e.getClass().getSimpleName(), e.getMessage());
+
+ // would be nice if status for each batch entry was returned...
+ continue;
+ }
catch ( Exception e ) {
logger.error("Entity [{}] unable to be created in collection [{}] due to [{} - {}]", p, context.getCollectionName(),
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index c00575f..f7e107d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -24,7 +24,9 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.usergrid.persistence.queue.LegacyQueueManager;
import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
@@ -65,6 +67,12 @@
@Override
+ public List<LegacyQueueMessage> sendQueueMessages(final List<LegacyQueueMessage> queueMessages ) throws IOException {
+ return new ArrayList<>();
+ }
+
+
+ @Override
public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
}