Merge branch 'collectionClearJob'
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d505a96..bf1f5e7 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -112,7 +112,7 @@
 
 # Read timeout for an individual request (in millseconds)
 #
-#cassandra.timeout=5000
+#cassandra.timeout=20000
 
 
 # Set the credentials used for Cassandra, if any.
@@ -786,6 +786,7 @@
 # instead, use character class ([.] instead of backslash-period)
 usergrid.org.config.property.regex=usergrid[.]view[.].*
 
+usergrid.viewable.loginEndpoint=http://localhost:8080
 
 
 ###########################  Usergrid Email Templates  ########################
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ec6b775..a0748e6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -140,6 +140,7 @@
 
 
         bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
+        bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class );
 
         bind( ExportService.class ).to( ExportServiceImpl.class );
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index cdb4fc7..68c4ef0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -2471,10 +2471,9 @@
 
         final Entity entity;
 
-        //this is the fall back, why isn't this writt
         if ( entityType == null ) {
              return null;
-//            throw new EntityNotFoundException( String.format( "Counld not find type for uuid {}", uuid ) );
+//            throw new EntityNotFoundException( String.format( "Could not find type for uuid {}", uuid ) );
         }
 
         entity = get( new SimpleEntityRef( entityType, uuid ) );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index cec7258..bad5b2c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -487,7 +487,7 @@
         // evict app Id from cache
         applicationIdCache.evictAppId(appName);
 
-        logger.info("Initialized application {}", appName);
+        logger.info("Initialized application {}, uuid {}", appName, appInfo.getUuid().toString());
         return appInfo;
     }
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 872ffbb..46c7a1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -39,7 +39,7 @@
     int sleep();
 
     @Key( "usergrid.entityManager.enable_deindex_on_update" )
-    @Default( "true" )
+    @Default( "false" )
     boolean getDeindexOnUpdate();
 
     /**
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
new file mode 100644
index 0000000..4b91e17
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+public enum AsyncEventQueueType {
+    REGULAR ("regular"), UTILITY("utility"), DELETE("delete");
+
+    private String displayName;
+    AsyncEventQueueType(String displayName) {
+        this.displayName = displayName;
+    }
+
+    @Override
+    public String toString() {
+        return displayName;
+    }
+}
+
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index cab4e3e..04eaf4c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
+import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -33,7 +34,7 @@
 /**
  * Low level queue service for events in the entity.  These events are fire and forget, and will always be asynchronous
  */
-public interface AsyncEventService extends ReIndexAction {
+public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction {
 
 
     /**
@@ -84,9 +85,9 @@
     /**
      *
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
+    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType);
 
     /**
      * @param applicationScope
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 530cf7d..3d06cae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -59,6 +59,7 @@
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -74,9 +75,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.commons.lang.StringUtils.indexOf;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
 
 /**
  * TODO, this whole class is becoming a nightmare.
@@ -104,13 +102,15 @@
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
-    public static final String DEAD_LETTER_SUFFIX = "_dead";
+    public static final String QUEUE_NAME_DELETE = "delete";
 
 
     private final LegacyQueueManager indexQueue;
     private final LegacyQueueManager utilityQueue;
+    private final LegacyQueueManager deleteQueue;
     private final LegacyQueueManager indexQueueDead;
     private final LegacyQueueManager utilityQueueDead;
+    private final LegacyQueueManager deleteQueueDead;
     private final IndexProcessorFig indexProcessorFig;
     private final LegacyQueueFig queueFig;
     private final IndexProducer indexProducer;
@@ -132,8 +132,10 @@
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong counterUtility = new AtomicLong();
+    private final AtomicLong counterDelete = new AtomicLong();
     private final AtomicLong counterIndexDead = new AtomicLong();
     private final AtomicLong counterUtilityDead = new AtomicLong();
+    private final AtomicLong counterDeleteDead = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -174,16 +176,24 @@
         LegacyQueueScope utilityQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
 
+        LegacyQueueScope deleteQueueScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL);
+
         LegacyQueueScope indexQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
 
         LegacyQueueScope utilityQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
 
+        LegacyQueueScope deleteQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL, true);
+
         this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
         this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+        this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope);
         this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
         this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
+        this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -206,34 +216,90 @@
         start();
     }
 
+    private String getQueueName(AsyncEventQueueType queueType) {
+        switch (queueType) {
+            case REGULAR:
+                return QUEUE_NAME;
+
+            case UTILITY:
+                return QUEUE_NAME_UTILITY;
+
+            case DELETE:
+                return QUEUE_NAME_DELETE;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
+        return getQueue(queueType, false);
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? indexQueueDead : indexQueue;
+
+            case UTILITY:
+                return isDeadQueue ? utilityQueueDead : utilityQueue;
+
+            case DELETE:
+                return isDeadQueue ? deleteQueueDead : deleteQueue;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? counterIndexDead : counter;
+
+            case UTILITY:
+                return isDeadQueue ? counterUtilityDead : counterUtility;
+
+            case DELETE:
+                return isDeadQueue ? counterDeleteDead : counterDelete;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+
+
+
 
     /**
      * Offer the EntityIdScope to SQS
      */
     private void offer(final Serializable operation) {
+        offer(operation, AsyncEventQueueType.REGULAR);
+    }
+
+    private void offer(final Serializable operation, AsyncEventQueueType queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            this.indexQueue.sendMessageToLocalRegion( operation );
+            getQueue(queueType).sendMessageToLocalRegion(operation);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
             timer.stop();
         }
+
     }
 
 
-    private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
+    private void offerTopic(final Serializable operation, AsyncEventQueueType queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            if (forUtilityQueue) {
-                this.utilityQueue.sendMessageToAllRegions(operation);
-            } else {
-                this.indexQueue.sendMessageToAllRegions(operation);
-            }
+            getQueue(queueType).sendMessageToAllRegions(operation);
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -244,15 +310,11 @@
     }
 
 
-    private void offerBatch(final List operations, boolean forUtilityQueue){
+    private void offerBatch(final List operations, AsyncEventQueueType queueType){
         final Timer.Context timer = this.writeTimer.time();
         try {
             //signal to SQS
-            if( forUtilityQueue ){
-                this.utilityQueue.sendMessages(operations);
-            }else{
-                this.indexQueue.sendMessages(operations);
-            }
+            getQueue(queueType).sendMessages(operations);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -260,25 +322,16 @@
         }
     }
 
-    private void offerBatchToUtilityQueue(final List operations){
-        try {
-            //signal to SQS
-            this.utilityQueue.sendMessages(operations);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        }
-    }
-
 
     /**
      * Take message
      */
-    private List<LegacyQueueMessage> take() {
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue) {
 
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+            return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class);
         }
         finally {
             //stop our timer
@@ -286,52 +339,8 @@
         }
     }
 
-    /**
-     * Take message from SQS utility queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from index dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from SQS utility dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
+        return take(queueType, false);
     }
 
 
@@ -362,38 +371,15 @@
         }
     }
 
-
-    /**
-     * calls the event handlers and returns a result with information on whether
-     * it needs to be ack'd and whether it needs to be indexed
-     * Ack message in SQS
-     */
-    public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueue.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+    public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType, boolean isDeadQueue) {
+        if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
+            // different functionality
+            ack(messages);
         }
-    }
-
-    /**
-     * ack messages in index dead letter queue
-     */
-    public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            indexQueueDead.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+        try {
+            getQueue(queueType, isDeadQueue).commitMessages( messages );
         }
-    }
-
-    /**
-     * ack messages in utility dead letter queue
-     */
-    public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueueDead.commitMessages( messages );
-        }catch(Exception e){
+        catch (Exception e) {
             throw new RuntimeException("Unable to ack messages", e);
         }
     }
@@ -532,11 +518,13 @@
             applicationScope);
 
 
-        logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
-            applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
+                applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+        }
 
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
     }
 
 
@@ -545,8 +533,10 @@
                                        final Entity entity, long updatedAfter) {
 
 
-        logger.trace("Offering EntityIndexEvent for {}:{}",
-            entity.getId().getUuid(), entity.getId().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EntityIndexEvent for {}:{}",
+                entity.getId().getUuid(), entity.getId().getType());
+        }
 
         offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
             new EntityIdScope(applicationScope, entity.getId()), updatedAfter));
@@ -587,8 +577,10 @@
                              final Entity entity,
                              final Edge newEdge) {
 
-        logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
-            newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
+                newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+        }
 
         offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
 
@@ -622,11 +614,13 @@
     public void queueDeleteEdge(final ApplicationScope applicationScope,
                                 final Edge edge) {
 
-        logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
-            edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
+                edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+        }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
+        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage  handleEdgeDelete(final LegacyQueueMessage message) {
@@ -650,8 +644,7 @@
         }
 
         // default this observable's return to empty index operation message if nothing is emitted
-        return eventBuilder.buildDeleteEdge(applicationScope, edge)
-            .toBlocking().lastOrDefault(new IndexOperationMessage());
+        return eventBuilder.buildDeleteEdge(applicationScope, edge);
 
     }
 
@@ -660,9 +653,9 @@
     /**
      * Queue up an indexOperationMessage for multi region execution
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType) {
 
         // don't try to produce something with nothing
         if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -686,9 +679,11 @@
 
         //send to the topic so all regions index the batch
 
-        logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId);
+        }
 
-        offerTopic( elasticsearchIndexEvent, forUtilityQueue );
+        offerTopic( elasticsearchIndexEvent, queueType );
     }
 
     private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
@@ -760,11 +755,13 @@
 
         // queue the de-index of old versions to the topic so cleanup happens in all regions
 
-        logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
-            applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
+                applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+        }
 
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId), markedVersion), false);
+            new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE );
 
     }
 
@@ -821,10 +818,13 @@
     @Override
     public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
 
-        logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+        }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
-        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
+        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ),
+            AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
@@ -840,21 +840,15 @@
         final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
         final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
         final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+        final boolean isCollectionDelete = entityDeleteEvent.isCollectionDelete();
+        final long updatedBefore = entityDeleteEvent.getUpdatedBefore();
 
-        if (logger.isDebugEnabled())
-            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Deleting entity id from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore {}",
+                applicationScope, entityId, isCollectionDelete, updatedBefore);
+        }
 
-        final EventBuilderImpl.EntityDeleteResults
-            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
-
-
-        // Delete the entities and remove from graph separately
-        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
-
-        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
-
-        // default this observable's return to empty index operation message if nothing is emitted
-        return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(new IndexOperationMessage());
+        return eventBuilder.buildEntityDelete( applicationScope, entityId, isCollectionDelete, updatedBefore );
 
     }
 
@@ -879,23 +873,40 @@
     public void start() {
         final int indexCount = indexProcessorFig.getWorkerCount();
         final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+        final int deleteCount = indexProcessorFig.getWorkerCountDelete();
         final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
         final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
+        final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter();
+        logger.info("Starting queue workers for indexing: index={} indexDLQ={} utility={} utilityDLQ={} delete={} deleteDLQ={}",
+            indexCount, indexDeadCount, utilityCount, utilityDeadCount, deleteCount, deleteDeadCount);
 
         for (int i = 0; i < indexCount; i++) {
-            startWorker(QUEUE_NAME);
+            startWorker(AsyncEventQueueType.REGULAR);
         }
 
         for (int i = 0; i < utilityCount; i++) {
-            startWorker(QUEUE_NAME_UTILITY);
+            startWorker(AsyncEventQueueType.UTILITY);
         }
 
-        for (int i = 0; i < indexDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME);
+        for (int i = 0; i < deleteCount; i++) {
+            startWorker(AsyncEventQueueType.DELETE);
         }
 
-        for (int i = 0; i < utilityDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME_UTILITY);
+        if( indexQueue instanceof SNSQueueManagerImpl) {
+            logger.info("Queue manager implementation supports dead letters, start dead letter queue workers.");
+            for (int i = 0; i < indexDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.REGULAR);
+            }
+
+            for (int i = 0; i < utilityDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.UTILITY);
+            }
+
+            for (int i = 0; i < deleteDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.DELETE);
+            }
+        }else{
+            logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue workers.");
         }
     }
 
@@ -914,11 +925,10 @@
     }
 
 
-    private void startWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startWorker(final AsyncEventQueueType queueType) {
         synchronized (mutex) {
 
-            boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+            String type = getQueueName(queueType);
 
             Observable<List<LegacyQueueMessage>> consumer =
                 Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@@ -926,20 +936,15 @@
                     public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
                         //name our thread so it's easy to see
-                        long threadNum = isUtilityQueue ?
-                            counterUtility.incrementAndGet() : counter.incrementAndGet();
-                        Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+                        long threadNum = getCounter(queueType, false).incrementAndGet();
+                        Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum );
 
                         List<LegacyQueueMessage> drainList = null;
 
                         do {
                             try {
-                                if ( isUtilityQueue ){
-                                    drainList = takeFromUtilityQueue();
-                                }else{
-                                    drainList = take();
+                                drainList = take(queueType);
 
-                                }
                                 //emit our list in it's entity to hand off to a worker pool
                                 subscriber.onNext(drainList);
 
@@ -995,7 +1000,7 @@
 
                                     // submit the processed messages to index producer
                                     List<LegacyQueueMessage> messagesToAck =
-                                        submitToIndex( indexEventResults, isUtilityQueue );
+                                        submitToIndex( indexEventResults, queueType );
 
                                     if ( messagesToAck.size() < messages.size() ) {
                                         logger.warn(
@@ -1005,12 +1010,7 @@
 
                                     // ack each message if making it to this point
                                     if( messagesToAck.size() > 0 ){
-
-                                        if ( isUtilityQueue ){
-                                            ackUtilityQueue( messagesToAck );
-                                        }else{
-                                            ack( messagesToAck );
-                                        }
+                                        ack(messagesToAck, queueType, false);
                                     }
 
                                     return messagesToAck;
@@ -1034,129 +1034,112 @@
     }
 
 
-    private void startDeadQueueWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
+        String type = getQueueName(queueType);
         synchronized (mutex) {
 
-            boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
-
             Observable<List<LegacyQueueMessage>> consumer =
-                    Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
-                        @Override
-                        public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
+                Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
+                    @Override
+                    public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
-                            //name our thread so it's easy to see
-                            long threadNum = isUtilityDeadQueue ?
-                                counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
-                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum );
+                        //name our thread so it's easy to see
+                        long threadNum = getCounter(queueType, true).incrementAndGet();
+                        Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum );
 
-                            List<LegacyQueueMessage> drainList = null;
+                        List<LegacyQueueMessage> drainList = null;
 
-                            do {
-                                try {
-                                    if ( isUtilityDeadQueue ){
-                                        drainList = takeFromUtilityDeadQueue();
-                                    }else{
-                                        drainList = takeFromIndexDeadQueue();
-                                    }
-                                    //emit our list in it's entity to hand off to a worker pool
-                                    subscriber.onNext(drainList);
+                        do {
+                            try {
+                                drainList = take(queueType, true);
 
-                                    //take since  we're in flight
-                                    inFlight.addAndGet( drainList.size() );
+                                //emit our list in it's entity to hand off to a worker pool
+                                subscriber.onNext(drainList);
 
-                                } catch ( Throwable t ) {
+                                //take since  we're in flight
+                                inFlight.addAndGet( drainList.size() );
 
-                                    final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
+                            } catch ( Throwable t ) {
 
-                                    // there might be an error here during tests, just clean the cache
-                                    indexQueueDead.clearQueueNameCache();
+                                final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
 
-                                    if ( t instanceof InvalidQueryException ) {
+                                // there might be an error here during tests, just clean the cache
+                                indexQueueDead.clearQueueNameCache();
 
-                                        // don't fill up log with exceptions when keyspace and column
-                                        // families are not ready during bootstrap/setup
-                                        logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
-                                            t.getMessage(), sleepTime );
+                                if ( t instanceof InvalidQueryException ) {
 
-                                    } else {
-                                        logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
-                                    }
+                                    // don't fill up log with exceptions when keyspace and column
+                                    // families are not ready during bootstrap/setup
+                                    logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
+                                        t.getMessage(), sleepTime );
 
-                                    if ( drainList != null ) {
-                                        inFlight.addAndGet( -1 * drainList.size() );
-                                    }
-
-                                    try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
+                                } else {
+                                    logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
                                 }
+
+                                if ( drainList != null ) {
+                                    inFlight.addAndGet( -1 * drainList.size() );
+                                }
+
+                                try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
                             }
-                            while ( true );
                         }
-                    } )        //this won't block our read loop, just reads and proceeds
-                        .flatMap( sqsMessages -> {
+                        while ( true );
+                    }
+                } )        //this won't block our read loop, just reads and proceeds
+                    .flatMap( sqsMessages -> {
 
-                            //do this on a different schedule, and introduce concurrency
-                            // with flatmap for faster processing
-                            return Observable.just( sqsMessages )
+                        //do this on a different schedule, and introduce concurrency
+                        // with flatmap for faster processing
+                        return Observable.just( sqsMessages )
 
-                                             .map( messages -> {
-                                                 if ( messages == null || messages.size() == 0 ) {
-                                                     // no messages came from the queue, move on
-                                                     return null;
-                                                 }
+                            .map( messages -> {
+                                if ( messages == null || messages.size() == 0 ) {
+                                    // no messages came from the queue, move on
+                                    return null;
+                                }
 
-                                                 try {
-                                                     // put the dead letter messages back in the appropriate queue
-                                                     LegacyQueueManager returnQueue = null;
-                                                     String queueType;
-                                                     if (isUtilityDeadQueue) {
-                                                         returnQueue = utilityQueue;
-                                                         queueType = "utility";
-                                                     } else {
-                                                         returnQueue = indexQueue;
-                                                         queueType = "index";
-                                                     }
-                                                     List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
-                                                     for (LegacyQueueMessage msg : successMessages) {
-                                                         logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
-                                                     }
-                                                     int unsuccessfulMessagesSize = messages.size() - successMessages.size();
-                                                     if (unsuccessfulMessagesSize > 0) {
-                                                         // some messages couldn't be sent to originating queue, log
-                                                         Set<String> successMessageIds = new HashSet<>();
-                                                         for (LegacyQueueMessage msg : successMessages) {
-                                                             String messageId = msg.getMessageId();
-                                                             if (successMessageIds.contains(messageId)) {
-                                                                 logger.warn("Found duplicate messageId in returned messages: {}", messageId);
-                                                             } else {
-                                                                 successMessageIds.add(messageId);
-                                                             }
-                                                         }
-                                                         for (LegacyQueueMessage msg : messages) {
-                                                             String messageId = msg.getMessageId();
-                                                             if (!successMessageIds.contains(messageId)) {
-                                                                 logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
-                                                             }
-                                                         }
-                                                     }
+                                try {
+                                    // put the dead letter messages back in the appropriate queue
+                                    LegacyQueueManager returnQueue = getQueue(queueType, false);
 
-                                                     if (isUtilityDeadQueue) {
-                                                         ackUtilityDeadQueue(successMessages);
-                                                     } else {
-                                                         ackIndexDeadQueue(successMessages);
-                                                     }
+                                    List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
+                                    for (LegacyQueueMessage msg : successMessages) {
+                                        logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                    }
+                                    int unsuccessfulMessagesSize = messages.size() - successMessages.size();
+                                    if (unsuccessfulMessagesSize > 0) {
+                                        // some messages couldn't be sent to originating queue, log
+                                        Set<String> successMessageIds = new HashSet<>();
+                                        for (LegacyQueueMessage msg : successMessages) {
+                                            String messageId = msg.getMessageId();
+                                            if (successMessageIds.contains(messageId)) {
+                                                logger.warn("Found duplicate messageId in returned messages: {}", messageId);
+                                            } else {
+                                                successMessageIds.add(messageId);
+                                            }
+                                        }
+                                        for (LegacyQueueMessage msg : messages) {
+                                            String messageId = msg.getMessageId();
+                                            if (!successMessageIds.contains(messageId)) {
+                                                logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(), messageId, msg.getStringBody());
+                                            }
+                                        }
+                                    }
 
-                                                     return messages;
-                                                 }
-                                                 catch ( Exception e ) {
-                                                     logger.error( "Failed to ack messages", e );
-                                                     return null;
-                                                     //do not rethrow so we can process all of them
-                                                 }
-                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+                                    ack(successMessages, queueType, true);
 
-                            //end flatMap
-                        }, indexProcessorFig.getEventConcurrencyFactor() );
+                                    return messages;
+                                }
+                                catch ( Exception e ) {
+                                    logger.error( "Failed to ack messages", e );
+                                    return null;
+                                    //do not rethrow so we can process all of them
+                                }
+                            } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+
+                        //end flatMap
+                    }, indexProcessorFig.getEventConcurrencyFactor() );
 
             //start in the background
 
@@ -1170,7 +1153,7 @@
      * Submit results to index and return the queue messages to be ack'd
      *
      */
-    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
+    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, AsyncEventQueueType queueType) {
 
         // if nothing came back then return empty list
         if(indexEventResults==null){
@@ -1197,7 +1180,7 @@
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-       queueIndexOperationMessage(combined, forUtilityQueue);
+        queueIndexOperationMessage(combined, queueType);
 
         return queueMessages;
     }
@@ -1208,10 +1191,10 @@
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
         queueIndexOperationMessage(
-            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
+            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR );
     }
 
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType) {
 
         final List<EntityIndexEvent> batch = new ArrayList<>();
         edges.forEach(e -> {
@@ -1222,9 +1205,25 @@
 
         });
 
-        logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+        }
 
-        offerBatch( batch, forUtilityQueue );
+        offerBatch( batch, queueType );
+    }
+
+    public void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType) {
+
+        final List<EntityDeleteEvent> batch = new ArrayList<>();
+        edges.forEach(e -> {
+
+            //change to id scope to avoid serialization issues
+            batch.add(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
+                new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), true, updatedBefore));
+
+        });
+
+        offerBatch(batch, queueType);
     }
 
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 4db9f4b..4bb6312 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -27,6 +27,7 @@
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -54,7 +55,7 @@
      * @param edge
      * @return
      */
-    Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
+    IndexOperationMessage buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
 
     /**
      * Return a bin with 2 observable streams for entity delete.
@@ -62,7 +63,18 @@
      * @param entityId
      * @return
      */
-    EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+    IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId);
+
+    /**
+     * Return a bin with 2 observable streams for entity delete.
+     * @param applicationScope
+     * @param entityId
+     * @param isCollectionDelete
+     * @param updatedBefore
+     * @return
+     */
+    IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId,
+                                            boolean isCollectionDelete, long updatedBefore);
 
 
 
@@ -94,17 +106,17 @@
 
 
 
-        private final Observable<Id> compactedNode;
+        private final Observable<MarkedEdge> deletedEdges;
 
 
 
 
         public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
                                     final Observable<List<MvccLogEntry>> entitiesDeleted,
-                                    final Observable<Id> compactedNode) {
+                                    final Observable<MarkedEdge> deletedEdges) {
             this.indexOperationMessageObservable = indexOperationMessageObservable;
             this.entitiesDeleted = entitiesDeleted;
-            this.compactedNode = compactedNode;
+            this.deletedEdges = deletedEdges;
         }
 
 
@@ -116,8 +128,8 @@
             return entitiesDeleted;
         }
 
-        public Observable<Id> getCompactedNode() {
-            return compactedNode;
+        public Observable<MarkedEdge> getEdgesDeleted() {
+            return deletedEdges;
         }
 
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index bbdce5a..7c72b72 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -24,12 +24,13 @@
 import java.util.List;
 import java.util.UUID;
 
+import org.antlr.misc.Graph;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -82,7 +83,7 @@
 
         if (logger.isDebugEnabled()) {
             logger.debug("Indexing  in app scope {} with entity {} and new edge {}",
-                    applicationScope, entity, newEdge);
+                applicationScope, entity, newEdge);
         }
 
         return indexService.indexEdge( applicationScope, entity, newEdge );
@@ -90,15 +91,38 @@
 
 
     @Override
-    public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+    public IndexOperationMessage buildDeleteEdge( final ApplicationScope applicationScope, final Edge
         edge ) {
         if (logger.isDebugEnabled()) {
             logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
         }
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-        return gm.deleteEdge( edge )
-            .flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge ));
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final IndexOperationMessage combined = new IndexOperationMessage();
+
+        gm.deleteEdge( edge )
+            .doOnNext( deletedEdge -> {
+
+                logger.debug("Processing deleted edge for de-indexing {}", deletedEdge);
+
+                // get ALL versions of the target node as any connection from this source node needs to be removed
+                ecm.getVersionsFromMaxToMin(deletedEdge.getTargetNode(), UUIDUtils.newTimeUUID())
+                    .doOnNext(mvccLogEntry -> {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", deletedEdge.getTargetNode(), mvccLogEntry);
+                        }
+                        combined.ingest(
+                            indexService
+                                .deIndexEdge(applicationScope, deletedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+                                .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+                    }).toBlocking().lastOrDefault(null);
+
+            }).toBlocking().lastOrDefault(null);
+
+        return combined;
     }
 
 
@@ -106,43 +130,135 @@
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
     @Override
-    public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+    public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+        return buildEntityDelete(applicationScope, entityId, false, Long.MAX_VALUE);
+    }
+
+    @Override
+    public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId,
+                                                   final boolean isCollectionDelete, final long updatedBefore) {
+
         if (logger.isDebugEnabled()) {
-            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+            logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore={}",
+                applicationScope, entityId, isCollectionDelete, updatedBefore);
         }
 
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
-        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
 
-        //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED)
+        boolean deleteEntity = ecm.load(entityId).
+            map(entity -> {
+                final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
 
-        MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
-            .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
+                boolean willDelete = false;
+                if ( modified == null ) {
+                    // We don't have a modified field, so we can't check, so delete it
+                    willDelete = true;
+                } else if (modified.getValue() <= updatedBefore) {
+                    willDelete = true;
+                }
 
-        // De-indexing and entity deletes don't check log entries.  We must do that first. If no DELETED logs, then
-        // return an empty observable as our no-op.
-        Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
-        Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
+                if (isCollectionDelete && willDelete) {
+                    // need to mark for deletion
+                    ecm.mark(entityId, null)
+                        .mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp()))
+                        .toBlocking().last();
+                }
 
-        if(mostRecentlyMarked != null){
+                return willDelete;
+            }).toBlocking().firstOrDefault(true);
 
-            // fetch entity versions to be de-index by looking in cassandra
-            deIndexObservable =
-                indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(),
-                    getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion()));
-
-            ecmDeleteObservable =
-                ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() )
-                    .filter( mvccLogEntry->
-                        mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() )
-                    .buffer( serializationFig.getBufferSize() )
-                    .doOnNext( buffer -> ecm.delete( buffer ) );
+        if (!deleteEntity) {
+            return new IndexOperationMessage();
         }
 
-        // Graph compaction checks the versions inside compactNode, just build this up for the caller to subscribe to
-        final Observable<Id> graphCompactObservable = gm.compactNode(entityId);
 
-        return new EntityDeleteResults( deIndexObservable, ecmDeleteObservable, graphCompactObservable );
+        MvccLogEntry mostRecentToDelete =
+            ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )
+                .toBlocking()
+                .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
+
+//        logger.info("mostRecent stage={} entityId={} version={} state={}",
+//            mostRecentToDelete.getStage().name(), mostRecentToDelete.getEntityId(),
+//            mostRecentToDelete.getVersion().toString(), mostRecentToDelete.getState().name());
+
+        if (mostRecentToDelete == null) {
+            logger.info("No entity versions to delete for id {}", entityId.toString());
+        }
+        // if nothing is marked, then abort
+        if(mostRecentToDelete == null){
+            return new IndexOperationMessage();
+        }
+
+        final List<MvccLogEntry> logEntries = new ArrayList<>();
+        Observable<MvccLogEntry> mvccLogEntryListObservable =
+            ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() );
+        mvccLogEntryListObservable
+            .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
+            .buffer( serializationFig.getBufferSize() )
+            .doOnNext( buffer -> ecm.delete( buffer ) )
+            .doOnNext(mvccLogEntries -> {
+                logEntries.addAll(mvccLogEntries);
+            }).toBlocking().lastOrDefault(null);
+
+        //logger.info("logEntries size={}", logEntries.size());
+
+        IndexOperationMessage combined = new IndexOperationMessage();
+
+        // do the edge deletes and build up de-index messages for each edge deleted
+        // assume we have "server1" and "region1" nodes in the graph with the following relationships (edges/connections):
+        //
+        // region1  -- zzzconnzzz|has -->  server1
+        // server1  -- zzzconnzzz|in  -->  region1
+        //
+        // there will always be a relationship from the appId to each entity based on the entity type (collection):
+        //
+        // application -- zzzcollzzz|servers --> server1
+        // application -- zzzcollzzz|regions --> region1
+        //
+        // When deleting either "server1" or "region1" entity, the connections should get deleted and de-indexed along
+        // with the entry for the entity itself in the collection. The above example should have at minimum 3 things to
+        // be de-indexed. There may be more as either "server1" or "region1" could have multiple versions.
+        //
+        // Further comments using the example of deleting "server1" from the above example.
+        gm.compactNode(entityId).doOnNext(markedEdge -> {
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Processing deleted edge for de-indexing {}", markedEdge);
+            }
+
+            // if the edge was for a connection where the entity to be deleted is the source node, we need to load
+            // the target node's versions so that all versions of connections to that entity can be de-indexed
+            // server1  -- zzzconnzzz|in  -->  region1
+            if(!markedEdge.getTargetNode().getType().equals(entityId.getType())){
+
+                // get ALL versions of the target node as any connection from this source node needs to be removed
+                ecm.getVersionsFromMaxToMin( markedEdge.getTargetNode(), UUIDUtils.newTimeUUID() )
+                    .doOnNext(mvccLogEntry -> {
+                        logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, mvccLogEntry);
+                        combined.ingest(
+                            indexService
+                                .deIndexEdge(applicationScope, markedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+                                .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+                    }).toBlocking().lastOrDefault(null);
+
+            }else {
+
+                // for each version of the entity being deleted, de-index the connections where the entity is the target
+                // node ( application -- zzzcollzzz|servers --> server1 ) or (region1  -- zzzconnzzz|has -->  server1)
+                logEntries.forEach(logEntry -> {
+                    logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, logEntry);
+                    combined.ingest(
+                        indexService
+                            .deIndexEdge(applicationScope, markedEdge, logEntry.getEntityId(), logEntry.getVersion())
+                            .toBlocking().lastOrDefault(new IndexOperationMessage()));
+                });
+            }
+
+        }).toBlocking().lastOrDefault(null);
+
+        return combined;
     }
 
     @Override
@@ -185,22 +301,22 @@
 
 
         return indexService.deIndexOldVersions( applicationScope, entityId,
-            getVersionsOlderThanMarked(ecm, entityId, markedVersion), markedVersion);
+            getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
 
     }
 
 
-    private List<UUID> getVersionsOlderThanMarked( final EntityCollectionManager ecm,
-                                                   final Id entityId, final UUID markedVersion ){
+    private List<UUID> getVersionsOlderThanOrEqualToMarked(final EntityCollectionManager ecm,
+                                                           final Id entityId, final UUID markedVersion ){
 
         final List<UUID> versions = new ArrayList<>();
 
-        // only take last 5 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
+        // only take last 100 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
         // clusters that do not have this in-line cleanup
         ecm.getVersionsFromMaxToMin( entityId, markedVersion)
-            .take(5)
+            .take(100)
             .forEach( mvccLogEntry -> {
-                if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) {
+                if ( mvccLogEntry.getVersion().timestamp() <= markedVersion.timestamp() ) {
                     versions.add(mvccLogEntry.getVersion());
                 }
 
@@ -210,4 +326,17 @@
         return versions;
     }
 
+    private List<UUID> getAllVersions( final EntityCollectionManager ecm,
+                                       final Id entityId ) {
+
+        final List<UUID> versions = new ArrayList<>();
+
+        ecm.getVersionsFromMaxToMin(entityId, UUIDUtils.newTimeUUID())
+            .forEach( mvccLogEntry -> {
+                versions.add(mvccLogEntry.getVersion());
+            });
+
+        return versions;
+    }
+
 }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 01d2ba8..1589632 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -24,6 +24,7 @@
 
 /**
  * Event that will signal to finish the actual delete (post-mark delete) for an Entity
+ * It will mark if this is for a collection delete
  */
 public final class EntityDeleteEvent extends AsyncEvent {
 
@@ -31,17 +32,41 @@
     @JsonProperty
     protected EntityIdScope entityIdScope;
 
+    @JsonProperty
+    private long updatedBefore;
+
+    @JsonProperty
+    private boolean isCollectionDelete;
+
     public EntityDeleteEvent() {
         super();
     }
 
     public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) {
         super(sourceRegion);
-        this.entityIdScope =  entityIdScope;
+        this.entityIdScope = entityIdScope;
+        this.updatedBefore = Long.MAX_VALUE;
+        this.isCollectionDelete = false;
+    }
+
+    public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope,
+                             boolean isCollectionDelete, long updatedBefore) {
+        super(sourceRegion);
+        this.entityIdScope = entityIdScope;
+        this.updatedBefore = updatedBefore;
+        this.isCollectionDelete = isCollectionDelete;
     }
 
 
     public EntityIdScope getEntityIdScope() {
         return entityIdScope;
     }
+
+    public long getUpdatedBefore() {
+        return updatedBefore;
+    }
+
+    public boolean isCollectionDelete() {
+        return isCollectionDelete;
+    }
 }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
new file mode 100644
index 0000000..7bad06b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+
+/**
+ * Callback to perform a collection delete operation based on an scope during bulk collection delete operations
+ */
+public interface CollectionDeleteAction {
+
+    /**
+     * Delete a batch list of entities.
+     * @param edges
+     * @param updatedBefore
+     * @param queueType
+     */
+    void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType);
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
new file mode 100644
index 0000000..4abdfea
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A builder interface to build our collection delete request
+ */
+public interface CollectionDeleteRequestBuilder {
+
+    /**
+     * Set the application id
+     */
+    CollectionDeleteRequestBuilder withApplicationId(final UUID applicationId);
+
+    /**
+     * Set the collection name.
+     * @param collectionName
+     * @return
+     */
+    CollectionDeleteRequestBuilder withCollection(final String collectionName);
+
+    /**
+     * Set our cursor to resume processing
+     * @param cursor
+     * @return
+     */
+    CollectionDeleteRequestBuilder withCursor(final String cursor);
+
+
+    CollectionDeleteRequestBuilder withDelay(int delayTimer, TimeUnit timeUnit);
+
+    /**
+     * Set the timestamp to delete entities updated <= this timestamp
+     * @param timestamp
+     * @return
+     */
+    CollectionDeleteRequestBuilder withEndTimestamp(final Long timestamp);
+
+
+    Optional<Integer> getDelayTimer();
+
+    Optional<TimeUnit> getTimeUnitOptional();
+
+    /**
+     * Get the application scope
+     * @return
+     */
+    Optional<ApplicationScope> getApplicationScope();
+
+    /**
+     * Get the collection name
+     * @return
+     */
+    Optional<String> getCollectionName();
+
+    /**
+     * Get the cursor
+     * @return
+     */
+    Optional<String> getCursor();
+
+    /**
+     * Get the latest timestamp to delete
+     * @return
+     */
+    Optional<Long> getEndTimestamp();
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
new file mode 100644
index 0000000..890b770
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * collection delete service request builder
+ */
+public class CollectionDeleteRequestBuilderImpl implements CollectionDeleteRequestBuilder {
+
+    private Optional<UUID> withApplicationId = Optional.absent();
+    private Optional<String> withCollectionName = Optional.absent();
+    private Optional<String> cursor = Optional.absent();
+    private Optional<Long> endTimestamp = Optional.absent();
+    private Optional<Integer> delayTimer = Optional.absent();
+    private Optional<TimeUnit> timeUnitOptional = Optional.absent();
+
+
+    /***
+     *
+     * @param applicationId The application id
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withApplicationId( final UUID applicationId ) {
+        this.withApplicationId = Optional.fromNullable( applicationId );
+        return this;
+    }
+
+
+    /**
+     * the collection name
+     * @param collectionName
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withCollection( final String collectionName ) {
+        this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() ) );
+        return this;
+    }
+
+
+    /**
+     * The cursor
+     * @param cursor
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withCursor( final String cursor ) {
+        this.cursor = Optional.fromNullable( cursor );
+        return this;
+    }
+
+
+    /**
+     * Determines whether we should tack on a delay for collection delete and for how long if we do. Also
+     * allowed to specify how throttled back it should be.
+     * @param delayTimer
+     * @param timeUnit
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){
+        this.delayTimer = Optional.fromNullable( delayTimer );
+        this.timeUnitOptional = Optional.fromNullable( timeUnit );
+
+        return this;
+    }
+
+
+    /**
+     * Set end timestamp in epoch time.  Only entities created before this time will be processed for deletion
+     * @param timestamp
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withEndTimestamp( final Long timestamp ) {
+        this.endTimestamp = Optional.fromNullable( timestamp );
+        return this;
+    }
+
+
+    @Override
+    public Optional<Integer> getDelayTimer() {
+        return delayTimer;
+    }
+
+    @Override
+    public Optional<TimeUnit> getTimeUnitOptional() {
+        return timeUnitOptional;
+    }
+
+
+    @Override
+    public Optional<ApplicationScope> getApplicationScope() {
+
+        if ( this.withApplicationId.isPresent() ) {
+            return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+        }
+
+        return Optional.absent();
+    }
+
+
+    @Override
+    public Optional<String> getCollectionName() {
+        return withCollectionName;
+    }
+
+
+    @Override
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    @Override
+    public Optional<Long> getEndTimestamp() {
+        return endTimestamp;
+    }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
new file mode 100644
index 0000000..c939dd3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+/**
+ * An interface for re-indexing all entities in an application
+ */
+public interface CollectionDeleteService {
+
+
+    /**
+     * Perform a collection delete via service
+     *
+     * @param collectionDeleteRequestBuilder The builder to build the request
+     */
+    CollectionDeleteStatus deleteCollection(final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder);
+
+
+    /**
+     * Generate a build for the collection delete
+     */
+    CollectionDeleteRequestBuilder getBuilder();
+
+
+    /**
+     * Get the status of a job
+     * @param jobId The jobId returned during the collection delete
+     * @return
+     */
+    CollectionDeleteStatus getStatus(final String jobId);
+
+
+    /**
+     * The response when requesting a collection delete operation
+     */
+    public class CollectionDeleteStatus {
+        final String jobId;
+        final Status status;
+        final long numberProcessed;
+        final long lastUpdated;
+
+
+        public CollectionDeleteStatus(final String jobId, final Status status, final long numberProcessed,
+                                      final long lastUpdated ) {
+            this.jobId = jobId;
+            this.status = status;
+            this.numberProcessed = numberProcessed;
+            this.lastUpdated = lastUpdated;
+        }
+
+
+        /**
+         * Get the jobId used to resume this operation
+         */
+        public String getJobId() {
+            return jobId;
+        }
+
+
+        /**
+         * Get the last updated time, as a long
+         * @return
+         */
+        public long getLastUpdated() {
+            return lastUpdated;
+        }
+
+
+        /**
+         * Get the number of records processed
+         * @return
+         */
+        public long getNumberProcessed() {
+            return numberProcessed;
+        }
+
+
+        /**
+         * Get the status
+         * @return
+         */
+        public Status getStatus() {
+            return status;
+        }
+    }
+
+    enum Status{
+        STARTED, INPROGRESS, COMPLETE, UNKNOWN;
+    }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
new file mode 100644
index 0000000..7b3e324
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.utils.InflectionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static com.google.common.base.Optional.fromNullable;
+
+
+@Singleton
+public class CollectionDeleteServiceImpl implements CollectionDeleteService {
+
+    private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteServiceImpl.class );
+
+    private static final MapScope RESUME_MAP_SCOPE =
+        new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "collectiondeleteresume" );
+
+    //Keep cursors to resume collection delete for 10 days.
+    private static final int CURSOR_TTL = 60 * 60 * 24 * 10;
+
+    private static final String MAP_CURSOR_KEY = "cursor";
+    private static final String MAP_COUNT_KEY = "count";
+    private static final String MAP_STATUS_KEY = "status";
+    private static final String MAP_UPDATED_KEY = "lastUpdated";
+
+
+    private final AllApplicationsObservable allApplicationsObservable;
+    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final AllEntityIdsObservable allEntityIdsObservable;
+    private final IndexProcessorFig indexProcessorFig;
+    private final MapManager mapManager;
+    private final MapManagerFactory mapManagerFactory;
+    private final AsyncEventService indexService;
+    private final EntityIndexFactory entityIndexFactory;
+    private final CollectionSettingsFactory collectionSettingsFactory;
+
+
+    @Inject
+    public CollectionDeleteServiceImpl(final EntityIndexFactory entityIndexFactory,
+                                       final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                       final AllEntityIdsObservable allEntityIdsObservable,
+                                       final MapManagerFactory mapManagerFactory,
+                                       final AllApplicationsObservable allApplicationsObservable,
+                                       final IndexProcessorFig indexProcessorFig,
+                                       final CollectionSettingsFactory collectionSettingsFactory,
+                                       final AsyncEventService indexService ) {
+        this.entityIndexFactory = entityIndexFactory;
+        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.allEntityIdsObservable = allEntityIdsObservable;
+        this.allApplicationsObservable = allApplicationsObservable;
+        this.indexProcessorFig = indexProcessorFig;
+        this.indexService = indexService;
+        this.collectionSettingsFactory = collectionSettingsFactory;
+        this.mapManagerFactory = mapManagerFactory;
+        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
+    }
+
+
+    //TODO: optional delay, param.
+    @Override
+    public CollectionDeleteStatus deleteCollection( final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder) {
+
+        final AtomicInteger count = new AtomicInteger();
+
+        final Optional<EdgeScope> cursor = parseCursor( collectionDeleteRequestBuilder.getCursor() );
+
+        final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
+
+        final Optional<Integer> delayTimer = collectionDeleteRequestBuilder.getDelayTimer();
+
+        final Optional<TimeUnit> timeUnitOptional = collectionDeleteRequestBuilder.getTimeUnitOptional();
+
+        Optional<ApplicationScope> appId = collectionDeleteRequestBuilder.getApplicationScope();
+
+        Preconditions.checkArgument(collectionDeleteRequestBuilder.getCollectionName().isPresent(),
+            "You must specify a collection name");
+        String collectionName = collectionDeleteRequestBuilder.getCollectionName().get();
+
+        Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
+            "You cannot specify an app id and a cursor.  When resuming with cursor you must omit the appid." );
+        Preconditions.checkArgument( cursor.isPresent() || appId.isPresent(),
+            "Either application ID or cursor is required.");
+
+        ApplicationScope applicationScope;
+        if (appId.isPresent()) {
+            applicationScope = appId.get();
+        } else { // cursor is present
+            applicationScope = cursor.get().getApplicationScope();
+        }
+
+
+        final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+        // default to current time
+        final long endTimestamp = collectionDeleteRequestBuilder.getEndTimestamp().or( System.currentTimeMillis() );
+
+        String pluralizedCollectionName = InflectionUtils.pluralize(CpNamingUtils.getNameFromEdgeType(collectionName));
+
+        CollectionSettings collectionSettings =
+            collectionSettingsFactory.getInstance(new CollectionSettingsScopeImpl(applicationScope.getApplication(), pluralizedCollectionName));
+
+        Optional<Map<String, Object>> existingSettings =
+            collectionSettings.getCollectionSettings( pluralizedCollectionName );
+
+        if ( existingSettings.isPresent() ) {
+
+            Map jsonMapData = existingSettings.get();
+
+            jsonMapData.put( "lastCollectionClear", Instant.now().toEpochMilli() );
+
+            collectionSettings.putCollectionSettings(
+                pluralizedCollectionName, JsonUtils.mapToJsonString(jsonMapData ) );
+        }
+
+        allEntityIdsObservable.getEdgesToEntities( Observable.just(applicationScope),
+            fromNullable(collectionName), cursorSeek.getSeekValue() )
+            .buffer( indexProcessorFig.getCollectionDeleteBufferSize())
+            .doOnNext( edgeScopes -> {
+                logger.info("Sending batch of {} to be deleted.", edgeScopes.size());
+                indexService.deleteBatch(edgeScopes, endTimestamp, AsyncEventQueueType.DELETE);
+                count.addAndGet(edgeScopes.size() );
+                if( edgeScopes.size() > 0 ) {
+                    writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+                }
+                writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+            .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+            .subscribeOn( Schedulers.io() ).subscribe();
+
+
+        return new CollectionDeleteStatus( jobId, Status.STARTED, 0, 0 );
+    }
+
+
+    @Override
+    public CollectionDeleteRequestBuilder getBuilder() {
+        return new CollectionDeleteRequestBuilderImpl();
+    }
+
+
+    @Override
+    public CollectionDeleteStatus getStatus( final String jobId ) {
+        Preconditions.checkNotNull( jobId, "jobId must not be null" );
+        return getCollectionDeleteResponse( jobId );
+    }
+
+
+    /**
+     * Get the resume edge scope
+     *
+     * @param edgeScope The optional edge scope from the cursor
+     */
+    private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+        if ( edgeScope.isPresent() ) {
+            return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+        }
+
+        return new CursorSeek<>( Optional.absent() );
+    }
+
+
+    /**
+     * Swap our cursor for an optional edgescope
+     */
+    private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+
+        if ( !cursor.isPresent() ) {
+            return Optional.absent();
+        }
+
+        //get our cursor
+        final String persistedCursor = mapManager.getString( cursor.get() );
+
+        if ( persistedCursor == null ) {
+            return Optional.absent();
+        }
+
+        final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+
+        final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() );
+
+        return Optional.of( edgeScope );
+    }
+
+
+    /**
+     * Write the cursor state to the map in cassandra
+     */
+    private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+        final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge );
+
+        final String serializedState = CursorSerializerUtil.asString( node );
+
+        mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, CURSOR_TTL);
+    }
+
+
+    /**
+     * Write our state meta data into cassandra so everyone can see it
+     * @param jobId
+     * @param status
+     * @param processedCount
+     * @param lastUpdated
+     */
+    private void writeStateMeta( final String jobId, final Status status, final long processedCount,
+                                 final long lastUpdated ) {
+
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+                    jobId, status, processedCount, lastUpdated);
+        }
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
+        mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+    /**
+     * Get the index response from the jobId
+     * @param jobId
+     * @return
+     */
+    private CollectionDeleteStatus getCollectionDeleteResponse( final String jobId ) {
+
+        final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
+
+        if(stringStatus == null){
+           return new CollectionDeleteStatus( jobId, Status.UNKNOWN, 0, 0 );
+        }
+
+        final Status status = Status.valueOf( stringStatus );
+
+        final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+
+        return new CollectionDeleteStatus( jobId, status, processedCount, lastUpdated );
+    }
+}
+
+
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
index 921777a..76f0b4b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
@@ -20,7 +20,6 @@
 
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.utils.JsonUtils;
 import org.slf4j.Logger;
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7eecf04..948e106 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,10 +40,14 @@
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE = "elasticsearch.worker_count_delete";
+
     String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = "elasticsearch.worker_count_delete_deadletter";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -52,6 +56,8 @@
 
     String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
 
+    String COLLECTION_DELETE_BUFFER_SIZE = "elasticsearch.collection_delete.buffer_size";
+
     String REINDEX_CONCURRENCY_FACTOR = "elasticsearch.reindex.concurrency.factor";
 
 
@@ -105,6 +111,13 @@
     int getWorkerCountUtility();
 
     /**
+     * The number of worker threads used to read delete requests from the queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE)
+    int getWorkerCountDelete();
+
+    /**
      * The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue.
      */
     @Default("1")
@@ -119,6 +132,13 @@
     int getWorkerCountUtilityDeadLetter();
 
     /**
+     * The number of worker threads used to read dead messages from the delete dead letter queue and reload them into the delete queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER)
+    int getWorkerCountDeleteDeadLetter();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
@@ -139,6 +159,13 @@
     int getReindexConcurrencyFactor();
 
     /**
+     * Number of parallel buffers during collection delete
+     */
+    @Default("500")
+    @Key(COLLECTION_DELETE_BUFFER_SIZE)
+    int getCollectionDeleteBufferSize();
+
+    /**
      * Flag to resolve the LOCAL queue implementation service synchronously.
      */
     @Default("false")
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index b989a9c..58d470a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -69,18 +69,17 @@
      */
     Observable<IndexOperationMessage> deleteIndexEdge(final ApplicationScope applicationScope, final Edge edge);
 
-
     /**
-     * De-index all documents with the specified entityId and versions provided.  This will also remove any documents
-     * where the entity is a source/target node ( index docs where this entityId is a part of connections).
-     *
+     * Delete an index edge from the specified scope for a specific entity version
      * @param applicationScope
+     * @param edge
      * @param entityId
-     * @param markedVersion
+     * @param entityVersion
      * @return
      */
-    Observable<IndexOperationMessage> deIndexEntity(final ApplicationScope applicationScope, final Id entityId,
-                                                    final UUID markedVersion, final List<UUID> allVersionsBeforeMarked);
+    Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+                                                  final Id entityId, final UUID entityVersion);
+
 
 
     /**
@@ -88,10 +87,9 @@
      *
      * @param applicationScope
      * @param entityId
-     * @param markedVersion
      * @return
      */
     Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, final Id entityId,
-                                                         final List<UUID> versions, UUID markedVersion);
+                                                         final List<UUID> versions);
 
 }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 1b8614f..32470f6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -29,9 +29,7 @@
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.*;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.*;
@@ -103,7 +101,7 @@
 
 
         //we always index in the target scope
-        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
 
         //we may have to index  we're indexing from source->target here
         final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
@@ -216,10 +214,8 @@
         return Optional.of(defaultProperties);
     }
 
-    //Steps to delete an IndexEdge.
-    //1.Take the search edge given and search for all the edges in elasticsearch matching that search edge
-    //2. Batch Delete all of those edges returned in the previous search.
-    //TODO: optimize loops further.
+    // DO NOT USE THIS AS THE QUERY TO ES CAN CAUSE EXTREME LOAD
+    // TODO REMOVE THIS AND UPDATE THE TESTS TO NOT USE THIS METHOD
     @Override
     public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope,
                                                               final Edge edge ) {
@@ -256,49 +252,31 @@
         return ObservableTimer.time( batches, addTimer );
     }
 
-
     @Override
-    public Observable<IndexOperationMessage> deIndexEntity( final ApplicationScope applicationScope, final Id entityId,
-                                                            final UUID markedVersion,
-                                                            final List<UUID> allVersionsBeforeMarked ) {
+    public Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+                                                         final Id entityId, final UUID entityVersion){
 
-        final EntityIndex ei = entityIndexFactory.
-            createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
-
-        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
-        // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
-        final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
-            CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
-            Long.MAX_VALUE ) );
-
-
-
-        final EntityIndexBatch batch = ei.createBatch();
-
-        // de-index each version of the entity before the marked version
-        allVersionsBeforeMarked.forEach(version -> batch.deindex(searchEdgeFromSource, entityId, version));
-
-
-        // for now, query the index to remove docs where the entity is source/target node and older than markedVersion
-        // TODO: investigate getting this information from graph
-        CandidateResults candidateResults = ei.getNodeDocsOlderThanMarked(entityId, markedVersion );
-        candidateResults.forEach(candidateResult -> batch.deindex(candidateResult));
-
-        return Observable.just(batch.build());
+        if (logger.isTraceEnabled()) {
+            logger.trace("deIndexEdge edge={} entityId={} entityVersion={}", edge.toString(), entityId.toString(), entityVersion.toString());
+        }
+        final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
+        final EntityIndexBatch entityBatch = ei.createBatch();
+        entityBatch.deindex(generateScopeFromSource( edge ), entityId, entityVersion);
+        return Observable.just(entityBatch.build());
 
     }
 
+
     @Override
     public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope,
                                                                 final Id entityId,
-                                                                final List<UUID> versions,
-                                                                UUID markedVersion) {
+                                                                final List<UUID> versions) {
 
         final EntityIndex ei = entityIndexFactory.
             createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
 
-        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
-        // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
+        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code for de-indexing
+        // previously .timsetamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
         final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
             CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
             Long.MAX_VALUE ) );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 2b3573e..d6bdd93 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@
      * Index a batch list of entities.  Goes to the utility queue.
      * @param edges
      * @param updatedSince
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
+    void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType);
 }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..05602fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince, true);
+                indexService.indexBatch(edgeScopes, modifiedSince, AsyncEventQueueType.UTILITY);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
@@ -351,7 +352,7 @@
         final Status status = Status.valueOf( stringStatus );
 
         final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
-        final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
 
         return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
     }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 83f4c8b..b1b7f75 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -121,12 +122,8 @@
                 if (isDeleted) {
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
-                    final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
-                    indexMessageObservable
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -135,18 +132,8 @@
                     final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -155,19 +142,8 @@
                     final Id targetNodeId = markedEdge.getTargetNode();
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
-
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
                 }
 
 
@@ -252,13 +228,13 @@
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1afb524..c75545e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -121,12 +122,8 @@
                 if (isDeleted) {
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
-                    final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
-                    indexMessageObservable
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -135,18 +132,8 @@
                     final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -155,18 +142,8 @@
                     final Id targetNodeId = markedEdge.getTargetNode();
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -245,13 +222,13 @@
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index ae4623d..c9752c3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -743,7 +743,7 @@
 
     /**
      * Add a new index to the application for scale
-     * @param suffix unique indentifier for additional index
+     * @param newIndexName unique identifier for additional index
      * @param shards number of shards
      * @param replicas number of replicas
      * @param writeConsistency only "one, quorum, or all"
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 900bda5..4c9d16c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -31,6 +31,8 @@
 import org.apache.usergrid.persistence.index.utils.ClassUtils;
 import org.apache.usergrid.persistence.index.utils.ListUtils;
 import org.apache.usergrid.persistence.index.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -43,6 +45,7 @@
 public class Query {
 
 
+    private static final Logger logger = LoggerFactory.getLogger(Query.class);
 
     public enum Level {
         IDS, REFS, CORE_PROPERTIES, ALL_PROPERTIES, LINKED_PROPERTIES
@@ -319,6 +322,13 @@
 
 
     public static Query fromIdentifier( Object id ) {
+        if (id == null) {
+            throw new IllegalArgumentException("null identifier passed in");
+        }
+        Identifier objectIdentifier = Identifier.from(id);
+        if (objectIdentifier == null) {
+            throw new IllegalArgumentException("Supplied id results in null Identifier");
+        }
         Query q = new Query();
         q.addIdentifier( Identifier.from(id) );
         return q;
@@ -409,6 +419,10 @@
         }
 
         for ( Identifier identifier : identifiers ) {
+            if (identifier == null) {
+                logger.error("containsUuidIdentifiersOnly(): identifier in identifiers list is null");
+                return false;
+            }
             if ( !identifier.isUUID() ) {
                 return false;
             }
@@ -635,6 +649,9 @@
         if ( identifiers == null ) {
             identifiers = new ArrayList<Identifier>();
         }
+        if (identifier == null) {
+            throw new IllegalArgumentException("adding null identifier is not allowed");
+        }
         identifiers.add( identifier );
     }
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
index 790b4d9..0a4360f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
@@ -48,6 +48,32 @@
 
     public static final String COLLECTION_ACTIVITIES = "activities";
 
+    public static final String COLLECTION_EVENTS = "events";
+
+    public static final String COLLECTION_FOLDERS = "folders";
+
+    public static final String COLLECTION_DEVICES = "devices";
+
+    public static final String COLLECTION_NOTIFICATIONS = "notifications";
+
+    public static final String COLLECTION_ROLES = "roles";
+
+    public static boolean isCustomCollectionName(String collectionName) {
+        switch (collectionName.toLowerCase()) {
+            case COLLECTION_USERS:
+            case COLLECTION_GROUPS:
+            case COLLECTION_ASSETS:
+            case COLLECTION_ACTIVITIES:
+            case COLLECTION_EVENTS:
+            case COLLECTION_FOLDERS:
+            case COLLECTION_DEVICES:
+            case COLLECTION_NOTIFICATIONS:
+            case COLLECTION_ROLES:
+                return false;
+        }
+        return true;
+    }
+
     @EntityProperty(indexed = true, fulltextIndexed = false, required = true, mutable = false, aliasProperty = true,
             basic = true)
     protected String name;
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 3bfe460..68834b3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -92,7 +92,7 @@
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesToTargetObservable.edgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+        edgesToTargetObservable.edgesToTarget( gm, target, true).doOnNext(new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9e84219..55b77c0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId, true).doOnNext(edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 
@@ -118,7 +118,7 @@
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, source, true).doOnNext(edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
new file mode 100644
index 0000000..ddf2c68
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Injector;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+
+@NotThreadSafe
+public class CollectionDeleteTest extends AbstractCoreIT {
+    private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteTest.class );
+
+    private static final MetricRegistry registry = new MetricRegistry();
+
+
+    private static final int ENTITIES_TO_DELETE = 1000;
+    private static final int ENTITIES_TO_ADD_AFTER_TIME = 3;
+
+
+    @Before
+    public void startReporting() {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Starting metrics reporting");
+        }
+    }
+
+
+    @After
+    public void printReport() {
+        logger.debug( "Printing metrics report" );
+    }
+
+
+    @Test( timeout = 240000 )
+    public void clearOneCollection() throws Exception {
+
+        logger.info( "Started clearOneCollection()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+        final CollectionDeleteService collectionDeleteService = setup.getInjector().getInstance( CollectionDeleteService.class );
+
+        // ----------------- create a bunch of entities
+
+        Map<String, Object> entityMap = new HashMap<String, Object>() {{
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
+        }};
+
+        String collectionName = "items";
+        String itemType = "item";
+
+
+        List<EntityRef> entityRefs = new ArrayList<EntityRef>();
+        for ( int i = 0; i < ENTITIES_TO_DELETE; i++ ) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", i );
+                entity = em.create(itemType, entityMap);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities", i );
+            }
+        }
+
+        logger.info("Created {} entities", ENTITIES_TO_DELETE);
+        long timeFirstPutDone = System.currentTimeMillis();
+        logger.info("timeFirstPutDone={}", timeFirstPutDone);
+
+        for (int i = 0; i < ENTITIES_TO_ADD_AFTER_TIME; i++) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", ENTITIES_TO_DELETE + i );
+                entity = em.create(itemType, entityMap);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities after delete time", i );
+            }
+
+        }
+        logger.info("Created {} entities after delete time", ENTITIES_TO_ADD_AFTER_TIME);
+
+
+        app.waitForQueueDrainAndRefreshIndex(5000);
+
+        final CollectionDeleteRequestBuilder builder =
+            collectionDeleteService.getBuilder()
+                .withApplicationId( em.getApplicationId() )
+                .withCollection(collectionName)
+                .withEndTimestamp(timeFirstPutDone);
+
+        CollectionDeleteService.CollectionDeleteStatus status = collectionDeleteService.deleteCollection(builder);
+
+        assertNotNull( status.getJobId(), "JobId is present" );
+
+        logger.info( "Delete collection" );
+
+
+        waitForDelete( status, collectionDeleteService );
+
+        app.waitForQueueDrainAndRefreshIndex(15000);
+
+        // ----------------- test that we can read the entries after the timestamp
+
+        readData( em, collectionName,ENTITIES_TO_ADD_AFTER_TIME);
+    }
+
+    /**
+     * Wait for the delete to occur
+     */
+    private void waitForDelete( final CollectionDeleteService.CollectionDeleteStatus status, final CollectionDeleteService collectionDeleteService )
+        throws InterruptedException, IllegalArgumentException {
+        if (status != null) {
+            logger.info("waitForDelete: jobID={}", status.getJobId());
+        } else {
+            logger.info("waitForDelete: error, status = null");
+            throw new IllegalArgumentException("collectionDeleteStatus = null");
+        }
+        while ( true ) {
+
+            try {
+                final CollectionDeleteService.CollectionDeleteStatus updatedStatus =
+                    collectionDeleteService.getStatus( status.getJobId() );
+
+                if (updatedStatus == null) {
+                    logger.info("waitForDelete: updated status is null");
+                } else {
+                    logger.info("waitForDelete: status={} numberProcessed={}",
+                        updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+                    if ( updatedStatus.getStatus() == CollectionDeleteService.Status.COMPLETE ) {
+                        break;
+                    }
+                }
+            }
+            catch ( IllegalArgumentException iae ) {
+                //swallow.  Thrown if our job can't be found.  I.E hasn't updated yet
+            }
+
+
+            Thread.sleep( 1000 );
+        }
+    }
+
+
+    private int readData(EntityManager em, String collectionName, int expectedEntities)
+        throws Exception {
+
+        app.waitForQueueDrainAndRefreshIndex();
+
+        Results results = em.getCollection(em.getApplicationRef(), collectionName, null, expectedEntities,
+            Query.Level.ALL_PROPERTIES, false);
+
+        int count = 0;
+        while ( true ) {
+
+            if (results.getEntities().size() == 0) {
+                break;
+            }
+
+            UUID lastEntityUUID = null;
+            for ( Entity e : results.getEntities() ) {
+
+                assertEquals(2000, e.getProperty("key2"));
+
+                if (count % 100 == 0) {
+                    logger.info("read {} entities", count);
+                }
+                lastEntityUUID = e.getUuid();
+                count++;
+            }
+
+            results = em.getCollection(em.getApplicationRef(), collectionName, lastEntityUUID, expectedEntities,
+                Query.Level.ALL_PROPERTIES, false);
+
+        }
+        logger.info("read {} total entities", count);
+
+        assertEquals( "Did not get expected entities", expectedEntities, count );
+        return count;
+    }
+
+    private int countEntities( EntityManager em, String collectionName, int expectedEntities)
+           throws Exception {
+
+           app.waitForQueueDrainAndRefreshIndex();
+
+           Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+           Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
+
+           int count = 0;
+           while ( true ) {
+
+               count += results.size();
+
+
+               if ( results.hasCursor() ) {
+                   logger.info( "Counted {} : query again with cursor", count );
+                   q.setCursor( results.getCursor() );
+                   results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+               }
+               else {
+                   break;
+               }
+           }
+
+           assertEquals( "Did not get expected entities", expectedEntities, count );
+           return count;
+       }
+
+
+}
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java
index c064751..1d51c25 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java
@@ -18,6 +18,7 @@
 
 
 import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
@@ -37,14 +38,14 @@
     private static final Logger logger = LoggerFactory.getLogger( CoreSchemaManager.class );
 
     private final Setup setup;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
     private final LockManager lockManager;
     private final DataStaxCluster dataStaxCluster;
 
 
     public CoreSchemaManager( final Setup setup, Injector injector ) {
         this.setup = setup;
-        this.cassandraFig = injector.getInstance( CassandraFig.class );
+        this.cassandraConfig = injector.getInstance( CassandraConfig.class );
         this.lockManager = injector.getInstance( LockManager.class );
         this.dataStaxCluster = injector.getInstance( DataStaxCluster.class );
     }
@@ -80,16 +81,21 @@
 
     @Override
     public void destroy() {
-        logger.info( "dropping keyspaces" );
         try {
+            logger.info( "dropping application keyspace" );
             dataStaxCluster.getClusterSession()
-                .execute("DROP KEYSPACE "+ CQLUtils.quote(cassandraFig.getApplicationKeyspace()));
+                .execute("DROP KEYSPACE "+ CQLUtils.quote(cassandraConfig.getApplicationKeyspace()));
+            dataStaxCluster.waitForSchemaAgreement();
+
+            logger.info( "dropping application local keyspace" );
+            dataStaxCluster.getClusterSession()
+                .execute("DROP KEYSPACE "+ CQLUtils.quote(cassandraConfig.getApplicationLocalKeyspace()));
             dataStaxCluster.waitForSchemaAgreement();
 
             dataStaxCluster.getClusterSession().close(); // close session so it's meta will get refreshed
         }
         catch ( Exception e ) {
-            logger.error("Error dropping application keyspace: {} error: {}", cassandraFig.getApplicationKeyspace(), e);
+            logger.error("Error dropping application keyspaces: {} error: {}", cassandraConfig.getApplicationKeyspace(), e);
         }
         logger.info( "keyspaces dropped" );
         logger.info( "dropping indices" );
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index a7759de..a1ff84b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -74,10 +74,10 @@
     }
 
 
-    @Test( timeout = 120000 )
+    @Test( timeout = 240000 )
     public void rebuildOneCollectionIndex() throws Exception {
 
-        logger.info( "Started rebuildIndex()" );
+        logger.info( "Started rebuildOneCollectionIndex()" );
 
         String rand = RandomStringUtils.randomAlphanumeric( 5 );
         final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
@@ -163,7 +163,7 @@
 
         waitForRebuild( status, reIndexService );
 
-        app.waitForQueueDrainAndRefreshIndex(5000);
+        app.waitForQueueDrainAndRefreshIndex(15000);
 
         // ----------------- test that we can read the catherder collection and not the catshepard
 
@@ -172,7 +172,7 @@
     }
 
 
-    @Test( timeout = 120000 )
+    @Test( timeout = 240000 )
     public void rebuildIndex() throws Exception {
 
         logger.info( "Started rebuildIndex()" );
@@ -234,7 +234,7 @@
         }
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.waitForQueueDrainAndRefreshIndex(15000);
+        app.waitForQueueDrainAndRefreshIndex(30000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -278,7 +278,7 @@
 
             assertNotNull( status.getJobId(), "JobId is present" );
 
-            logger.info( "Rebuilt index" );
+            logger.info( "Rebuilt index, jobID={}", status.getJobId());
 
 
             waitForRebuild( status, reIndexService );
@@ -301,7 +301,7 @@
     @Test( timeout = 120000 )
     public void rebuildIndexGeo() throws Exception {
 
-        logger.info( "Started rebuildIndex()" );
+        logger.info( "Started rebuildIndexGeo()" );
 
         String rand = RandomStringUtils.randomAlphanumeric( 5 );
         final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
@@ -414,7 +414,7 @@
     @Test( timeout = 120000 )
     public void rebuildUpdatedSince() throws Exception {
 
-        logger.info( "Started rebuildIndex()" );
+        logger.info( "Started rebuildUpdatedSince()" );
 
         String rand = RandomStringUtils.randomAlphanumeric( 5 );
         final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
@@ -436,7 +436,7 @@
 
         final Entity secondEntity = em.create( "thing",  entityData);
 
-        app.waitForQueueDrainAndRefreshIndex(5000);
+        app.waitForQueueDrainAndRefreshIndex(15000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -511,14 +511,26 @@
      * Wait for the rebuild to occur
      */
     private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
-        throws InterruptedException {
+        throws InterruptedException, IllegalArgumentException {
+        if (status != null) {
+            logger.info("waitForRebuild: jobID={}", status.getJobId());
+        } else {
+            logger.info("waitForRebuild: error, status = null");
+            throw new IllegalArgumentException("reindexStatus = null");
+        }
         while ( true ) {
 
             try {
                 final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
 
-                if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
-                    break;
+                if (updatedStatus == null) {
+                    logger.info("waitForRebuild: updated status is null");
+                } else {
+                    logger.info("waitForRebuild: status={} numberProcessed={}", updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+                    if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
+                        break;
+                    }
                 }
             }
             catch ( IllegalArgumentException iae ) {
diff --git a/stack/core/src/test/resources/project.properties b/stack/core/src/test/resources/project.properties
index 1a848bc..77a785a 100644
--- a/stack/core/src/test/resources/project.properties
+++ b/stack/core/src/test/resources/project.properties
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index 8f9058d..24f3046 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -16,6 +16,7 @@
 # with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml)
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=25000
 
 
 #Not a good number for real systems.  Write shards should be 2x cluster size from our tests
@@ -28,7 +29,7 @@
 elasticsearch.managment_index=usergrid_core_management
 #cassandra.keyspace.application=core_tests_schema
 elasticsearch.queue_impl.resolution=true
-elasticsearch.queue_impl=DISTRIBUTED
+elasticsearch.queue_impl=LOCAL
 
 elasticsearch.buffer_timeout=1
 
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index bfa3abe..ea9ada8 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -377,6 +377,7 @@
                 Iterator<Class> messageTypes = routerProducer.getMessageTypes().iterator();
                 while ( messageTypes.hasNext() ) {
                     Class messageType = messageTypes.next();
+                    logger.info("createClusterSystem: routerProducer {}: message type={}", routerProducer.getRouterPath(), messageType.getName());
                     routersByMessageType.put( messageType, routerProducer.getRouterPath() );
                 }
             }
@@ -467,7 +468,7 @@
         if (started) {
             logger.info( "ClientActor [{}] has started", ra.path() );
         } else {
-            throw new RuntimeException( "ClientActor ["+ra.path()+"] did not start in time" );
+            throw new RuntimeException( "ClientActor ["+ra.path()+"] did not start in time, validate that akka seeds are configured properly" );
         }
     }
 
diff --git a/stack/corepersistence/collection/src/test/resources/dynamic-test.properties b/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
index ca1a51c..59b8d31 100644
--- a/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
+++ b/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
@@ -2,6 +2,7 @@
 # safe dynamic property defaults for our testing via IDE or Maven
 cassandra.connections=30
 cassandra.timeout.pool=30000
+cassandra.timeout=30000
 cassandra.port=9160
 cassandra.hosts=localhost
 cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
index c82bf83..9e95663 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
@@ -20,6 +20,7 @@
 
 cassandra.connections=30
 cassandra.timeout.pool=30000
+cassandra.timeout=30000
 cassandra.port=9160
 
 # a comma delimited private IP address list to your chop cassandra cluster
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
index 82bea14..0253e6e 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
@@ -21,6 +21,7 @@
 # purposely setting connections lower to slow down the activity on the cassandra server locally
 cassandra.connections=5
 cassandra.timeout.pool=60000
+cassandra.timeout=60000
 cassandra.port=9160
 cassandra.hosts=localhost
 cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties
index e85b1ac..b7a6c11 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties
@@ -33,6 +33,7 @@
 
 cassandra.connections=15
 cassandra.timeout.pool=240000
+cassandra.timeout=240000
 
 queue.num.actors=5
 queue.sender.num.actors=5
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 000c633..b746c61 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -93,7 +93,7 @@
      * @param node The node to remove.  This will apply a timestamp to apply the delete + compact operation.  Any edges connected to this node with a timestamp
      * <= the specified time on the mark will be removed from the graph
      */
-    Observable<Id> compactNode( final Id node );
+    Observable<MarkedEdge> compactNode( final Id node );
 
     /**
      * Get all versions of this edge where versions <= max version
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 5fcdcb4..2fe40b1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -262,24 +262,17 @@
 
 
     @Override
-    public Observable<Id> compactNode( final Id inputNode ) {
-
+    public Observable<MarkedEdge> compactNode( final Id inputNode ) {
 
         final UUID startTime = UUIDGenerator.newTimeUUID();
 
-
-        final Observable<Id> nodeObservable =
-            Observable.just( inputNode ).map( node -> nodeSerialization.getMaxVersion( scope, node ) ).takeWhile(
-                maxTimestamp -> maxTimestamp.isPresent() )
-
+        final Observable<MarkedEdge> nodeObservable =
+            Observable.just( inputNode )
+                .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
+                //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
+                .takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
                 //map our delete listener
-                .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) )
-                    //set to 0 if nothing is emitted
-                .lastOrDefault( 0 )
-                    //log for posterity
-                .doOnNext( count -> logger.trace( "Removed {} edges from node {}", count, inputNode ) )
-                    //return our id
-                .map( count -> inputNode );
+                .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
 
         return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
     }
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 9392dbc..71d2f1d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -169,6 +169,15 @@
         return true;
     }
 
+    @Override
+    public String toString(){
+        return "SimpleSearchByEdgeType{node="+node
+            +", type="+type
+            +", maxTimestamp="+maxTimestamp
+            +", order="+order
+            +", filterMarked="+filterMarked
+            +", last="+last+"}";
+    }
 
     @Override
     public int hashCode() {
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
index 68569e5..3bcdc55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
@@ -22,6 +22,8 @@
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -39,8 +41,8 @@
        * @param node The node that was deleted
        * @param timestamp The timestamp of the event
        *
-       * @return An observable that emits the total number of edges that have been removed with this node both as the
+       * @return An observable that emits the marked edges that have been removed with this node both as the
        *         target and source
        */
-    Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp );
+    Observable<MarkedEdge> receive(final ApplicationScope scope, final Id node, final UUID timestamp );
 }
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index cd5b1a8..df4e5d5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -95,53 +95,37 @@
      * @param node The node that was deleted
      * @param timestamp The timestamp of the event
      *
-     * @return An observable that emits the total number of edges that have been removed with this node both as the
+     * @return An observable that emits the marked edges that have been removed with this node both as the
      *         target and source
      */
-    public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
+    public Observable<MarkedEdge> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
 
 
         return Observable.just( node )
 
                 //delete source and targets in parallel and merge them into a single observable
-                .flatMap( new Func1<Id, Observable<Integer>>() {
-                    @Override
-                    public Observable<Integer> call( final Id node ) {
+                .flatMap( id -> {
 
-                        final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
+                    final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
 
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
-                        }
-
-
-                        if ( !maxVersion.isPresent() ) {
-                            return Observable.empty();
-                        }
-
-
-                        //do all the delete, then when done, delete the node
-                        return doDeletes( node, scope, maxVersion.get(), timestamp ).count()
-                                //if nothing is ever emitted, emit 0 so that we know no operations took place.
-                                // Finally remove
-                                // the
-                                // target node in the mark
-                                .doOnCompleted( new Action0() {
-                                    @Override
-                                    public void call() {
-                                        try {
-                                            nodeSerialization.delete( scope, node, maxVersion.get()).execute();
-                                        }
-                                        catch ( ConnectionException e ) {
-                                            throw new RuntimeException( "Unable to connect to casandra", e );
-                                        }
-                                    }
-                                } );
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
                     }
-                } ).defaultIfEmpty( 0 );
+                    if ( !maxVersion.isPresent() ) {
+                        return Observable.empty();
+                    }
+
+                    // do all the edge deletes and then remove the marked node, return all edges just deleted
+                    return
+                        doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
+                            try {
+                                nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+                            } catch ( ConnectionException e ) {
+                                throw new RuntimeException( "Unable to connect to cassandra", e );
+                            }
+                        });
+                });
     }
-
-
     /**
      * Do the deletes
      */
@@ -162,7 +146,7 @@
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesToTarget(scope,
-                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
                             }
                         }));
 
@@ -174,7 +158,7 @@
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesFromSource(scope,
-                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
                             }
                         }));
 
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 78a1d4b..5577bd0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -36,9 +36,10 @@
      * Return an observable of all edges from a source
      * @param gm
      * @param sourceNode
+     * @param filterMarked
      * @return
      */
-    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
+    Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked);
 
 
     /**
@@ -54,9 +55,10 @@
      * Return an observable of all edges to a target
      * @param gm
      * @param targetNode
+     * @param filterMarked
      * @return
      */
-    Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode);
+    Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked);
 
     /**
      * Return an observable of all edges from a source node.  Ordered ascending, from the startTimestamp if specified
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index 1f81864..e9e2b28 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -400,7 +400,6 @@
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchEdgeType( search );
 
-
         final Id applicationId = scope.getApplication();
         final Id searchNode = search.getNode();
 
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 20efe42..9e0998d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -35,7 +35,6 @@
 import com.google.common.base.Optional;
 
 import rx.Observable;
-import rx.functions.Func1;
 
 
 /**
@@ -55,7 +54,7 @@
      * Get all edges from the source
      */
     @Override
-    public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
+    public Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked) {
         final Observable<String> edgeTypes =
             gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
 
@@ -67,7 +66,7 @@
 
                 return gm.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        Optional.<Edge>absent() ) );
+                        Optional.<Edge>absent(), filterMarked ) );
         } );
     }
 
@@ -119,19 +118,16 @@
      * Get all edges from the source
      */
     @Override
-    public Observable<Edge> edgesToTarget( final GraphManager gm, final Id targetNode ) {
-        final Observable<String> edgeTypes =
-            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
+    public Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked) {
 
-        return edgeTypes.flatMap( edgeType -> {
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("Loading edges of edgeType {} to {}", edgeType, targetNode);
-            }
-
+        return gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) )
+            .flatMap( edgeType -> {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Loading edges of edgeType {} to {}", edgeType, targetNode);
+                }
             return gm.loadEdgesToTarget(
                 new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    Optional.<Edge>absent() ) );
+                    Optional.<Edge>absent(), filterMarked ) );
         } );
     }
 
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 6a08d46..69dd43b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -21,7 +21,6 @@
 
 
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
@@ -29,7 +28,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
-import rx.functions.Func1;
 
 /**
  * Emits the id of all nodes that are target nodes from the given source node
@@ -55,7 +53,7 @@
     public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
 
         //only search edge types that start with collections
-        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( edge -> {
+        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode, true).map(edge -> {
             final Id targetNode = edge.getTargetNode();
 
             if (logger.isDebugEnabled()) {
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 8eccdbd..1d4331c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -89,7 +89,7 @@
             final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
 
             //get edges from the source
-            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
+            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode, true).buffer( 1000 )
                                             .doOnNext( edges -> {
                                                     final MutationBatch batch = keyspace.prepareMutationBatch();
 
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 438a978..80198de 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -135,7 +135,7 @@
         UUID eventTime = UUIDGenerator.newTimeUUID();
 
 
-        int count = deleteListener.receive( scope, sourceNode, eventTime ).toBlocking().last();
+        int count = deleteListener.receive( scope, sourceNode, eventTime ).count().toBlocking().last();
 
         assertEquals( "Mark was not set, no delete should be executed", 0, count );
 
@@ -171,7 +171,7 @@
 
         nodeSerialization.mark( scope, sourceNode, timestamp ).execute();
 
-        int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).toBlocking().last();
+        int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).count().toBlocking().last();
 
         assertEquals( 1, count );
 
@@ -256,7 +256,7 @@
 
         nodeSerialization.mark( scope, targetNode, deleteBefore ).execute();
 
-        int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+        int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
 
         assertEquals( 1, count );
 
@@ -366,7 +366,7 @@
 
         nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
 
-        int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+        int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
 
         assertEquals( edgeCount, count );
 
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties b/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
index 4dd17e2..63da408 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
@@ -1,6 +1,7 @@
 # Keep nothing but overriding test defaults in here
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
 cassandra.port=9160
 cassandra.hosts=
 #cassandra.hosts=localhost
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
index ea4c3c7..9680d88 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
@@ -2,6 +2,7 @@
 
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
 cassandra.port=9160
 
 # a comma delimited private IP address list to your chop cassandra cluster
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
index 96834e6..3cb5608 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
@@ -1,6 +1,7 @@
 # Keep nothing but overriding test defaults in here
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
 cassandra.port=9160
 cassandra.hosts=localhost
 cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid.properties b/stack/corepersistence/graph/src/test/resources/usergrid.properties
index d744efe..f8b7a8d 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid.properties
@@ -3,3 +3,4 @@
 
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 14020a9..b444199 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -20,14 +20,12 @@
 package org.apache.usergrid.persistence.index;
 
 
-import com.google.common.base.Optional;
 import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 import java.util.Map;
-import java.util.UUID;
 
 
 /**
@@ -36,7 +34,7 @@
 public interface EntityIndex extends CPManager {
 
 
-    public static final int MAX_LIMIT = 1000;
+    int MAX_LIMIT = 1000;
 
     /**
      * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
@@ -134,14 +132,6 @@
     CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
 
     /**
-     * Returns all entity docs that match the entityId being the nodeId ( aka connections where entityId = sourceNode)
-     *
-     * @param entityId      The entityId to match when searching
-     * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
-     * @return
-     */
-    CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion);
-    /**
      * delete all application records
      *
      * @return
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a35921c..f4fae2b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -34,7 +34,6 @@
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.StringUtils;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
@@ -502,7 +501,7 @@
             searchResponse = srb.execute().actionGet();
         }
         catch ( Throwable t ) {
-            logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
+            logger.error( "Unable to communicate with Elasticsearch: {}", t.getMessage() );
             failureMonitor.fail( "Unable to execute batch", t );
             throw t;
         }
@@ -584,68 +583,6 @@
     }
 
 
-    @Override
-    public CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion ) {
-
-        // TODO: investigate if functionality via iterator so a caller can page the deletion until all is gone
-
-        Preconditions.checkNotNull( entityId, "entityId cannot be null" );
-        Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
-        ValidationUtils.verifyVersion(markedVersion);
-
-        SearchResponse searchResponse;
-        List<CandidateResult> candidates = new ArrayList<>();
-
-        final long markedTimestamp = markedVersion.timestamp();
-
-        // never let this fetch more than 100 to save memory
-        final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
-
-        // this query will find all the documents where this entity is a source/target node
-        final QueryBuilder nodeQuery = QueryBuilders
-            .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(entityId));
-
-        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder()
-            .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC);
-
-        try {
-
-            long queryTimestamp = 0L;
-
-            QueryBuilder timestampQuery =  QueryBuilders
-                .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
-                .gte(queryTimestamp)
-                .lt(markedTimestamp);
-
-            QueryBuilder finalQuery = QueryBuilders.constantScoreQuery(
-                QueryBuilders
-                    .boolQuery()
-                    .must(timestampQuery)
-                    .must(nodeQuery)
-            );
-
-
-            searchResponse = srb
-                .setQuery(finalQuery)
-                .setSize(searchLimit)
-                .execute()
-                .actionGet();
-
-
-            candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
-
-        }
-        catch ( Throwable t ) {
-            logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
-            failureMonitor.fail( "Unable to execute batch", t );
-            throw t;
-        }
-        failureMonitor.success();
-
-        return new CandidateResults( candidates, Collections.EMPTY_SET);
-    }
-
-
     /**
      * Completely delete an index.
      */
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
index 84a28f0..70d2284 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
@@ -34,6 +34,7 @@
     public static final String UUID_REX =
             "[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}";
     public static final String EMAIL_REX =  "[a-zA-Z0-9._%'+\\-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}";
+    public static final String NAME_REX = "[a-zA-Z0-9_\\-./'+ ]*";
 
     public enum Type {
         UUID, NAME, EMAIL
@@ -46,7 +47,7 @@
     static Pattern emailRegEx = Pattern.compile( EMAIL_REX );
     // "Pattern nameRegEx" below used to be [a-zA-Z0-9_\\-./], changed it to contain a 'space' to a
     // ddress https://issues.apache.org/jira/browse/USERGRID-94
-    static Pattern nameRegEx = Pattern.compile( "[a-zA-Z0-9_\\-./'+ ]*" );
+    static Pattern nameRegEx = Pattern.compile( NAME_REX );
 
 
     private Identifier( Type type, Object value ) {
diff --git a/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties b/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
index 2589a7f..aaa4653 100644
--- a/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
@@ -2,6 +2,7 @@
 # safe dynamic property defaults for our testing via IDE or Maven
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
 cassandra.port=9160
 cassandra.hosts=localhost
 cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
index 677df96..855a8b5 100644
--- a/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
@@ -1,6 +1,7 @@
 # These are for CHOP environment settings
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
 cassandra.port=9160
 
 # a comma delimited private IP address list to your chop cassandra cluster
diff --git a/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
index 67a9ab1..c0166e7 100644
--- a/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
@@ -6,6 +6,7 @@
 cassandra.cluster_name=Usergrid
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
 
 collections.keyspace=Usergrid_Collections
 collections.keyspace.strategy.options=replication_factor:1
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 153ed4b..005ce0c 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -93,6 +93,12 @@
             <groupId>org.apache.usergrid</groupId>
             <artifactId>common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>cassandra-all</artifactId>
+                    <groupId>org.apache.cassandra</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
 
@@ -152,18 +158,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.usergrid</groupId>
-            <artifactId>common</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>cassandra-all</artifactId>
-                    <groupId>org.apache.cassandra</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <!--
                 <dependency>
                     <groupId>com.datastax.cassandra</groupId>
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
index d3a46aa..2ed37e0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -163,7 +163,7 @@
     @Override
     public Collection<TableDefinition> getTables() {
         return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), "queues", CQL ) );
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_QUEUES, CQL ) );
     }
 
 }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index a485f55..a6e2451 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -43,7 +43,7 @@
     public QueueModule( String queueManagerType ) {
 
         logger.info("QueueManagerType={}", queueManagerType);
-        if ( "DISTRIBUTED_SNS".equals( queueManagerType ) ) {
+        if ( "DISTRIBUTED_SNS".equals( queueManagerType ) || "SNS".equals(queueManagerType)) {
             this.implementation = LegacyQueueManager.Implementation.DISTRIBUTED_SNS;
         }
         else if ( "DISTRIBUTED".equals( queueManagerType ) ) {
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
index e220650..b2cebaa 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
@@ -34,7 +34,7 @@
  */
 public class KeyspaceDropper {
 
-    private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class );
+    private static final Logger logger = LoggerFactory.getLogger( KeyspaceDropper.class );
 
     static { dropTestKeyspaces(); }
 
@@ -57,6 +57,10 @@
 
         dropTestKeyspace( keyspaceApp, hosts, port );
         dropTestKeyspace( keyspaceQueue, hosts, port );
+
+        // drop local test keyspaces
+        dropTestKeyspace(keyspaceApp + "_", hosts, port);
+        dropTestKeyspace(keyspaceQueue + "_", hosts, port);
     }
 
     public static void dropTestKeyspace( String keyspace, String[] hosts, int port ) {
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index d4ed7ef..8da2180 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -340,7 +340,7 @@
                 if (qmm.getQueueDepth( queueName, available ) == numMessages) {
                     break;
                 }
-                Thread.sleep( 500 );
+                Thread.sleep( 1000 );
             }
 
             Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, available ) );
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index d77e7e8..2a1a83e 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -67,3 +67,4 @@
 
 #cassandra.connections=30
 cassandra.timeout.pool=20000
+cassandra.timeout=20000
diff --git a/stack/pom.xml b/stack/pom.xml
index 513c894..c98c72d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -661,9 +661,9 @@
             </dependency>
 
             <dependency>
-                <groupId>com.github.stephenc</groupId>
+                <groupId>com.github.jbellis</groupId>
                 <artifactId>jamm</artifactId>
-                <version>0.2.5</version>
+                <version>0.3.1</version>
             </dependency>
 
             <!-- Third Party Non-Commercial Dependencies -->
@@ -1311,7 +1311,7 @@
                     <useSystemClassLoader>false</useSystemClassLoader>
                     <testFailureIgnore>false</testFailureIgnore>
                     <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin}
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         ${ug.argline}
                     </argLine>
                     <systemPropertyVariables>
@@ -1548,7 +1548,7 @@
                         <version>${surefire.plugin.version}</version>
                         <configuration>
                             <argLine>
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 ${ug.argline}
                                 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
                             </argLine>
diff --git a/stack/query-validator/src/test/resources/usergrid-custom-test.properties b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
index bc1ba56..c8e3eee 100644
--- a/stack/query-validator/src/test/resources/usergrid-custom-test.properties
+++ b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
@@ -30,3 +30,5 @@
 
 # This property is required to be set and cannot be defaulted anywhere
 usergrid.cluster_name=usergrid
+
+elasticsearch.queue_impl=LOCAL
diff --git a/stack/rest/pom.xml b/stack/rest/pom.xml
index 9bb83a6..e7aa68a 100644
--- a/stack/rest/pom.xml
+++ b/stack/rest/pom.xml
@@ -93,7 +93,7 @@
                     <argLine>-Dwebapp.directory=${basedir}/src/main/webapp
                         -Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax}
                         -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         -Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties ${ug.argline}
                     </argLine>
                     <includes>
@@ -460,7 +460,7 @@
                                 -Dwebapp.directory=${basedir}/src/main/webapp
                                 -Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                                 -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 -Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties
                                 ${ug.argline}
                             </argLine>
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b8c1caa..c9174c1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -18,19 +18,19 @@
 package org.apache.usergrid.rest.applications;
 
 
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
+import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.PathSegment;
 import javax.ws.rs.core.UriInfo;
 
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilder;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteService;
+import org.apache.usergrid.persistence.index.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
@@ -48,6 +48,9 @@
 
 import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * A collection resource that stands before the Service Resource. If it cannot find
@@ -61,6 +64,9 @@
 })
 public class CollectionResource extends ServiceResource {
 
+    private static final Logger logger = LoggerFactory.getLogger( CollectionResource.class );
+    private static final String UPDATED_BEFORE_FIELD = "updatedBefore";
+
     public CollectionResource() {
     }
 
@@ -190,6 +196,61 @@
     }
 
 
+    @PUT
+    @Path("{itemName}/_clear")
+    @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse clearCollectionPut(
+        final Map<String, Object> payload,
+        @PathParam("itemName") final String collectionName,
+        @QueryParam("callback") @DefaultValue("callback") String callback
+    ) throws Exception {
+
+        logger.info("Clearing collection {} for application {}", collectionName, getApplicationId().toString());
+
+        final CollectionDeleteRequestBuilder request = createRequest()
+            .withApplicationId(getApplicationId())
+            .withCollection(collectionName);
+
+        return executeResumeAndCreateResponse(payload, request, callback);
+
+    }
+
+
+    @GET
+    @Path( "{itemName}/_clear/{jobId}")
+    @Produces({MediaType.APPLICATION_JSON,"application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse clearCollectionJobGet(
+        @Context UriInfo ui,
+        @PathParam("itemName") PathSegment itemName,
+        @PathParam("jobId") String jobId,
+        @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+        if(logger.isTraceEnabled()){
+            logger.trace( "CollectionResource.clearCollectionJobGet" );
+        }
+
+        Preconditions
+            .checkNotNull(jobId, "path param jobId must not be null" );
+
+        CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().getStatus(jobId);
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "clear collection" );
+        response.setProperty( "jobId", status.getJobId() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( "numberCheckedForDeletion", status.getNumberProcessed() );
+        response.setSuccess();
+
+        return response;
+    }
+
+
     // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
     // So system access only.
     // TODO: use scheduler here to get around people sending a reindex call 30 times.
@@ -210,4 +271,57 @@
             services.getApplicationId().toString(),itemName.getPath(),false,callback );
     }
 
+
+    private CollectionDeleteService getCollectionDeleteService() {
+        return injector.getInstance( CollectionDeleteService.class );
+    }
+
+
+    private CollectionDeleteRequestBuilder createRequest() {
+        return new CollectionDeleteRequestBuilderImpl();
+    }
+
+
+    private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> payload,
+                                                        final CollectionDeleteRequestBuilder request,
+                                                        final String callback ) {
+
+        Map<String,Object> newPayload = payload;
+        if(newPayload == null ||  !payload.containsKey( UPDATED_BEFORE_FIELD )){
+            newPayload = new HashMap<>(1);
+            newPayload.put(UPDATED_BEFORE_FIELD,Long.MAX_VALUE);
+        }
+
+        Preconditions.checkArgument(newPayload.get(UPDATED_BEFORE_FIELD) instanceof Number,
+            "The field \"updatedBefore\" in the payload must be a timestamp" );
+
+        //add our updated timestamp to the request
+        if ( newPayload.containsKey( UPDATED_BEFORE_FIELD ) ) {
+            final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_BEFORE_FIELD));
+            request.withEndTimestamp( timestamp );
+        }
+
+        return executeAndCreateResponse( request, callback );
+    }
+
+    /**
+     * Execute the request and return the response.
+     */
+    private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
+
+
+        final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "clear collection" );
+        response.setProperty( "jobId", status.getJobId() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( "numberQueued", status.getNumberProcessed() );
+        response.setSuccess();
+
+        return response;
+    }
+
 }
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
index 5435f7e..3e4542d 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
@@ -17,6 +17,7 @@
 package org.apache.usergrid.rest.applications.users;
 
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 
@@ -465,6 +466,14 @@
             if ( ( password1 != null ) || ( password2 != null ) ) {
                 if ( management.checkPasswordResetTokenForAppUser( getApplicationId(), getUserUuid(), token ) ) {
                     if ( ( password1 != null ) && password1.equals( password2 ) ) {
+                        // validate password
+                        Collection<String> violations = management.passwordPolicyCheck(password1, false);
+                        if (violations.size() > 0) {
+                            // password not valid
+                            errorMsg = management.getPasswordDescription(false);
+                            return handleViewable("resetpw_set_form", this, getOrganizationName());
+                        }
+
                         management.setAppUserPassword( getApplicationId(), getUser().getUuid(), password1 );
                         management.revokeAccessTokenForAppUser( token );
                         return handleViewable( "resetpw_set_success", this, getOrganizationName() );
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
index 4cbe9b2..1f80bc1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
@@ -43,6 +43,7 @@
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.UriInfo;
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 
@@ -66,6 +67,9 @@
 
     String token = null;
 
+    String loginEndpoint;
+
+
 
     public UserResource() {
     }
@@ -294,8 +298,17 @@
             if ( ( password1 != null ) || ( password2 != null ) ) {
                 if ( management.checkPasswordResetTokenForAdminUser( user.getUuid(), tokenInfo ) ) {
                     if ( ( password1 != null ) && password1.equals( password2 ) ) {
+                        // validate password
+                        Collection<String> violations = management.passwordPolicyCheck(password1, true);
+                        if (violations.size() > 0) {
+                            // password not valid
+                            errorMsg = management.getPasswordDescription(true);
+                            return handleViewable( "resetpw_set_form", this, organizationId );
+                        }
+
                         management.setAdminUserPassword( user.getUuid(), password1 );
                         management.revokeAccessTokenForAdminUser( user.getUuid(), token );
+                        loginEndpoint = properties.getProperty("usergrid.viewable.loginEndpoint");
                         return handleViewable( "resetpw_set_success", this, organizationId );
                     }
                     else {
@@ -342,6 +355,9 @@
         return errorMsg;
     }
 
+    public String getLoginEndpoint() {
+        return loginEndpoint;
+    }
 
     public String getToken() {
         return token;
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp
index 83a6ad1..68c12f2 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp
@@ -1,5 +1,6 @@
 <%@ page language="java" contentType="text/html; charset=ISO-8859-1"
     pageEncoding="ISO-8859-1"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
@@ -24,6 +25,6 @@
 	<link rel="stylesheet" type="text/css" href="/css/styles.css" />
 </head>
 <body>
-<h1>${it.foo}</h1> 
+<h1>${fn:escapeXml(it.foo)}</h1>
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp
index 6b1b8b2..ed934a7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp
@@ -2,6 +2,7 @@
 	pageEncoding="ISO-8859-1"%>
 <%@ page import="org.apache.usergrid.rest.AbstractContextResource"%>
 <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
@@ -28,13 +29,13 @@
 <body>
 
 	<div class="dialog-area">
-		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
 		<form class="dialog-form" action="" method="post">
-			<input type="hidden" name="response_type" value="${it.responseType}">
-			<input type="hidden" name="client_id" value="${it.clientId}">
-			<input type="hidden" name="redirect_uri" value="${it.redirectUri}">
-			<input type="hidden" name="scope" value="${it.scope}">
-			<input type="hidden" name="state" value="${it.state}">
+			<input type="hidden" name="response_type" value="${fn:escapeXml(it.responseType)}">
+			<input type="hidden" name="client_id" value="${fn:escapeXml(it.clientId)}">
+			<input type="hidden" name="redirect_uri" value="${fn:escapeXml(it.redirectUri)}">
+			<input type="hidden" name="scope" value="${fn:escapeXml(it.scope)}">
+			<input type="hidden" name="state" value="${fn:escapeXml(it.state)}">
 			<fieldset>
 				<p>
 					<label for="username">Username</label>
@@ -56,4 +57,4 @@
 	</div>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp
index dfcf3b7..20e69b8 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp
@@ -26,7 +26,7 @@
 </head>
 <body>
 
-	<p>Your account with email address <c:out value="${it.user.email}"/> has been successfully activated.</p>
+	<p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully activated.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp
index 02e9ee3..d7f3acc 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp
@@ -26,8 +26,8 @@
 </head>
 <body>
 
-	<p>Your account with email address <c:out value="${it.user.email}"/> has been successfully confirmed.
+	<p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully confirmed.
 	You will received an email soon to let you know when you account has been activated</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp
index 0f53bfc..5230ea7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp
@@ -1,6 +1,7 @@
 <%@ page language="java" contentType="text/html; charset=ISO-8859-1"
 	pageEncoding="ISO-8859-1"%>
 <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
@@ -27,13 +28,13 @@
 <body>
 	<div class="dialog-area">
 		<c:if test="${!empty it.errorMsg}">
-			<div class="dialog-form-message">${it.errorMsg}</div>
+			<div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div>
 		</c:if>
 		<form class="dialog-form" action="" method="post">
 			<fieldset>
 				<p>
 					Enter the captcha to have your password reset instructions sent to
-					<c:out value="${it.user.email}" />
+					<c:out value="${it.user.email}" escapeXml="true" />
 				</p>
 				<p id="human-proof"></p>
 				${it.reCaptchaHtml}
@@ -44,4 +45,4 @@
 		</form>
 	</div>
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
 </head>
 <body>
 
-	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp
index a83d80d..c0203ce 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp
@@ -4,6 +4,7 @@
 <%@ page import="net.tanesha.recaptcha.ReCaptchaFactory"%>
 <%@ page import="org.apache.usergrid.rest.AbstractContextResource"%>
 <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
@@ -30,12 +31,12 @@
 <body>
 
 	<div class="dialog-area">
-		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
 		<form class="dialog-form" action="" method="post">
-			<input type="hidden" name="token" value="${it.token}">
+			<input type="hidden" name="token" value="${fn:escapeXml(it.token)}">
 			<fieldset>
 				<p>
-					<label for="password1">Please enter your new password for <c:out value="${it.user.email}"/>.</label>
+					<label for="password1">Please enter your new password for <c:out value="${it.user.email}" escapeXml="true"/>.</label>
 				</p>
 				<p>
 					<input class="text_field" id="password1" name="password1" type="password" />
@@ -54,4 +55,4 @@
 	</div>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp
index 9de90ba..3915084 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp
@@ -29,7 +29,7 @@
 </head>
 <body>
 
-	<p>New password set for <c:out value="${it.user.email}"/></p>
+	<p>New password set for <c:out value="${it.user.email}" escapeXml="true"/></p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp
index 3211a3a..01dfc57 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp
@@ -1,6 +1,7 @@
 <%@ page language="java" contentType="text/html; charset=ISO-8859-1"
 	pageEncoding="ISO-8859-1"%>
 <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
@@ -28,7 +29,7 @@
 
 	<div class="dialog-area">
 		<c:if test="${!empty it.errorMsg}">
-			<div class="dialog-form-message">${it.errorMsg}</div>
+			<div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div>
 		</c:if>
 		<form class="dialog-form" action="" method="post">
 			<fieldset>
@@ -50,4 +51,4 @@
 	</div>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
 </head>
 <body>
 
-	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp
index 6b1b8b2..ed934a7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp
@@ -2,6 +2,7 @@
 	pageEncoding="ISO-8859-1"%>
 <%@ page import="org.apache.usergrid.rest.AbstractContextResource"%>
 <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
@@ -28,13 +29,13 @@
 <body>
 
 	<div class="dialog-area">
-		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
 		<form class="dialog-form" action="" method="post">
-			<input type="hidden" name="response_type" value="${it.responseType}">
-			<input type="hidden" name="client_id" value="${it.clientId}">
-			<input type="hidden" name="redirect_uri" value="${it.redirectUri}">
-			<input type="hidden" name="scope" value="${it.scope}">
-			<input type="hidden" name="state" value="${it.state}">
+			<input type="hidden" name="response_type" value="${fn:escapeXml(it.responseType)}">
+			<input type="hidden" name="client_id" value="${fn:escapeXml(it.clientId)}">
+			<input type="hidden" name="redirect_uri" value="${fn:escapeXml(it.redirectUri)}">
+			<input type="hidden" name="scope" value="${fn:escapeXml(it.scope)}">
+			<input type="hidden" name="state" value="${fn:escapeXml(it.state)}">
 			<fieldset>
 				<p>
 					<label for="username">Username</label>
@@ -56,4 +57,4 @@
 	</div>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp
index 85114cd..f5fa14d 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp
@@ -26,7 +26,7 @@
 </head>
 <body>
 
-	<p>Your organization <c:out value="${it.organization.name}"/> has been successfully activated.</p>
+	<p>Your organization <c:out value="${it.organization.name}" escapeXml="true"/> has been successfully activated.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp
index f4307b7..5fb41c7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp
@@ -26,8 +26,8 @@
 </head>
 <body>
 
-	<p>Your organization <c:out value="${it.organization.name}"/> has been successfully confirmed.
+	<p>Your organization <c:out value="${it.organization.name}" escapeXml="true"/> has been successfully confirmed.
 	You will received an email soon to let you know when you organization has been activated</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp
index dfcf3b7..20e69b8 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp
@@ -26,7 +26,7 @@
 </head>
 <body>
 
-	<p>Your account with email address <c:out value="${it.user.email}"/> has been successfully activated.</p>
+	<p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully activated.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp
index 02e9ee3..d7f3acc 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp
@@ -26,8 +26,8 @@
 </head>
 <body>
 
-	<p>Your account with email address <c:out value="${it.user.email}"/> has been successfully confirmed.
+	<p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully confirmed.
 	You will received an email soon to let you know when you account has been activated</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp
index 3e56cd1..6341d60 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp
@@ -1,6 +1,7 @@
 <%@ page language="java" contentType="text/html; charset=ISO-8859-1"
 	pageEncoding="ISO-8859-1"%>
 <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
@@ -27,13 +28,13 @@
 <body>
 	<div class="dialog-area password-reset-form">
 		<c:if test="${!empty it.errorMsg}">
-			<div class="dialog-form-message">${it.errorMsg}</div>
+			<div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div>
 		</c:if>
 		<form class="dialog-form" action="" method="post">
 			<fieldset>
 				<p>
 					Enter the captcha to have your password reset instructions sent to
-					<c:out value="${it.user.email}" />
+					<c:out value="${it.user.email}" escapeXml="true" />
 				</p>
 				<p id="human-proof"></p>
 				${it.reCaptchaHtml}
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
 </head>
 <body>
 
-	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp
index a83d80d..6334466 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp
@@ -30,12 +30,12 @@
 <body>
 
 	<div class="dialog-area">
-		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
 		<form class="dialog-form" action="" method="post">
-			<input type="hidden" name="token" value="${it.token}">
+			<input type="hidden" name="token" value="${fn:escapeXml(it.token)}">
 			<fieldset>
 				<p>
-					<label for="password1">Please enter your new password for <c:out value="${it.user.email}"/>.</label>
+					<label for="password1">Please enter your new password for <c:out value="${it.user.email}" escapeXml="true"/>.</label>
 				</p>
 				<p>
 					<input class="text_field" id="password1" name="password1" type="password" />
@@ -54,4 +54,4 @@
 	</div>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp
index 9de90ba..3915084 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp
@@ -29,7 +29,7 @@
 </head>
 <body>
 
-	<p>New password set for <c:out value="${it.user.email}"/></p>
+	<p>New password set for <c:out value="${it.user.email}" escapeXml="true"/></p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp
@@ -27,7 +27,7 @@
 </head>
 <body>
 
-	<p>An error occurred <c:out value="${it}"/>.</p>
+	<p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp
index 8643016..f8cf496 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp
@@ -27,7 +27,7 @@
 <body>
 
 	<div class="dialog-area">
-		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+		<c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
 		<form class="dialog-form" action="" method="post">
 			<fieldset>
 				<p>
@@ -47,4 +47,4 @@
 	</div>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
 </head>
 <body>
 
-	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+	<p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
 
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/test/resources/project.properties b/stack/rest/src/test/resources/project.properties
index 94ef3bd..20a38f6 100644
--- a/stack/rest/src/test/resources/project.properties
+++ b/stack/rest/src/test/resources/project.properties
@@ -15,4 +15,4 @@
 # limitations under the License.
 
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties
index cbec81f..ee7b039 100644
--- a/stack/rest/src/test/resources/usergrid-custom-test.properties
+++ b/stack/rest/src/test/resources/usergrid-custom-test.properties
@@ -18,7 +18,7 @@
 cassandra.startup=external
 cassandra.connections=30
 cassandra.timeout.pool=20000
-cassandra.timeout=20000
+cassandra.timeout=25000
 
 
 hystrix.threadpool.graph_user.coreSize=1200
@@ -68,7 +68,7 @@
 collection.uniquevalues.actors=300
 collection.uniquevalues.authoritative.region=us-east
 
-elasticsearch.queue_impl=DISTRIBUTED
+elasticsearch.queue_impl=LOCAL
 
 # Queueing Test Settings
 # Reduce the long polling time for the tests
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index b1df1b4..29fa311 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -102,7 +102,7 @@
                     <useSystemClassLoader>false</useSystemClassLoader>
                     <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                         -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         ${ug.argline} -Dlog4j.configuration=file:${basedir}/src/test/resources/log4j.properties
                     </argLine>
                     <includes>
@@ -499,7 +499,7 @@
                         <configuration>
                             <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                                 -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 ${ug.argline}
                                 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
                             </argLine>
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
index 2b88b07..8b840d6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
@@ -17,11 +17,7 @@
 package org.apache.usergrid.management;
 
 
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 
 import org.apache.usergrid.persistence.CredentialsInfo;
 import org.apache.usergrid.persistence.Entity;
@@ -372,6 +368,10 @@
 
 	Observable<Id> deleteAllEntities(final UUID applicationId,final int limit);
 
+    Collection<String> passwordPolicyCheck(String password, boolean isAdminUser);
+
+    String getPasswordDescription(boolean isAdminUser);
+
 
     // DO NOT REMOVE BELOW METHODS, THEY ARE HERE TO ALLOW EXTERNAL CLASSES TO OVERRIDE AND HOOK INTO POST PROCESSING
     void createOrganizationPostProcessing( final OrganizationInfo orgInfo,
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index 2ba9bde..89375fd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -3412,6 +3412,16 @@
         return service.deleteAllEntities(CpNamingUtils.getApplicationScope(applicationId),limit);
     }
 
+    @Override
+    public Collection<String> passwordPolicyCheck(String password, boolean isAdminUser) {
+        return passwordPolicy.policyCheck(password, isAdminUser);
+    }
+
+    @Override
+    public String getPasswordDescription(boolean isAdminUser) {
+        return passwordPolicy.getDescription(isAdminUser);
+    }
+
     private String getProperty(String key) {
         String obj = properties.getProperty(key);
         if(StringUtils.isEmpty(obj))
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java
index c4762f2..c8a446a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java
@@ -93,11 +93,14 @@
 
         app.testRequest( ServiceAction.GET, 4, null, "users", userC.getUuid(), "feed" );
 
-        app.testRequest( ServiceAction.GET, 2, null, "users", userC.getUuid(), "feed",
-                Query.fromQL( "select * where content contains 'cookie'" ) );
+        // time for indexing
+        Thread.sleep(10000);
 
         app.testRequest( ServiceAction.GET, 1, "users", userC.getUuid(), "feed",
-                Query.fromQL( "select * where verb='post' and content contains 'cookie'" ) );
+            Query.fromQL( "select * where verb='post' and content contains 'cookie'" ) );
+
+        app.testRequest( ServiceAction.GET, 2, null, "users", userC.getUuid(), "feed",
+                Query.fromQL( "select * where content contains 'cookie'" ) );
 
         app.put( "username", "finn" );
         app.put( "email", "finn@ooo.com" );
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
index 81dced1..434fd80 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
@@ -102,6 +102,9 @@
 
         app.testRequest( ServiceAction.GET, 2, "users", "edanuff", "likes", "restaurants" );
 
+        // time for indexing
+        Thread.sleep(10000);
+
         app.testRequest( ServiceAction.GET, 1, "users", "edanuff", "likes", "restaurants",
                 Query.fromQL( "select * where name='Brickhouse'" ) );
 
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
index c035192..9b3dfcd 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
@@ -49,9 +49,9 @@
 
     protected Notification notificationWaitForComplete(Notification notification)
             throws Exception {
-        long timeout = System.currentTimeMillis() + 60000;
+        long timeout = System.currentTimeMillis() + 120000;
         while (System.currentTimeMillis() < timeout) {
-            app.waitForQueueDrainAndRefreshIndex(200);
+            app.waitForQueueDrainAndRefreshIndex(1000);
             notification = app.getEntityManager().get(notification.getUuid(), Notification.class);
             if (notification.getFinished() != null) {
                 return notification;
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 8360009..ecc5443 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -632,7 +632,7 @@
 
 
         // receipts are created and queried, wait a bit longer for this to happen as indexing
-        app.waitForQueueDrainAndRefreshIndex(500);
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // get the receipts entity IDs
         List<EntityRef> receipts = getNotificationReceipts(notification);
diff --git a/stack/services/src/test/resources/project.properties b/stack/services/src/test/resources/project.properties
index d38e878..03736c0 100644
--- a/stack/services/src/test/resources/project.properties
+++ b/stack/services/src/test/resources/project.properties
@@ -16,4 +16,4 @@
 
 target.directory=${project.build.directory}
 resources.dir=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/services/src/test/resources/usergrid-custom-test.properties b/stack/services/src/test/resources/usergrid-custom-test.properties
index bcc8b8e..4178a07 100644
--- a/stack/services/src/test/resources/usergrid-custom-test.properties
+++ b/stack/services/src/test/resources/usergrid-custom-test.properties
@@ -35,7 +35,7 @@
 
 elasticsearch.buffer_timeout=1
 elasticsearch.queue_impl.resolution=true
-elasticsearch.queue_impl=DISTRIBUTED
+elasticsearch.queue_impl=LOCAL
 
 # Queueing Test Settings
 queue.long.polling.time.millis=50
diff --git a/stack/test-utils/pom.xml b/stack/test-utils/pom.xml
index bbcf1ff..f99d43a 100644
--- a/stack/test-utils/pom.xml
+++ b/stack/test-utils/pom.xml
@@ -59,7 +59,7 @@
                        <threadCount>${usergrid.it.threads}</threadCount>
                        <threadCountClasses></threadCountClasses>
                        <reuseForks>true</reuseForks>
-                       <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8  -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+                       <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8  -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
                         <includes>
                            <include>**/CassandraResourceITSuite.java</include>
                         </includes>
@@ -290,7 +290,7 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <version>${surefire.plugin.version}</version>
                         <configuration>
-                            <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+                            <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
                         </configuration>
                     </plugin>
                 </plugins>
diff --git a/stack/test-utils/src/test/resources/project.properties b/stack/test-utils/src/test/resources/project.properties
index cd5b819..0bc9bb7 100644
--- a/stack/test-utils/src/test/resources/project.properties
+++ b/stack/test-utils/src/test/resources/project.properties
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index cbd2c1e..b34b068 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -61,7 +61,7 @@
             <storage-config>${basedir}/src/test/conf</storage-config>
           </systemPropertyVariables>
           <forkMode>always</forkMode>
-          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
         </configuration>
 
       </plugin>
diff --git a/stack/websocket/pom.xml b/stack/websocket/pom.xml
index af5ed56..72a5c9d 100644
--- a/stack/websocket/pom.xml
+++ b/stack/websocket/pom.xml
@@ -70,7 +70,7 @@
             <storage-config>${basedir}/src/test/conf</storage-config>
           </systemPropertyVariables>
           <forkMode>always</forkMode>
-          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
         </configuration>
       </plugin>
     </plugins>