Merge commit 'refs/pull/573/head' of github.com:apache/usergrid
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 0d1a193..d505a96 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -305,10 +305,15 @@
 #
 #elasticsearch.refresh_search_max=10
 
-# Set the amount of time to wait when Elasticsearch rejects a requests before
+# Set the amount of time to wait when the indexing or utility queue rejects a request before
 # retrying.  This provides simple backpressure. (in milliseconds)
 #
-#elasticsearch.rejected_retry_wait
+#elasticsearch.rejected_retry_wait=1000
+
+# Set the amount of time to wait when the indexing or utility dead letter queue rejects a request before
+# retrying.  This provides simple backpressure. (in milliseconds)
+#
+#elasticsearch.deadletter.rejected_retry_wait=2000
 
 
 
@@ -332,18 +337,29 @@
 #
 #usergrid.use.default.queue=false
 
-# The number of worker threads used to read index write requests from the queue.
+# The number of worker threads used to read index write requests from the indexing queue.
 #
 #elasticsearch.worker_count=8
 
+# The number of worker threads used to read index write requests from the utility queue.
+#
+#elasticsearch.worker_count_utility=2
+
+# The number of worker threads used to read dead letter messages from the indexing dead letter queue.
+#
+#elasticsearch.worker_count_deadletter=1
+
+# The number of worker threads used to read dead letter messages from the utility dead letter queue.
+#
+#elasticsearch.worker_count_utility_deadletter=1
+
 # Set the number of worker threads used for processing index write requests to
 # Elasticsearch from the buffer.
 #
 #index.flush.workers=10
 
 # Set the implementation to use for queuing in Usergrid.
-# Valid values: TEST, LOCAL, SQS, SNS
-# NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
+# Valid values: LOCAL, DISTRIBUTED, DISTRIBUTED_SNS
 #
 #elasticsearch.queue_impl=LOCAL
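
As a quick illustration of the new dead letter settings above, a deployment that wants more aggressive dead letter processing could override the defaults in its own properties file; the placement and the values below are examples only, not part of this patch.

# example overrides only -- values are illustrative
elasticsearch.worker_count_deadletter=2
elasticsearch.worker_count_utility_deadletter=2
elasticsearch.deadletter.rejected_retry_wait=5000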
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 75d2ce0..530cf7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -74,6 +74,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.commons.lang.StringUtils.indexOf;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 
 
@@ -103,10 +104,13 @@
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+    public static final String DEAD_LETTER_SUFFIX = "_dead";
 
 
-    private final LegacyQueueManager queue;
+    private final LegacyQueueManager indexQueue;
     private final LegacyQueueManager utilityQueue;
+    private final LegacyQueueManager indexQueueDead;
+    private final LegacyQueueManager utilityQueueDead;
     private final IndexProcessorFig indexProcessorFig;
     private final LegacyQueueFig queueFig;
     private final IndexProducer indexProducer;
@@ -128,6 +132,8 @@
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong counterUtility = new AtomicLong();
+    private final AtomicLong counterIndexDead = new AtomicLong();
+    private final AtomicLong counterUtilityDead = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -162,14 +168,22 @@
 
         this.rxTaskScheduler = rxTaskScheduler;
 
-        LegacyQueueScope queueScope =
+        LegacyQueueScope indexQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
 
         LegacyQueueScope utilityQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
 
-        this.queue = queueManagerFactory.getQueueManager(queueScope);
+        LegacyQueueScope indexQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
+
+        LegacyQueueScope utilityQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
+
+        this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
         this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+        this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
+        this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -201,7 +215,7 @@
 
         try {
             //signal to SQS
-            this.queue.sendMessageToLocalRegion( operation );
+            this.indexQueue.sendMessageToLocalRegion( operation );
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -218,7 +232,7 @@
             if (forUtilityQueue) {
                 this.utilityQueue.sendMessageToAllRegions(operation);
             } else {
-                this.queue.sendMessageToAllRegions(operation);
+                this.indexQueue.sendMessageToAllRegions(operation);
             }
         }
         catch ( IOException e ) {
@@ -237,7 +251,7 @@
             if( forUtilityQueue ){
                 this.utilityQueue.sendMessages(operations);
             }else{
-                this.queue.sendMessages(operations);
+                this.indexQueue.sendMessages(operations);
             }
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
@@ -264,7 +278,7 @@
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return queue.getMessages(MAX_TAKE, AsyncEvent.class);
+            return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
         }
         finally {
             //stop our timer
@@ -288,6 +302,38 @@
         }
     }
 
+    /**
+     * Take messages from the index dead letter queue
+     */
+    private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+        }
+        finally {
+            //stop our timer
+            timer.stop();
+        }
+    }
+
+    /**
+     * Take messages from the utility dead letter queue
+     */
+    private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+        }
+        finally {
+            //stop our timer
+            timer.stop();
+        }
+    }
+
 
     /**
      * Ack message
@@ -300,7 +346,7 @@
 
             for ( LegacyQueueMessage legacyQueueMessage : messages ) {
                 try {
-                    queue.commitMessage( legacyQueueMessage );
+                    indexQueue.commitMessage( legacyQueueMessage );
                     inFlight.decrementAndGet();
 
                 } catch ( Throwable t ) {
@@ -331,6 +377,28 @@
     }
 
     /**
+     * Ack messages in the index dead letter queue
+     */
+    public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
+        try{
+            indexQueueDead.commitMessages( messages );
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }
+    }
+
+    /**
+     * Ack messages in the utility dead letter queue
+     */
+    public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
+        try{
+            utilityQueueDead.commitMessages( messages );
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }
+    }
+
+    /**
      * calls the event handlers and returns a result with information on whether
      * it needs to be ack'd and whether it needs to be indexed
      * @param messages
@@ -656,10 +724,13 @@
                indexOperationMessage =
                    ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
 
+           } else if (System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getMapMessageTimeout()) {
+                // if esMapPersistence message hasn't been received yet, log and return (will be acked)
+                logger.error("ES map message never received, removing message from queue. indexBatchId={}", messageId);
+                return;
            } else {
-
-               throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
-
+                logger.warn("ES map message not received yet. indexBatchId={} elapsedTimeMsec={}", messageId, System.currentTimeMillis() - elasticsearchIndexEvent.getCreationTime());
+                throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
            }
 
         } else {
@@ -744,7 +815,7 @@
 
     @Override
     public long getQueueDepth() {
-        return queue.getQueueDepth();
+        return indexQueue.getQueueDepth();
     }
 
     @Override
@@ -806,16 +877,26 @@
      * Loop through and start the workers
      */
     public void start() {
-        final int count = indexProcessorFig.getWorkerCount();
+        final int indexCount = indexProcessorFig.getWorkerCount();
         final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+        final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
+        final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
 
-        for (int i = 0; i < count; i++) {
+        for (int i = 0; i < indexCount; i++) {
             startWorker(QUEUE_NAME);
         }
 
         for (int i = 0; i < utilityCount; i++) {
             startWorker(QUEUE_NAME_UTILITY);
         }
+
+        for (int i = 0; i < indexDeadCount; i++) {
+            startDeadQueueWorker(QUEUE_NAME);
+        }
+
+        for (int i = 0; i < utilityDeadCount; i++) {
+            startDeadQueueWorker(QUEUE_NAME_UTILITY);
+        }
     }
 
 
@@ -840,47 +921,166 @@
             boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
 
             Observable<List<LegacyQueueMessage>> consumer =
+                Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
+                    @Override
+                    public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
+
+                        //name our thread so it's easy to see
+                        long threadNum = isUtilityQueue ?
+                            counterUtility.incrementAndGet() : counter.incrementAndGet();
+                        Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum );
+
+                        List<LegacyQueueMessage> drainList = null;
+
+                        do {
+                            try {
+                                if ( isUtilityQueue ){
+                                    drainList = takeFromUtilityQueue();
+                                }else{
+                                    drainList = take();
+
+                                }
+                                //emit our list in its entirety to hand off to a worker pool
+                                subscriber.onNext(drainList);
+
+                                //track these messages as in flight
+                                inFlight.addAndGet( drainList.size() );
+
+                            } catch ( Throwable t ) {
+
+                                final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+                                // there might be an error here during tests, just clean the cache
+                                indexQueue.clearQueueNameCache();
+
+                                if ( t instanceof InvalidQueryException ) {
+
+                                    // don't fill up log with exceptions when keyspace and column
+                                    // families are not ready during bootstrap/setup
+                                    logger.warn( "Failed to dequeue due to '{}'. Sleeping for {} ms",
+                                        t.getMessage(), sleepTime );
+
+                                } else {
+                                    logger.error( "Failed to dequeue. Sleeping for {} ms", sleepTime, t);
+                                }
+
+                                if ( drainList != null ) {
+                                    inFlight.addAndGet( -1 * drainList.size() );
+                                }
+
+                                try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
+
+                                indexErrorCounter.inc();
+                            }
+                        }
+                        while ( true );
+                    }
+                } )        //this won't block our read loop, just reads and proceeds
+                    .flatMap( sqsMessages -> {
+
+                        //do this on a different schedule, and introduce concurrency
+                        // with flatmap for faster processing
+                        return Observable.just( sqsMessages )
+
+                            .map( messages -> {
+                                if ( messages == null || messages.size() == 0 ) {
+                                    // no messages came from the queue, move on
+                                    return null;
+                                }
+
+                                try {
+                                    // process the messages
+                                    List<IndexEventResult> indexEventResults =
+                                        callEventHandlers( messages );
+
+                                    // submit the processed messages to index producer
+                                    List<LegacyQueueMessage> messagesToAck =
+                                        submitToIndex( indexEventResults, isUtilityQueue );
+
+                                    if ( messagesToAck.size() < messages.size() ) {
+                                        logger.warn(
+                                            "Missing {} message(s) from index processing",
+                                            messages.size() - messagesToAck.size() );
+                                    }
+
+                                    // ack each message if making it to this point
+                                    if( messagesToAck.size() > 0 ){
+
+                                        if ( isUtilityQueue ){
+                                            ackUtilityQueue( messagesToAck );
+                                        }else{
+                                            ack( messagesToAck );
+                                        }
+                                    }
+
+                                    return messagesToAck;
+                                }
+                                catch ( Exception e ) {
+                                    logger.error( "Failed to ack messages", e );
+                                    return null;
+                                    //do not rethrow so we can process all of them
+                                }
+                            } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+
+                        //end flatMap
+                    }, indexProcessorFig.getEventConcurrencyFactor() );
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
+
+            subscriptions.add(subscription);
+        }
+    }
+
+
+    private void startDeadQueueWorker(final String type) {
+        Preconditions.checkNotNull(type, "Worker type required");
+        synchronized (mutex) {
+
+            boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+
+            Observable<List<LegacyQueueMessage>> consumer =
                     Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
                         @Override
                         public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
-                            long threadNum = isUtilityQueue ?
-                                counterUtility.incrementAndGet() : counter.incrementAndGet();
-                            Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+                            long threadNum = isUtilityDeadQueue ?
+                                counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
+                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum );
 
                             List<LegacyQueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    if ( isUtilityQueue ){
-                                        drainList = takeFromUtilityQueue();
+                                    if ( isUtilityDeadQueue ){
+                                        drainList = takeFromUtilityDeadQueue();
                                     }else{
-                                        drainList = take();
-
+                                        drainList = takeFromIndexDeadQueue();
                                     }
                                     //emit our list in it's entity to hand off to a worker pool
-                                        subscriber.onNext(drainList);
+                                    subscriber.onNext(drainList);
 
                                     //take since  we're in flight
                                     inFlight.addAndGet( drainList.size() );
 
                                 } catch ( Throwable t ) {
 
-                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
+                                    final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
 
                                     // there might be an error here during tests, just clean the cache
-                                    queue.clearQueueNameCache();
+                                    indexQueueDead.clearQueueNameCache();
 
                                     if ( t instanceof InvalidQueryException ) {
 
                                         // don't fill up log with exceptions when keyspace and column
                                         // families are not ready during bootstrap/setup
-                                        logger.warn( "Failed to dequeue due to '{}'. Sleeping for {} ms",
+                                        logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
                                             t.getMessage(), sleepTime );
 
                                     } else {
-                                        logger.error( "Failed to dequeue. Sleeping for {} ms", sleepTime, t);
+                                        logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
                                     }
 
                                     if ( drainList != null ) {
@@ -888,8 +1088,6 @@
                                     }
 
                                     try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
-
-                                    indexErrorCounter.inc();
                                 }
                             }
                             while ( true );
@@ -908,31 +1106,47 @@
                                                  }
 
                                                  try {
-                                                     // process the messages
-                                                     List<IndexEventResult> indexEventResults =
-                                                         callEventHandlers( messages );
-
-                                                     // submit the processed messages to index producer
-                                                     List<LegacyQueueMessage> messagesToAck =
-                                                         submitToIndex( indexEventResults, isUtilityQueue );
-
-                                                     if ( messagesToAck.size() < messages.size() ) {
-                                                         logger.warn(
-                                                             "Missing {} message(s) from index processing",
-                                                            messages.size() - messagesToAck.size() );
+                                                     // put the dead letter messages back in the appropriate queue
+                                                     LegacyQueueManager returnQueue = null;
+                                                     String queueType;
+                                                     if (isUtilityDeadQueue) {
+                                                         returnQueue = utilityQueue;
+                                                         queueType = "utility";
+                                                     } else {
+                                                         returnQueue = indexQueue;
+                                                         queueType = "index";
                                                      }
-
-                                                     // ack each message if making it to this point
-                                                     if( messagesToAck.size() > 0 ){
-
-                                                         if ( isUtilityQueue ){
-                                                             ackUtilityQueue( messagesToAck );
-                                                         }else{
-                                                             ack( messagesToAck );
+                                                     List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
+                                                     for (LegacyQueueMessage msg : successMessages) {
+                                                         logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                                     }
+                                                     int unsuccessfulMessagesSize = messages.size() - successMessages.size();
+                                                     if (unsuccessfulMessagesSize > 0) {
+                                                         // some messages couldn't be sent to originating queue, log
+                                                         Set<String> successMessageIds = new HashSet<>();
+                                                         for (LegacyQueueMessage msg : successMessages) {
+                                                             String messageId = msg.getMessageId();
+                                                             if (successMessageIds.contains(messageId)) {
+                                                                 logger.warn("Found duplicate messageId in returned messages: {}", messageId);
+                                                             } else {
+                                                                 successMessageIds.add(messageId);
+                                                             }
+                                                         }
+                                                         for (LegacyQueueMessage msg : messages) {
+                                                             String messageId = msg.getMessageId();
+                                                             if (!successMessageIds.contains(messageId)) {
+                                                                 logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
+                                                             }
                                                          }
                                                      }
 
-                                                     return messagesToAck;
+                                                     if (isUtilityDeadQueue) {
+                                                         ackUtilityDeadQueue(successMessages);
+                                                     } else {
+                                                         ackIndexDeadQueue(successMessages);
+                                                     }
+
+                                                     return messages;
                                                  }
                                                  catch ( Exception e ) {
                                                      logger.error( "Failed to ack messages", e );
@@ -1042,7 +1256,7 @@
 
     public String getQueueManagerClass() {
 
-        return queue.getClass().getSimpleName();
+        return indexQueue.getClass().getSimpleName();
 
     }
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 45dff1c..7eecf04 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -34,10 +34,16 @@
 
     String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
 
+    String DLQ_FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.deadletter.rejected_retry_wait";
+
     String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
 
+    String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
+
+    String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -50,13 +56,21 @@
 
 
     /**
-     * Set the amount of time to wait when Elasticsearch rejects a requests before
+     * Set the amount of time to wait when the indexing or utility queue rejects a request before
      * retrying.  This provides simple back pressure. (in milliseconds)
      */
     @Default("1000")
     @Key(FAILURE_REJECTED_RETRY_WAIT_TIME)
     long getFailureRetryTime();
 
+    /**
+     * Set the amount of time to wait when the indexing or utility dead letter queue rejects a request before
+     * retrying.  This provides simple back pressure. (in milliseconds)
+     */
+    @Default("2000")
+    @Key(DLQ_FAILURE_REJECTED_RETRY_WAIT_TIME)
+    long getDeadLetterFailureRetryTime();
+
 
     /**
      * Set the visibility timeout for messages received from the queue. (in milliseconds).
@@ -91,6 +105,20 @@
     int getWorkerCountUtility();
 
     /**
+     * The number of worker threads used to read dead letter messages from the index dead letter queue and reload them into the index queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DEADLETTER)
+    int getWorkerCountDeadLetter();
+
+    /**
+     * The number of worker threads used to read dead letter messages from the utility dead letter queue and reload them into the utility queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER)
+    int getWorkerCountUtilityDeadLetter();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 40f8eea..0ebcc7b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -14,11 +14,14 @@
      * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html*
      */
 
+    String USERGRID_CLUSTER_REGION_LIST = "usergrid.cluster.region.list";
+    String USERGRID_CLUSTER_REGION_LOCAL = "usergrid.cluster.region.local";
+
 
     /**
      * Primary region to use for Amazon queues.
      */
-    @Key( "usergrid.cluster.region.local" )
+    @Key( USERGRID_CLUSTER_REGION_LOCAL )
     @Default("us-east-1")
     String getPrimaryRegion();
 
@@ -34,7 +37,7 @@
      * Comma-separated list of one or more Amazon regions to use if multiregion
      * is set to true.
      */
-    @Key( "usergrid.cluster.region.list" )
+    @Key( USERGRID_CLUSTER_REGION_LIST )
     @Default("us-east-1")
     String getRegionList();
 
@@ -103,4 +106,7 @@
     @Default("false") // 30 seconds
     boolean getQuorumFallback();
 
+    @Key("usergrid.queue.map.message.timeout")
+    @Default("900000") // 15 minutes
+    int getMapMessageTimeout();
 }
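
The new map message timeout above pairs with the give-up branch added to AsyncEventServiceImpl earlier in this patch: once a queued index event is older than this timeout and its Elasticsearch map entry still has not appeared, the event is logged and acked instead of being retried indefinitely. A hypothetical override, assuming the usual properties override mechanism (the value is an example only):

# example only: allow 30 minutes for the ES map message to arrive before dropping the event
usergrid.queue.map.message.timeout=1800000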
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
index 117ce1c..f153610 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.Set;
 
 /**ctor
  * Manages queues for usergrid.  Current implementation is sqs based.
@@ -68,6 +69,14 @@
     void sendMessages(List bodies) throws IOException;
 
     /**
+     * send messages to queue
+     * @param queueMessages the messages to send
+     * @throws IOException
+     * @return list of messages that were successfully sent
+     */
+    List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException;
+
+    /**
      * send a message to queue
      * @param body
      * @throws IOException
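
A minimal caller sketch for the new sendQueueMessages contract, modeled on the dead letter re-drive loop in AsyncEventServiceImpl above; the wrapper class, method name, and error handling here are illustrative, not part of this patch:

import java.io.IOException;
import java.util.List;

import org.apache.usergrid.persistence.queue.LegacyQueueManager;
import org.apache.usergrid.persistence.queue.LegacyQueueMessage;

public class DeadLetterRedriveSketch {

    /**
     * Re-drives dead letter messages to their originating queue, then acks only the
     * messages the originating queue actually accepted; anything else stays on the
     * dead letter queue for a later pass.
     */
    static List<LegacyQueueMessage> redrive(final LegacyQueueManager deadLetterQueue,
                                            final LegacyQueueManager originatingQueue,
                                            final List<LegacyQueueMessage> messages) throws IOException {

        // only the messages the originating queue accepted come back
        final List<LegacyQueueMessage> accepted = originatingQueue.sendQueueMessages(messages);

        try {
            // remove the re-driven messages from the dead letter queue
            deadLetterQueue.commitMessages(accepted);
        } catch (Exception e) {
            throw new RuntimeException("Unable to ack dead letter messages", e);
        }

        return accepted;
    }
}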
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
index 3856738..6882718 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
@@ -40,4 +40,9 @@
      * Get the Usergrid region enum
      */
     RegionImplementation getRegionImplementation();
+
+    /**
+     * Is this for the dead letter queue?
+     */
+    boolean isDeadLetterQueue();
 }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 7a793b4..cbba0b1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -27,9 +27,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -92,6 +90,22 @@
         }
     }
 
+    @Override
+    public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException {
+        List<LegacyQueueMessage> successMessages = new ArrayList<>();
+        for(LegacyQueueMessage queueMessage : queueMessages){
+            String uuid = UUID.randomUUID().toString();
+            try {
+                LegacyQueueMessage msg = new LegacyQueueMessage(uuid, "handle_" + uuid, queueMessage.getBody(), "put type here");
+                queue.put(msg);
+                successMessages.add(queueMessage);
+            }catch (InterruptedException ie){
+                throw new RuntimeException(ie);
+            }
+        }
+        return successMessages;
+    }
+
 
     @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index b38eeb8..a485f55 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -27,18 +27,22 @@
 import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
 import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Simple module for wiring our collection api
  */
 public class QueueModule extends AbstractModule {
+    private static final Logger logger = LoggerFactory.getLogger( QueueModule.class );
 
     private LegacyQueueManager.Implementation implementation;
 
 
     public QueueModule( String queueManagerType ) {
 
+        logger.info("QueueManagerType={}", queueManagerType);
         if ( "DISTRIBUTED_SNS".equals( queueManagerType ) ) {
             this.implementation = LegacyQueueManager.Implementation.DISTRIBUTED_SNS;
         }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
index 9dd0421..a3e87e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
@@ -23,10 +23,18 @@
 
     private final String name;
     private final RegionImplementation regionImpl;
+    private final boolean isDeadLetterQueue;
 
     public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl) {
         this.name = name;
         this.regionImpl = regionImpl;
+        this.isDeadLetterQueue = false;
+    }
+
+    public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl, final boolean isDeadLetterQueue) {
+        this.name = name;
+        this.regionImpl = regionImpl;
+        this.isDeadLetterQueue = isDeadLetterQueue;
     }
 
     @Override
@@ -38,6 +46,9 @@
     public RegionImplementation getRegionImplementation() {return regionImpl;}
 
     @Override
+    public boolean isDeadLetterQueue() {return isDeadLetterQueue;}
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;
@@ -52,6 +63,13 @@
             return false;
         }
 
+        if ( regionImpl != queueScope.getRegionImplementation() ) {
+            return false;
+        }
+
+        if ( isDeadLetterQueue != queueScope.isDeadLetterQueue ) {
+            return false;
+        }
 
         return true;
     }
@@ -59,6 +77,11 @@
 
     @Override
     public int hashCode() {
-        return name.hashCode();
+        String deadLetter = "REGULAR";
+        if (isDeadLetterQueue) {
+            deadLetter = "DEADLETTER";
+        }
+        String hashString = name + "|" + regionImpl.name() + "|" + deadLetter;
+        return hashString.hashCode();
     }
 }
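
A small sketch of the behavior the widened equals/hashCode is after: a scope and its dead letter counterpart share a name and region but no longer collide when used as map or cache keys (class and enum names as used elsewhere in this patch):

import org.apache.usergrid.persistence.queue.LegacyQueueScope;
import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;

public class QueueScopeKeySketch {

    public static void main(String[] args) {

        LegacyQueueScope index =
            new LegacyQueueScopeImpl("index", LegacyQueueScope.RegionImplementation.ALL);
        LegacyQueueScope indexDead =
            new LegacyQueueScopeImpl("index", LegacyQueueScope.RegionImplementation.ALL, true);

        // same name and region, but the dead letter flag now participates in equality
        System.out.println(index.equals(indexDead));                  // false
        System.out.println(index.hashCode() == indexDead.hashCode()); // false (barring a hash collision)
    }
}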
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index b6ca429..e7fa47b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -195,6 +195,19 @@
 
 
     @Override
+    public List<LegacyQueueMessage> sendQueueMessages( List<LegacyQueueMessage> queueMessages ) throws IOException {
+
+        List<LegacyQueueMessage> successMessages = new ArrayList<>();
+        for ( LegacyQueueMessage queueMessage : queueMessages ) {
+            sendMessageToLocalRegion( (Serializable)queueMessage.getBody() );
+            successMessages.add(queueMessage);
+        }
+
+        return successMessages;
+    }
+
+
+    @Override
     public void deleteQueue() {
         queueManager.deleteQueue( scope.getName() );
     }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index b2a7680..5b49bc7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -20,15 +20,12 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.sqs.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,20 +51,6 @@
 import com.amazonaws.services.sns.model.SubscribeResult;
 import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.DeleteMessageRequest;
-import com.amazonaws.services.sqs.model.DeleteQueueRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
-import com.amazonaws.services.sqs.model.SendMessageResult;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -99,6 +82,7 @@
     private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
     private static final int MIN_CLIENT_SOCKET_TIMEOUT = 5000; // millis
     private static final int MIN_VISIBILITY_TIMEOUT = 1; //seconds
+    private static final String DEAD_LETTER_QUEUE_SUFFIX = "_dead";
 
     static {
 
@@ -133,6 +117,11 @@
                     queue = new LegacyQueue( result.getQueueUrl() );
                 }
                 catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+                    if (queueName.endsWith(DEAD_LETTER_QUEUE_SUFFIX)) {
+                        // don't auto-create dead letter queues
+                        logger.error("failed to get dead letter queue from service, won't create", queueDoesNotExistException);
+                        throw queueDoesNotExistException;
+                    }
                     logger.error( "Queue {} does not exist, will create", queueName );
                 }
                 catch ( Exception e ) {
@@ -251,8 +240,14 @@
             for ( String regionName : regionNames ) {
 
                 regionName = regionName.trim();
-                Regions regions = Regions.fromName( regionName );
-                Region region = Region.getRegion( regions );
+                Region region = null;
+                try {
+                    Regions regions = Regions.fromName(regionName);
+                    region = Region.getRegion(regions);
+                }
+                catch (IllegalArgumentException e) {
+                    throw new IllegalArgumentException("INVALID REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LIST + ": " + regionName, e);
+                }
 
                 AmazonSQSClient sqsClient = createSQSClient( region );
                 AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
@@ -380,19 +375,26 @@
     }
 
 
-    private String getName() {
+    private String getName(final boolean isDeadLetter) {
         String name =
             clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_"
                 + scope.getRegionImplementation();
+        if (isDeadLetter) {
+            name += DEAD_LETTER_QUEUE_SUFFIX;
+        }
         name = name.toLowerCase(); //user lower case values
         Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
 
         return name;
     }
 
+    private String getName() {
+        return getName(false);
+    }
+
 
     public LegacyQueue getReadQueue() {
-        String queueName = getName();
+        String queueName = getName(scope.isDeadLetterQueue());
 
         try {
             return readQueueUrlMap.get( queueName );
@@ -588,6 +590,43 @@
 
 
     @Override
+    public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException {
+
+        List<LegacyQueueMessage> successMessages = new ArrayList<>();
+
+        if ( sqs == null ) {
+            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
+            return successMessages;
+        }
+
+        String url = getReadQueue().getUrl();
+
+        List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
+
+        for (LegacyQueueMessage queueMessage : queueMessages) {
+            entries.add(new SendMessageBatchRequestEntry(queueMessage.getMessageId(), queueMessage.getStringBody()));
+        }
+
+        SendMessageBatchResult result = sqs.sendMessageBatch(url, entries);
+
+        Set<String> successIDs = new HashSet<>();
+        logger.debug("sendQueueMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());
+
+        for (SendMessageBatchResultEntry batchResultEntry : result.getSuccessful()) {
+            successIDs.add(batchResultEntry.getId());
+        }
+
+        for (LegacyQueueMessage queueMessage : queueMessages) {
+            if (successIDs.contains(queueMessage.getMessageId())) {
+                successMessages.add(queueMessage);
+            }
+        }
+
+        return successMessages;
+    }
+
+
+    @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
 
         if ( sqsAsync == null ) {
@@ -663,6 +702,7 @@
         DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );
 
         boolean successful = result.getFailed().size() <= 0;
+        logger.debug("commitMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());
 
         if ( !successful ) {
             for ( BatchResultErrorEntry failed : result.getFailed() ) {
@@ -684,8 +724,14 @@
      * Get the region
      */
     private Region getRegion() {
-        Regions regions = Regions.fromName(fig.getPrimaryRegion());
-        return Region.getRegion(regions);
+        String regionName = fig.getPrimaryRegion();
+        try {
+            Regions regions = Regions.fromName(regionName);
+            return Region.getRegion(regions);
+        }
+        catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("INVALID PRIMARY REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LOCAL + ": " + regionName, e);
+        }
     }
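
For illustration, the names getName(boolean) produces for a hypothetical cluster and keyspace (the literal values below are examples, not from this patch); the dead letter queue name is simply the regular name plus the _dead suffix, which is also what the QueueDoesNotExistException guard above keys on:

public class QueueNameSketch {

    public static void main(String[] args) {

        // hypothetical inputs; the real values come from ClusterFig, CassandraFig and the queue scope
        String clusterName = "usergrid";
        String keyspace = "usergrid_applications";
        String scopeName = "index";
        String region = "ALL";

        String name = (clusterName + "_" + keyspace + "_" + scopeName + "_" + region).toLowerCase();

        System.out.println(name);            // usergrid_usergrid_applications_index_all
        System.out.println(name + "_dead");  // usergrid_usergrid_applications_index_all_dead
    }
}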
 
 
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
index 6d99250..6cdad3b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
@@ -19,6 +19,7 @@
 
 import java.util.*;
 
+import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -442,6 +443,14 @@
                     item = em.createItemInCollection( context.getOwner(), context.getCollectionName(), getEntityType(),
                             p );
                 }
+                catch (DuplicateUniquePropertyExistsException e) {
+                    // this is not an error (caller tried to create entity with a duplicate unique value)
+                    logger.info("Entity [{}] unable to be created in collection [{}] due to [{} - {}]", p, context.getCollectionName(),
+                        e.getClass().getSimpleName(), e.getMessage());
+
+                    // would be nice if status for each batch entry was returned...
+                    continue;
+                }
                 catch ( Exception e ) {
 
                     logger.error("Entity [{}] unable to be created in collection [{}] due to [{} - {}]", p, context.getCollectionName(),
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index c00575f..f7e107d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -24,7 +24,9 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.usergrid.persistence.queue.LegacyQueueManager;
 import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
@@ -65,6 +67,12 @@
 
 
     @Override
+    public List<LegacyQueueMessage> sendQueueMessages(final List<LegacyQueueMessage> queueMessages ) throws IOException {
+        return new ArrayList<>();
+    }
+
+
+    @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
 
     }