1. Remove messages from dead letter queue if the message map entry was never received after a configurable timeout (15 minutes currently).
2. Log messages as they are moved from the dead letter queue back to the originating queue. Also log messages that are in dead letter queue and can't be moved to originating queue for some reason.
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 7c33969..530cf7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -724,10 +724,13 @@
                indexOperationMessage =
                    ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
 
+           } else if (System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getMapMessageTimeout()) {
+                // if esMapPersistence message hasn't been received yet, log and return (will be acked)
+                logger.error("ES map message never received, removing message from queue. indexBatchId={}", messageId);
+                return;
            } else {
-
-               throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
-
+                logger.warn("ES map message not received yet. indexBatchId={} elapsedTimeMsec={}", messageId, System.currentTimeMillis() - elasticsearchIndexEvent.getCreationTime());
+                throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
            }
 
         } else {
@@ -1105,14 +1108,38 @@
                                                  try {
                                                      // put the dead letter messages back in the appropriate queue
                                                      LegacyQueueManager returnQueue = null;
+                                                     String queueType;
                                                      if (isUtilityDeadQueue) {
-                                                         logger.warn("Utility dead queue message count: {}", messages.size());
                                                          returnQueue = utilityQueue;
+                                                         queueType = "utility";
                                                      } else {
-                                                         logger.warn("Index dead queue message count: {}", messages.size());
                                                          returnQueue = indexQueue;
+                                                         queueType = "index";
                                                      }
                                                      List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
+                                                     for (LegacyQueueMessage msg : successMessages) {
+                                                         logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                                     }
+                                                     int unsuccessfulMessagesSize = messages.size() - successMessages.size();
+                                                     if (unsuccessfulMessagesSize > 0) {
+                                                         // some messages couldn't be sent to originating queue, log
+                                                         Set<String> successMessageIds = new HashSet<>();
+                                                         for (LegacyQueueMessage msg : successMessages) {
+                                                             String messageId = msg.getMessageId();
+                                                             if (successMessageIds.contains(messageId)) {
+                                                                 logger.warn("Found duplicate messageId in returned messages: {}", messageId);
+                                                             } else {
+                                                                 successMessageIds.add(messageId);
+                                                             }
+                                                         }
+                                                         for (LegacyQueueMessage msg : messages) {
+                                                             String messageId = msg.getMessageId();
+                                                             if (!successMessageIds.contains(messageId)) {
+                                                                 logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
+                                                             }
+                                                         }
+                                                     }
+
                                                      if (isUtilityDeadQueue) {
                                                          ackUtilityDeadQueue(successMessages);
                                                      } else {
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 6fe96dd..0ebcc7b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -106,4 +106,7 @@
     @Default("false") // 30 seconds
     boolean getQuorumFallback();
 
+    @Key("usergrid.queue.map.message.timeout")
+    @Default("900000") // 15 minutes
+    int getMapMessageTimeout();
 }