Merge commit 'refs/pull/596/head' of github.com:apache/usergrid
diff --git a/stack/README.md b/stack/README.md
index 268cb7e..6166da2 100644
--- a/stack/README.md
+++ b/stack/README.md
@@ -3,7 +3,7 @@
 A highly-scalable data platform for mobile applications.
 
 * **Documentation**: http://usergrid.apache.org/docs/
-* **Homepage**: http://http://usergrid.apache.org/
+* **Homepage**: http://usergrid.apache.org/
 
 
 ## Requirements
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 909c073..841e978 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -181,6 +181,8 @@
         bind( ApplicationService.class ).to( ApplicationServiceImpl.class );
 
         bind( StatusService.class ).to( StatusServiceImpl.class );
+
+        bind(ApplicationRestorePasswordService.class).to(ApplicationRestorePasswordServiceImpl.class);
     }
 
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7a4c781..0681b7a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -638,7 +638,9 @@
 
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
-        if ( !skipIndexingForType( entityId.getType() ) ) {
+        // may still want to delete index entries even if indexing is turned off for new updates
+        if ( entityManagerFig.deindexDeletedWhenCollectionIndexingOff() ||
+            !skipIndexingForType( entityId.getType() ) ) {
             indexService.queueEntityDelete( applicationScope, entityId );
         }
 
@@ -1244,8 +1246,15 @@
 
     @Override
     public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName ) throws Exception {
+        return getDictionaryAsMap(entity, dictionaryName, true);
+    }
 
-        entity = validate( entity );
+
+    @Override
+    public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName,
+                                                   boolean forceVerification) throws Exception {
+
+        entity = validate( entity, forceVerification);
 
         Map<Object, Object> dictionary = new LinkedHashMap<Object, Object>();
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 3c8a53f..23cf1c3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -42,6 +42,10 @@
     @Default( "false" )
     boolean getDeindexOnUpdate();
 
+    @Key( "usergrid.entityManager.deindex_deleted_when_collection_indexing_off")
+    @Default( "true" )
+    boolean deindexDeletedWhenCollectionIndexingOff();
+
     /**
      * Comma-separated list of one or more Amazon regions to use if multiregion
      * is set to true.
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 1eb5e03..dbec084 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -301,14 +301,14 @@
 
 
         return indexService.deIndexOldVersions( applicationScope, entityId,
-            getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
+            getVersionsOlderThanMarked(ecm, entityId, markedVersion));
 
 
     }
 
 
-    private List<UUID> getVersionsOlderThanOrEqualToMarked(final EntityCollectionManager ecm,
-                                                           final Id entityId, final UUID markedVersion ){
+    private List<UUID> getVersionsOlderThanMarked(final EntityCollectionManager ecm, final Id entityId,
+                                                  final UUID markedVersion ){
 
         final List<UUID> versions = new ArrayList<>();
 
@@ -317,7 +317,7 @@
         ecm.getVersionsFromMaxToMin( entityId, markedVersion)
             .take(100)
             .forEach( mvccLogEntry -> {
-                if ( mvccLogEntry.getVersion().timestamp() <= markedVersion.timestamp() ) {
+                if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) {
                     versions.add(mvccLogEntry.getVersion());
                 }
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 32470f6..b1d493e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -199,6 +199,11 @@
 
             Object fields = jsonMapData.get("fields");
 
+            // if "fields" field doesn't exist, should treat like fields=all
+            if ( fields == null ) {
+                return Optional.absent();
+            }
+
             if ( fields != null && fields instanceof String && "all".equalsIgnoreCase(fields.toString())) {
                 return Optional.absent();
             }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b9238e5..48c3908 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.utils.StringUtils;
+
 /**
  * An interface for re-indexing all entities in an application
  */
@@ -47,6 +49,13 @@
      */
     ReIndexStatus getStatus( final String jobId );
 
+    /**
+     * Get the status of a collection job
+     * @param collectionName The collectionName for the rebuild index
+     * @return
+     */
+    ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName );
+
 
     /**
      * The response when requesting a re-index operation
@@ -56,14 +65,27 @@
         final Status status;
         final long numberProcessed;
         final long lastUpdated;
+        final String collectionName;
 
 
         public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
-                              final long lastUpdated ) {
-            this.jobId = jobId;
+                              final long lastUpdated, final String collectionName ) {
+
+            if(StringUtils.isNotEmpty(jobId)){
+                this.jobId = jobId;
+            }else {
+                this.jobId = "";
+            }
+
             this.status = status;
             this.numberProcessed = numberProcessed;
             this.lastUpdated = lastUpdated;
+
+            if(StringUtils.isNotEmpty(collectionName)){
+                this.collectionName = collectionName;
+            }else {
+                this.collectionName = "";
+            }
         }
 
 
@@ -74,6 +96,13 @@
             return jobId;
         }
 
+        /**
+         * Get the collectionName used to resume this operation
+         */
+        public String getCollectionName() {
+            return collectionName;
+        }
+
 
         /**
          * Get the last updated time, as a long
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 05602fc..036f89c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -75,6 +75,7 @@
     private static final String MAP_COUNT_KEY = "count";
     private static final String MAP_STATUS_KEY = "status";
     private static final String MAP_UPDATED_KEY = "lastUpdated";
+    private static final String MAP_SEPARATOR = "|||";
 
 
     private final AllApplicationsObservable allApplicationsObservable;
@@ -140,7 +141,9 @@
 
         // create an observable that loads a batch to be indexed
 
-        if(reIndexRequestBuilder.getCollectionName().isPresent()) {
+        final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent();
+
+        if(isForCollection) {
 
             String collectionName =  InflectionUtils.pluralize(
                 CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
@@ -175,12 +178,36 @@
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
                 }
-                writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
-            .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+                if( isForCollection ){
+                    writeStateMetaForCollection(
+                        appId.get().getApplication().getUuid().toString(),
+                        reIndexRequestBuilder.getCollectionName().get(),
+                        Status.INPROGRESS, count.get(),
+                        System.currentTimeMillis() );
+                }else{
+                    writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
+                }
+            })
+            .doOnCompleted(() ->{
+                if( isForCollection ){
+                    writeStateMetaForCollection(
+                        appId.get().getApplication().getUuid().toString(),
+                        reIndexRequestBuilder.getCollectionName().get(),
+                        Status.COMPLETE, count.get(),
+                        System.currentTimeMillis() );
+                }else {
+                    writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis());
+                }
+            })
             .subscribeOn( Schedulers.io() ).subscribe();
 
+        if(isForCollection){
+            return new ReIndexStatus( "", Status.STARTED, 0, 0, CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get()) );
 
-        return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
+        }
+
+
+        return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" );
     }
 
 
@@ -196,38 +223,15 @@
         return getIndexResponse( jobId );
     }
 
-
-    /**
-     * Simple collector that counts state, then flushed every time a buffer is provided.  Writes final state when complete
-     */
-    private class FlushingCollector {
-
-        private final String jobId;
-        private long count;
-
-
-        private FlushingCollector( final String jobId ) {
-            this.jobId = jobId;
-        }
-
-
-        public void flushBuffer( final List<EdgeScope> buffer ) {
-            count += buffer.size();
-
-            //write our cursor state
-            if ( buffer.size() > 0 ) {
-                writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
-            }
-
-            writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
-        }
-
-        public void complete(){
-            writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
-        }
+    @Override
+    public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) {
+        Preconditions.checkNotNull( collectionName, "appIdString must not be null" );
+        Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
+        return getIndexResponseForCollection( appIdString, collectionName );
     }
 
 
+
     /**
      * Get the resume edge scope
      *
@@ -346,7 +350,7 @@
         final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
 
         if(stringStatus == null){
-           return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 );
+           return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" );
         }
 
         final Status status = Status.valueOf( stringStatus );
@@ -354,7 +358,41 @@
         final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
         final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
 
-        return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
+        return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" );
+    }
+
+
+    private void writeStateMetaForCollection(final String appIdString, final String collectionName,
+                                             final Status status, final long processedCount, final long lastUpdated ) {
+
+    	String prefixedColName = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() );
+    	if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}",
+            		collectionName, status, processedCount, lastUpdated);
+        }
+
+        mapManager.putString( appIdString + MAP_SEPARATOR + prefixedColName + MAP_STATUS_KEY, status.name() );
+        mapManager.putLong( appIdString + MAP_SEPARATOR + prefixedColName + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( appIdString + MAP_SEPARATOR + prefixedColName + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+    private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) {
+
+        String prefixedColName = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() );
+    	final String stringStatus =
+            mapManager.getString( appIdString + MAP_SEPARATOR + prefixedColName + MAP_STATUS_KEY );
+
+        if(stringStatus == null){
+            return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName );
+        }
+
+        final Status status = Status.valueOf( stringStatus );
+
+        final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + prefixedColName + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + prefixedColName + MAP_UPDATED_KEY );
+
+        return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName );
     }
 }
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 88b5001..7aa614a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -28,6 +28,10 @@
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.index.query.ParsedQuery;
+import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
+
+import static org.apache.usergrid.persistence.Query.MAX_LIMIT;
 
 
 /**
@@ -43,16 +47,24 @@
     // it can happen if ES was not updated or has yet to be updated.
     private final boolean keepStaleEntries;
     private String query;
+    private ParsedQuery parsedQuery;
 
 
     public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id, boolean keepStaleEntries, String query ) {
 
         this.applicationScope = applicationScope;
-        this.requestCursor = requestCursor;
-        this.limit = limit;
         this.id = id;
         this.keepStaleEntries = keepStaleEntries;
         this.query = query;
+        this.parsedQuery = ParsedQueryBuilder.build(query);
+        if (parsedQuery != null && parsedQuery.isDirectQuery()) {
+            // for direct query, use no limit or cursor
+            this.limit = MAX_LIMIT + 1;
+            this.requestCursor = new RequestCursor(Optional.absent());
+        } else {
+            this.limit = limit;
+            this.requestCursor = requestCursor;
+        }
     }
 
 
@@ -98,4 +110,8 @@
         return query;
     }
 
+    public ParsedQuery getParsedQuery() {
+        return parsedQuery;
+    }
+
 }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
index 7a46507..cc409b0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
@@ -129,7 +129,7 @@
 
                     try {
                         final CandidateResults candidateResults =
-                            applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet,
+                            applicationEntityIndex.search( searchEdge, searchTypes, pipelineContext.getParsedQuery(), limit, currentOffSet,
                                 propertiesWithType, analyzeOnly, returnQuery);
 
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 20bcfe9..c0db02f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -21,13 +21,18 @@
 
 
 import java.util.*;
+import java.util.logging.Filter;
 
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.index.query.ParsedQuery;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.DistanceField;
 import org.apache.usergrid.persistence.model.field.EntityObjectField;
 import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
 import org.apache.usergrid.utils.DateUtils;
 import org.slf4j.Logger;
@@ -99,6 +104,8 @@
 
         boolean keepStaleEntries = pipelineContext.getKeepStaleEntries();
         String query = pipelineContext.getQuery();
+        ParsedQuery parsedQuery = pipelineContext.getParsedQuery();
+        boolean isDirectQuery = parsedQuery == null ? false : parsedQuery.isDirectQuery();
 
         //buffer them to get a page size we can make 1 network hop
         final Observable<FilterResult<Entity>> searchIdSetObservable =
@@ -107,7 +114,12 @@
             //load them
             .flatMap( candidateResults -> {
 
-                //flatten toa list of ids to load
+                if (isDirectQuery) {
+                    // add ids for direct query
+                    updateDirectQueryCandidateResults(entityCollectionManager, candidateResults);
+                }
+
+                //flatten to a list of ids to load
                 final Observable<List<Candidate>> candidates =
                     Observable.from(candidateResults)
                         .map(filterResultCandidate -> filterResultCandidate.getValue()).toList();
@@ -118,12 +130,13 @@
                         Observable<EntitySet> entitySets = Observable.from(candidatesList)
                             .map(candidateEntry -> candidateEntry.getCandidateResult().getId()).toList()
                             .flatMap(idList -> entityCollectionManager.load(idList));
-                        //now we have a collection, validate our canidate set is correct.
+                        //now we have a collection, validate our candidate set is correct.
                         return entitySets.map(
                             entitySet -> new EntityVerifier(
                                 applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
                         )
-                            .doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query))
+                            .doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query,
+                                isDirectQuery))
                             .flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
                             .map(entityFilterResult -> {
                                 final Entity entity = entityFilterResult.getValue();
@@ -222,6 +235,28 @@
 
 
     /**
+     * Update direct query candidates to add IDs.
+     */
+    private void updateDirectQueryCandidateResults(
+        EntityCollectionManager entityCollectionManager, List<FilterResult<Candidate>> candidatesList) {
+        for (FilterResult<Candidate> filterCandidate : candidatesList) {
+            Candidate candidate = filterCandidate.getValue();
+            CandidateResult candidateResult = candidate.getCandidateResult();
+            String entityType = candidateResult.getDirectEntityType();
+            Id entityId = null;
+            if (candidateResult.isDirectQueryName()) {
+                entityId = entityCollectionManager.getIdField( entityType,
+                    new StringField( Schema.PROPERTY_NAME, candidateResult.getDirectEntityName() ) )
+                    .toBlocking() .lastOrDefault( null );
+            } else if (candidateResult.isDirectQueryUUID()) {
+                entityId = new SimpleId(candidateResult.getDirectEntityUUID(), entityType);
+            }
+            filterCandidate.getValue().getCandidateResult().setId(entityId);
+        }
+    }
+
+
+    /**
      * Our collector to collect entities.  Not quite a true collector, but works within our operational
      * flow as this state is mutable and difficult to represent functionally
      */
@@ -234,6 +269,7 @@
         private final List<FilterResult<Candidate>> candidateResults;
         private final IndexProducer indexProducer;
         private final EntitySet entitySet;
+        private List<FilterResult<Candidate>> dedupedCandidateResults = new ArrayList<>();
 
 
         public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
@@ -250,14 +286,33 @@
         /**
          * Merge our candidates and our entity set into results
          */
-        public void merge(boolean keepStaleEntries, String query) {
+        public void merge(boolean keepStaleEntries, String query, boolean isDirectQuery) {
 
-            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
-                validate( candidateResult , keepStaleEntries, query);
+            if (!isDirectQuery) {
+                filterDuplicateCandidates(query);
+            } else {
+                // remove direct query duplicates or missing entities (names that don't exist will have null ids)
+                Set<UUID> foundUUIDs = new HashSet<>();
+                for (FilterResult<Candidate> candidateFilterResult : candidateResults) {
+                    Id id = candidateFilterResult.getValue().getCandidateResult().getId();
+                    if (id != null) {
+                        UUID uuid = id.getUuid();
+                        if (!foundUUIDs.contains(uuid)) {
+                            dedupedCandidateResults.add(candidateFilterResult);
+                            foundUUIDs.add(uuid);
+                        }
+                    }
+                }
             }
 
-            indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
+            for (final FilterResult<Candidate> candidateResult : dedupedCandidateResults) {
+                validate(candidateResult, keepStaleEntries, query, isDirectQuery);
+            }
 
+            // no index requests made for direct query, so no need to modify index
+            if (!isDirectQuery) {
+                indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
+            }
         }
 
 
@@ -287,14 +342,78 @@
         }
 
 
-        private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query ) {
+        // don't need to worry about whether we are keeping stale entries -- this will remove candidates that are
+        // older than others in the result set
+        private void filterDuplicateCandidates(String query) {
+
+            Map<Id, UUID> latestEntityVersions = new HashMap<>();
+
+            // walk through candidates and find latest version for each entityID
+            for ( final FilterResult<Candidate> filterResult : candidateResults ) {
+                final Candidate candidate = filterResult.getValue();
+                final CandidateResult candidateResult = candidate.getCandidateResult();
+                final Id candidateId = candidateResult.getId();
+                final UUID candidateVersion = candidateResult.getVersion();
+
+                UUID previousCandidateVersion = latestEntityVersions.get(candidateId);
+                if (previousCandidateVersion != null) {
+                    // replace if newer
+                    if (UUIDComparator.staticCompare(candidateVersion, previousCandidateVersion) > 0) {
+                        latestEntityVersions.put(candidateId, candidateVersion);
+                    }
+                } else {
+                    latestEntityVersions.put(candidateId, candidateVersion);
+                }
+            }
+
+            // walk through candidates again, saving newest results and deindexing older
+            for ( final FilterResult<Candidate> filterResult : candidateResults ) {
+                final Candidate candidate = filterResult.getValue();
+                final CandidateResult candidateResult = candidate.getCandidateResult();
+                final Id candidateId = candidateResult.getId();
+                final UUID candidateVersion = candidateResult.getVersion();
+
+                final UUID latestCandidateVersion = latestEntityVersions.get(candidateId);
+
+                if (candidateVersion.equals(latestCandidateVersion)) {
+                    // save candidate
+                    dedupedCandidateResults.add(filterResult);
+                } else {
+                    // deindex if not the current version in database
+                    final MvccEntity entity = entitySet.getEntity( candidateId );
+                    final UUID databaseVersion = entity.getVersion();
+
+                    if (!candidateVersion.equals(databaseVersion)) {
+                        Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
+                        Date entityTimeStamp = UUIDTimeStampToDate(databaseVersion);
+
+                        logger.warn( "Found old stale entity on edge {} for entityId {} Entity version {} ({}).  Candidate version {} ({}). Will not be returned in result set. Query = [{}]",
+                            candidate.getSearchEdge(),
+                            entity.getId().getUuid(),
+                            databaseVersion,
+                            DateUtils.instance.formatIso8601Date(entityTimeStamp),
+                            candidateVersion,
+                            DateUtils.instance.formatIso8601Date(candidateTimeStamp),
+                            query
+                        );
+
+                        final SearchEdge searchEdge = candidate.getSearchEdge();
+                        batch.deindex(searchEdge, entity.getId(), candidateVersion);
+                    }
+                }
+            }
+        }
+
+
+        private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query,
+                               boolean isDirectQuery) {
 
             final Candidate candidate = filterResult.getValue();
             final CandidateResult candidateResult = candidate.getCandidateResult();
             final boolean isGeo = candidateResult instanceof GeoCandidateResult;
             final SearchEdge searchEdge = candidate.getSearchEdge();
             final Id candidateId = candidateResult.getId();
-            final UUID candidateVersion = candidateResult.getVersion();
+            UUID candidateVersion = candidateResult.getVersion();
 
 
             final MvccEntity entity = entitySet.getEntity( candidateId );
@@ -302,9 +421,14 @@
 
             //doesn't exist warn and drop
             if ( entity == null ) {
-                logger.warn(
-                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra.  Ignoring since this could be a region sync issue",
-                    candidateId, candidateVersion );
+                if (!isDirectQuery) {
+                    logger.warn(
+                        "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra.  Ignoring since this could be a region sync issue",
+                        candidateId, candidateVersion);
+                } else {
+                    logger.warn(
+                        "Direct query for entityId {} was not found in cassandra, query=[{}]", candidateId, query);
+                }
 
 
                 //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
@@ -314,29 +438,42 @@
             }
 
 
-            final UUID entityVersion = entity.getVersion();
+            final UUID databaseVersion = entity.getVersion();
+            if (isDirectQuery) {
+                // use returned (latest) version for direct query
+                candidateVersion = databaseVersion;
+            }
             final Id entityId = entity.getId();
 
-            //entity is newer than ES version, could be a missed or slow index event
-            if ( UUIDComparator.staticCompare(entityVersion, candidateVersion) > 0 ) {
+            // The entity is marked as deleted
+            if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) {
+
+                // when updating entities, we don't delete all previous versions from ES so this action is expected
+                if(logger.isDebugEnabled()){
+                    logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
+                        searchEdge, entityId, databaseVersion);
+                }
+
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            // entity exists and is newer than ES version, could be a missed or slow index event
+            if ( UUIDComparator.staticCompare(databaseVersion, candidateVersion) > 0 ) {
 
                Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
-               Date entityTimeStamp = UUIDTimeStampToDate(entityVersion);
+               Date entityTimeStamp = UUIDTimeStampToDate(databaseVersion);
 
-               Map<String,String> fields = new HashMap<>();
-               for  (Field field : entity.getEntity().get().getFields()) {
-                   fields.put(field.getName(),String.valueOf(field.getValue()));
-               }
-
-               logger.warn( "Found stale entity on edge {} for entityId {} Entity version date = {}.  Candidate version date = {}. Will be returned in result set = {} Query = [{}] Entity fields = {}",
-                   searchEdge,
-                   entityId.getUuid(),
-                   DateUtils.instance.formatIso8601Date(entityTimeStamp),
-                   DateUtils.instance.formatIso8601Date(candidateTimeStamp),
-                   keepStaleEntries,
-                   query,
-                   fields
-               );
+                logger.warn( "Found stale entity on edge {} for entityId {} Entity version {} ({}).  Candidate version {} ({}). Will be returned in result set = {} Query = [{}]",
+                    candidate.getSearchEdge(),
+                    entity.getId().getUuid(),
+                    databaseVersion,
+                    DateUtils.instance.formatIso8601Date(entityTimeStamp),
+                    candidateVersion,
+                    DateUtils.instance.formatIso8601Date(candidateTimeStamp),
+                    keepStaleEntries,
+                    query
+                );
 
                 if (!keepStaleEntries) {
                     batch.deindex(searchEdge, entityId, candidateVersion);
@@ -344,34 +481,20 @@
                 }
             }
 
-
-            // The entity is marked as deleted
-            if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) {
-
-                // when updating entities, we don't delete previous versions from ES so this action is expected
-                if(logger.isDebugEnabled()){
-                    logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
-                        searchEdge, entityId, entityVersion);
-                }
-
-                batch.deindex( searchEdge, entityId, candidateVersion );
-                return;
-            }
-
             //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
             //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+            if ( UUIDComparator.staticCompare( candidateVersion, databaseVersion ) > 0 ) {
 
                 logger.warn(
                     "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair should be run",
-                        searchEdge, entityId, entityVersion);
+                        searchEdge, entityId, databaseVersion);
 
                   //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
 
                 return;
             }
 
-            //they're the same add it
+            // add the result
 
             final Entity returnEntity = entity.getEntity().get();
             if(isGeo){
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordService.java
new file mode 100644
index 0000000..99ef57d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordService.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.corepersistence.service;
+
+import java.util.UUID;
+
+public interface ApplicationRestorePasswordService {
+    String getApplicationRestorePassword(final UUID applicationId);
+    void setApplicationRestorePassword(final UUID applicationId, final String restorePassword);
+    void removeApplicationRestorePassword(final UUID applicationId);
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordServiceImpl.java
new file mode 100644
index 0000000..e56bba7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordServiceImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.corepersistence.service;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.UUID;
+
+public class ApplicationRestorePasswordServiceImpl implements ApplicationRestorePasswordService {
+    private final MapManagerFactory mapManagerFactory;
+    final static String restorePasswordNamespace = "appRestorePassword";
+    final static String passwordKey = "password";
+
+    @Inject
+    public ApplicationRestorePasswordServiceImpl(final MapManagerFactory mapManagerFactory) {
+        this.mapManagerFactory = mapManagerFactory;
+    }
+
+    private MapManager getMapManager(final UUID applicationId) {
+        final Id appId = CpNamingUtils.generateApplicationId(applicationId);
+        return mapManagerFactory.createMapManager(new MapScopeImpl(appId, restorePasswordNamespace));
+    }
+
+    @Override
+    public String getApplicationRestorePassword(final UUID applicationId) {
+        Preconditions.checkNotNull(applicationId, "app id is null");
+
+        MapManager mapManager = getMapManager(applicationId);
+        return mapManager.getString(passwordKey);
+
+    }
+
+    @Override
+    public void setApplicationRestorePassword(final UUID applicationId, final String restorePassword) {
+        Preconditions.checkNotNull(applicationId, "app id is null");
+        Preconditions.checkArgument(!StringUtils.isEmpty(restorePassword), "restorePassword is empty");
+
+        MapManager mapManager = getMapManager(applicationId);
+        mapManager.putString(passwordKey, restorePassword);
+
+    }
+
+    @Override
+    public void removeApplicationRestorePassword(final UUID applicationId) {
+        Preconditions.checkNotNull(applicationId, "app id is null");
+
+        MapManager mapManager = getMapManager(applicationId);
+        mapManager.delete(passwordKey);
+    }
+
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index 259e4b9..c476f1f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -84,8 +84,11 @@
         final Optional<String> query = search.getQuery();
 
         final IdBuilder pipelineBuilder =
-            pipelineBuilderFactory.create( search.getApplicationScope() ).withCursor( search.getCursor() )
-                                  .withLimit( search.getLimit() ).fromId( search.getSourceNodeId() );
+            pipelineBuilderFactory.create( search.getApplicationScope() )
+            	.withCursor( search.getCursor() )
+            	.withLimit( search.getLimit() )
+            	.query(query)
+            	.fromId( search.getSourceNodeId() );
 
 
         //we want to load all entities
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index c9752c3..c0e64a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -276,6 +276,10 @@
     public Map<Object, Object> getDictionaryAsMap( EntityRef entityRef, String dictionaryName )
             throws Exception;
 
+    public Map<Object, Object> getDictionaryAsMap( EntityRef entityRef, String dictionaryName,
+                                                   boolean forceVerification )
+            throws Exception;
+
     public Object getDictionaryElementValue( EntityRef entityRef, String dictionaryName,
             String elementName ) throws Exception;
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
index c5833af..51b20c6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
@@ -23,10 +23,14 @@
 
 import org.apache.usergrid.persistence.Query.Level;
 import org.apache.usergrid.utils.InflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class PathQuery<E> {
 
+    private static final Logger logger = LoggerFactory.getLogger( PathQuery.class );
+
     private PathQuery source;
     private Query query;
     private UUID uuid;
@@ -122,10 +126,30 @@
 
         EntityRef ref = new SimpleEntityRef(type,uuid);
 
-        // if it's a single name identifier, just directly fetch that
-        if ( !query.getQl().isPresent() && query.getSingleNameOrEmailIdentifier() != null){
+        if ( !query.getQl().isPresent() && query.getSingleUuidIdentifier() != null) {
 
+            // fetch entity by UUID directly
+            UUID entityUUID = query.getSingleUuidIdentifier();
+
+            if (logger.isTraceEnabled()) {
+                logger.trace("getHeadResults(): identified path uuid: appid={}, collection={}, entityUUID={}",
+                    uuid.toString(), query.getCollection(), entityUUID.toString());
+            }
+
+            String entityType = InflectionUtils.singularize(query.getCollection());
+
+            return em.getEntities(Collections.singletonList(entityUUID), entityType);
+
+        } else if ( !query.getQl().isPresent() && query.getSingleNameOrEmailIdentifier() != null){
+
+            // if it's a single name identifier, just directly fetch that
             String name = query.getSingleNameOrEmailIdentifier();
+
+            if (logger.isTraceEnabled()) {
+                logger.trace("getHeadResults(): identified path name/email: appid={}, collection={}, name={}",
+                    uuid.toString(), query.getCollection(), name);
+            }
+
             String entityType = InflectionUtils.singularize(query.getCollection());
 
             // don't use unique index repair on read only logic
@@ -140,6 +164,12 @@
             return em.getEntities(Collections.singletonList(entityId), entityType);
         }
 
+        if (logger.isTraceEnabled()) {
+            logger.trace("getHeadResults(): sending query to ES: appid={}, collection={}, query=[{}]",
+                uuid.toString(), query.getCollection(),
+                query.getQl().isPresent() ? query.getQl().get() : "NONE");
+        }
+
         return ( query.getCollection() != null ) ?
                em.searchCollection( ref, query.getCollection(), query ) :
                em.searchTargetEntities(ref, query);
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
index 9e5598c..c22a627 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -17,31 +17,31 @@
 package org.apache.usergrid.persistence;
 
 
-import com.codahale.metrics.MetricRegistry;
-import com.google.inject.Injector;
-import net.jcip.annotations.NotThreadSafe;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.index.*;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilder;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteService;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
-import static org.junit.Assert.*;
+import com.codahale.metrics.MetricRegistry;
 
 
-@NotThreadSafe
+
 public class CollectionDeleteTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteTest.class );
 
@@ -199,9 +199,10 @@
 
     private int retryReadData(EntityManager em, String collectionName, int expectedEntities,  int retry) throws Exception {
         int count = -1;
+        Set<Entity> uniqueRemEnts = new HashSet<Entity>();
         do {
             try {
-                count = readData(em, collectionName, expectedEntities);
+                count = readData(em, collectionName, expectedEntities, uniqueRemEnts);
             } catch (Exception ignore) {
                 logger.info( "caught exception ", ignore);
             }
@@ -211,7 +212,7 @@
         return count;
     }
 
-    private int readData(EntityManager em, String collectionName, int expectedEntities)
+    private int readData(EntityManager em, String collectionName, int expectedEntities, Set<Entity> uniqueRemEnts)
         throws Exception {
 
         app.waitForQueueDrainAndRefreshIndex();
@@ -219,45 +220,44 @@
         Results results = em.getCollection(em.getApplicationRef(), collectionName, null, expectedEntities,
             Query.Level.ALL_PROPERTIES, false);
 
-        int count = 0;
-        List<Entity> list = new ArrayList<>();
+        
         while ( true ) {
 
             if (results.getEntities().size() == 0) {
                 break;
             }
+            
 
             UUID lastEntityUUID = null;
             for ( Entity e : results.getEntities() ) {
 
                 assertEquals(2000, e.getProperty("key2"));
 
-                if (count % 100 == 0) {
-                    logger.info("read {} entities", count);
+                if (uniqueRemEnts.size() % 100 == 0) {
+                    logger.info("read {} entities", uniqueRemEnts.size());
                 }
                 lastEntityUUID = e.getUuid();
-                count++;
-                list.add(e);
+                uniqueRemEnts.add(e);
+                logger.info("Found remaining entity {}", lastEntityUUID);
             }
 
             results = em.getCollection(em.getApplicationRef(), collectionName, lastEntityUUID, expectedEntities,
                 Query.Level.ALL_PROPERTIES, false);
 
         }
-        logger.info("read {} total entities", count);
 
-        if (count != expectedEntities) {
-            logger.info("Expected {} did not match actual {}", expectedEntities, count);
-            if (count < 20) {
-                for (Entity e : list) {
+        if (uniqueRemEnts.size() != expectedEntities) {
+            logger.info("Expected {} did not match actual {}", expectedEntities, uniqueRemEnts.size());
+            if (uniqueRemEnts.size() < 20) {
+                for (Entity e : uniqueRemEnts) {
                     Object key = e.getProperty("key2");
-                    logger.info("Entity key {} ceated {}", key, e.getCreated());
+                    logger.info("Entity key {} uuid {} created {}", key,e.getUuid(), e.getCreated());
                 }
             }
         }
 
-        assertEquals( "Did not get expected entities", expectedEntities, count );
-        return count;
+        assertEquals( "Did not get expected entities", expectedEntities, uniqueRemEnts.size() );
+        return uniqueRemEnts.size();
     }
 
     private int countEntities( EntityManager em, String collectionName, int expectedEntities)
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index 57962c0..90af5ba 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -154,12 +154,12 @@
 
         ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex(builder);
 
-        assertNotNull(status.getJobId(), "JobId is present");
+        assertNotNull(status.getCollectionName(), "Collection name is present");
 
         logger.info("Rebuilt index");
 
 
-        waitForRebuild(status, reIndexService);
+        waitForRebuild(em.getApplicationId().toString(), status.getCollectionName(), reIndexService);
 
         //app.waitForQueueDrainAndRefreshIndex(15000);
 
@@ -279,7 +279,7 @@
             logger.info("Rebuilt index, jobID={}", status.getJobId());
 
 
-            waitForRebuild(status, reIndexService);
+            waitForRebuild(status.getJobId(), reIndexService);
 
 
             logger.info("Rebuilt index");
@@ -387,7 +387,7 @@
 
             logger.info("Rebuilt index");
 
-            waitForRebuild(status, reIndexService);
+            waitForRebuild(status.getJobId(), reIndexService);
 
             logger.info("Rebuilt index");
 
@@ -486,7 +486,7 @@
 
             logger.info("Rebuilt index");
 
-            waitForRebuild(status, reIndexService);
+            waitForRebuild(status.getJobId(), reIndexService);
 
             logger.info("Rebuilt index");
 
@@ -505,18 +505,53 @@
     /**
      * Wait for the rebuild to occur
      */
-    private void waitForRebuild(final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService)
+    private void waitForRebuild(final String jobId, final ReIndexService reIndexService)
         throws InterruptedException, IllegalArgumentException {
-        if (status != null) {
-            logger.info("waitForRebuild: jobID={}", status.getJobId());
+        if (jobId != null && !jobId.trim().equals("")) {
+            logger.info("waitForRebuild: jobID={}", jobId);
         } else {
-            logger.info("waitForRebuild: error, status = null");
-            throw new IllegalArgumentException("reindexStatus = null");
+            logger.info("waitForRebuild: error, jobId = null or empty");
+            throw new IllegalArgumentException("jobId = null or empty");
         }
         while (true) {
 
             try {
-                final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus(status.getJobId());
+                final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus(jobId);
+
+                if (updatedStatus == null) {
+                    logger.info("waitForRebuild: updated status is null");
+                } else {
+                    logger.info("waitForRebuild: status={} numberProcessed={}", updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+                    if (updatedStatus.getStatus() == ReIndexService.Status.COMPLETE) {
+                        break;
+                    }
+                }
+            } catch (IllegalArgumentException iae) {
+                //swallow.  Thrown if our job can't be found.  I.E hasn't updated yet
+            }
+
+
+            Thread.sleep(1000);
+        }
+    }
+    
+    
+    /**
+     * Wait for the rebuild to occur
+     */
+    private void waitForRebuild(final String appId, final String collectionName, final ReIndexService reIndexService)
+        throws InterruptedException, IllegalArgumentException {
+        if (appId != null && !appId.trim().equals("") && collectionName != null && !collectionName.trim().equals("")) {
+            logger.info("waitForRebuild: appId={} collName={}", appId, collectionName);
+        } else {
+            logger.info("waitForRebuild: error, appId or collName = null or empty");
+            throw new IllegalArgumentException("appId or collName = null or empty");
+        }
+        while (true) {
+
+            try {
+                final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatusForCollection(appId, collectionName);
 
                 if (updatedStatus == null) {
                     logger.info("waitForRebuild: updated status is null");
diff --git a/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g b/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g
index 605aed4..d75a617 100644
--- a/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g
+++ b/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g
@@ -112,7 +112,7 @@
 
 GT  : '>' | 'gt';
 
-GTE : '>=' |  'gte';  
+GTE : '>=' |  'gte';
 
 
 //keywords before var ids
@@ -134,11 +134,14 @@
 
 OF : ('O'|'o')('F'|'f');
 
-UUID :  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
-  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-' 
-  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-' 
-  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-' 
-  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-' 
+DIRECT : ('D'|'d')('I'|'i')('R'|'r')('E'|'e')('C'|'c')('T'|'t');
+
+UUID :
+  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
+  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
+  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
+  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
+  HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
   HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
   HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
   HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
@@ -156,18 +159,18 @@
     |   '.' ('0'..'9')+ EXPONENT?
     |   ('0'..'9')+ EXPONENT)
     ;
-    
+
 STRING
     :  '\'' ( ESC_SEQ | ~('\\'|'\'') )* '\''
     ;
 
 
-    
+
 WS : (' ' | '\t' | '\n' | '\r' | '\f')+  {$channel=HIDDEN;};
 
 
 
-    
+
 
 
 
@@ -208,12 +211,19 @@
 
 
 
-property :	ID<Property>;
+property :
+  ID<Property>
+  | ASC<Property>
+  | DESC<Property>
+  | CONTAINS<Property>
+  | WITHIN<Property>
+  | DIRECT<Property>
+  ;
 
 containsproperty : ID<ContainsProperty>;
 
 withinproperty : ID<WithinProperty>;
-	
+
 booleanliteral: BOOLEAN<BooleanLiteral>;
 
 
@@ -225,70 +235,96 @@
 
 stringliteral :
   STRING<StringLiteral>;
-  
+
 floatliteral :
   FLOAT<FloatLiteral> ;
 
-//We delegate to each sub class literal so we can get each type	
-value : 
+idliteral :
+  ID<IdLiteral> ;
+
+directnameliteral :
+  ID<IdLiteral> ;
+
+directuuidliteral :
+  UUID<UUIDLiteral>;
+
+//We delegate to each sub class literal so we can get each type
+value :
   booleanliteral
   | longliteral
   | uuidliteral
   | stringliteral
   | floatliteral
   ;
-  
 
+directidliteral :
+  directnameliteral
+  | directuuidliteral
+  ;
 
 //Every operand returns with the name of 'op'.  This is used because all subtrees require operands,
-//this allows us to link the java code easily by using the same name as a converntion
+//this allows us to link the java code easily by using the same name as a convention
 
 //begin search expressions
-  
-//mathmatical equality operations
+
+//mathematical equality operations
 equalityop :
   property LT<LessThan>^ value
   |property LTE<LessThanEqual>^ value
   |property EQ<Equal>^ value
   |property GT<GreaterThan>^ value
   |property GTE<GreaterThanEqual>^ value
-  ; 
+  ;
 
 //geo location search
 locationop :
   withinproperty WITHIN<WithinOperand>^ (floatliteral|longliteral) OF! (floatliteral|longliteral) ','! (floatliteral|longliteral);
-  
+
 //string search
 containsop :
   containsproperty CONTAINS<ContainsOperand>^ stringliteral;
 
+directidlist :
+  directidliteral (','! directidliteral)*;
+
 //
 operation :
- '('! expression ')'!
-   | equalityop 
-   | locationop 
-   | containsop 
+ '('! orexp ')'!
+   | equalityop
+   | locationop
+   | containsop
    ;
 
 //negations of expressions
 notexp :
 //only link if we have the not
- NOT<NotOperand>^ operation  
- |operation 
+ NOT<NotOperand>^ operation
+ |operation
  ;
 
 //and expressions contain operands.  These should always be closer to the leaves of a tree, it allows
 //for faster result intersection sooner in the query execution
 andexp :
  notexp (AND<AndOperand>^ notexp )*;
- 
- 
+
+
 //or expression should always be after AND expressions.  This will give us a smaller result set to union when evaluating trees
 //also a root level expression
-expression :
+orexp :
  andexp (OR<OrOperand>^ andexp )*;
 
 
+directexp :
+ DIRECT<DirectOperand>^ directidlist  ;
+
+
+// can use DIRECT {UUID|ID}+ at the root
+expression :
+  directexp
+  | orexp
+  ;
+
+
 
 //end expressions
 
@@ -300,14 +336,14 @@
 //order clause
 order
   : (property direction?){
-		String property = $property.text; 
+		String property = $property.text;
 		String direction = $direction.text;
 		parsedQuery.addSort(new SortPredicate(property, direction));
-    
+
   };
 
 //end order clauses
-  
+
 //Begin select clauses
 
 select_subject
@@ -317,7 +353,7 @@
 
 };
 
- 
+
 
 select_assign
   : target=ID ':' source=ID {
@@ -326,9 +362,9 @@
 
 };
 
-select_expr 
-  : ('*' | select_subject (',' select_subject) * | '{' select_assign (',' select_assign) * '}');  
-   
+select_expr
+  : ('*' | select_subject (',' select_subject) * | '{' select_assign (',' select_assign) * '}');
+
 //end select clauses
 
 ql returns [ParsedQuery parsedQuery]
@@ -337,7 +373,7 @@
   if($expression.tree instanceof Operand){
     parsedQuery.setRootOperand((Operand)$expression.tree);
   }
-  
+
   retval.parsedQuery = parsedQuery;
 
 
diff --git a/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens b/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens
index 53a0817..a9e7106 100644
--- a/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens
+++ b/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens
@@ -1,4 +1,3 @@
-T__31=31
 T__32=32
 T__33=33
 T__34=34
@@ -8,40 +7,42 @@
 T__38=38
 T__39=39
 T__40=40
+T__41=41
 AND=4
 ASC=5
 BOOLEAN=6
 CONTAINS=7
 DESC=8
-EQ=9
-ESC_SEQ=10
-EXPONENT=11
-FALSE=12
-FLOAT=13
-GT=14
-GTE=15
-HEX_DIGIT=16
-ID=17
-LONG=18
-LT=19
-LTE=20
-NOT=21
-OCTAL_ESC=22
-OF=23
-OR=24
-STRING=25
-TRUE=26
-UNICODE_ESC=27
-UUID=28
-WITHIN=29
-WS=30
-'('=31
-')'=32
-'*'=33
-','=34
-':'=35
-'order by'=36
-'select'=37
-'where'=38
-'{'=39
-'}'=40
+DIRECT=9
+EQ=10
+ESC_SEQ=11
+EXPONENT=12
+FALSE=13
+FLOAT=14
+GT=15
+GTE=16
+HEX_DIGIT=17
+ID=18
+LONG=19
+LT=20
+LTE=21
+NOT=22
+OCTAL_ESC=23
+OF=24
+OR=25
+STRING=26
+TRUE=27
+UNICODE_ESC=28
+UUID=29
+WITHIN=30
+WS=31
+'('=32
+')'=33
+'*'=34
+','=35
+':'=36
+'order by'=37
+'select'=38
+'where'=39
+'{'=40
+'}'=41
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
index 9b1898c..65c20f0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
@@ -30,14 +30,59 @@
  * An instance of a candidate result
  */
 public class CandidateResult implements EntityVersion {
-    private final Id entityId;
+    private Id entityId;
     private final UUID entityVersion;
     private final String docId;
+    private final String directEntityName;
+    private final UUID directEntityUUID;
+    private final String directEntityType;
 
     public CandidateResult( Id entityId, UUID entityVersion, String docId ) {
         this.entityId = entityId;
         this.entityVersion = entityVersion;
         this.docId = docId;
+        this.directEntityName = null;
+        this.directEntityUUID = null;
+        this.directEntityType = null;
+    }
+
+    public CandidateResult( Id entityId, CandidateResult sourceResult ) {
+        this.entityId = entityId;
+        this.entityVersion = sourceResult.entityVersion;
+        this.docId = sourceResult.docId;
+        this.directEntityName = sourceResult.directEntityName;
+        this.directEntityUUID = sourceResult.directEntityUUID;
+        this.directEntityType = sourceResult.directEntityType;
+    }
+
+    // direct query by name before resolution
+    public CandidateResult( String entityType, String directEntityName ) {
+        this.directEntityName = directEntityName;
+        this.directEntityUUID = null;
+        this.directEntityType = entityType;
+        this.entityId = null;
+        this.entityVersion = null;
+        this.docId = null;
+    }
+
+    // direct query by UUID before resolution
+    public CandidateResult( String entityType, UUID directEntityUUID ) {
+        this.directEntityUUID = directEntityUUID;
+        this.directEntityName = null;
+        this.directEntityType = entityType;
+        this.entityId = null;
+        this.entityVersion = null;
+        this.docId = null;
+    }
+
+    public boolean isDirectQueryName() {
+        return directEntityName != null;
+    }
+    public boolean isDirectQueryUUID() {
+        return directEntityUUID != null;
+    }
+    public boolean isDirectQuery() {
+        return isDirectQueryName() || isDirectQueryUUID();
     }
 
     @Override
@@ -54,25 +99,69 @@
         return docId;
     }
 
+    public String getDirectEntityName() {
+        return directEntityName;
+    }
+
+    public UUID getDirectEntityUUID() {
+        return directEntityUUID;
+    }
+
+    public String getDirectEntityType() {
+        return directEntityType;
+    }
+
+    // use to set id for direct query after resolution
+    public void setId(Id entityId) {
+        this.entityId = entityId;
+    }
+
 
     @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;
         }
+        if ( o == null ) {
+            return false;
+        }
         if ( !( o instanceof CandidateResult ) ) {
             return false;
         }
 
         final CandidateResult that = ( CandidateResult ) o;
 
-        if ( !entityId.equals( that.entityId ) ) {
+        if ( entityId == null && that.entityId != null) {
             return false;
         }
-        if ( !entityVersion.equals( that.entityVersion ) ) {
+        if ( entityId != null && !entityId.equals( that.entityId ) ) {
             return false;
         }
-        if ( !docId.equals( that.docId ) ) {
+        if ( entityVersion == null && that.entityVersion != null) {
+            return false;
+        }
+        if ( entityVersion != null && !entityVersion.equals( that.entityVersion ) ) {
+            return false;
+        }
+        if ( docId == null && that.docId != null) {
+            return false;
+        }
+        if ( docId != null && !docId.equals( that.docId ) ) {
+            return false;
+        }
+        if ( directEntityUUID != that.directEntityUUID ) {
+            return false;
+        }
+        if ( directEntityName == null && that.directEntityName != null) {
+            return false;
+        }
+        if ( directEntityName != null && !directEntityName.equals( that.directEntityName ) ) {
+            return false;
+        }
+        if ( directEntityType == null && that.directEntityType != null) {
+            return false;
+        }
+        if ( directEntityType != null && !directEntityType.equals( that.directEntityType ) ) {
             return false;
         }
 
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
index b8f87b6..3a69706 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
@@ -38,11 +38,19 @@
 
     private final List<CandidateResult> candidates;
     private final Collection<SelectFieldMapping> getFieldMappings;
+    private final boolean isDirectQuery;
 
     public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) {
+        this(candidates, getFieldMappings, false);
+    }
+
+    public CandidateResults(List<CandidateResult> candidates,
+                            final Collection<SelectFieldMapping> getFieldMappings,
+                            final boolean isDirectQuery) {
         this.candidates = candidates;
         this.getFieldMappings = getFieldMappings;
         offset = Optional.absent();
+        this.isDirectQuery = isDirectQuery;
     }
 
 
@@ -82,6 +90,10 @@
         return getFieldMappings;
     }
 
+    public boolean isDirectQuery() {
+        return this.isDirectQuery;
+    }
+
 
     /**
      * Get the candidates
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index afcfdd7..ebe6824 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -22,6 +22,7 @@
 
 import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.core.util.Health;
+import org.apache.usergrid.persistence.index.query.ParsedQuery;
 import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
@@ -140,6 +141,25 @@
                             final int limit, final int offset, final Map<String, Class> fieldsWithType,
                             final boolean analyzeOnly, final boolean returnQuery);
 
+    /**
+     * Search on every document in the specified search edge.  Also search by the types if specified
+     *
+     * @param searchEdge        The edge to search on
+     * @param searchTypes       The search types to search
+     * @param parsedQuery       The parsed query to execute
+     * @param limit             The limit of values to return
+     * @param offset            The offset to query on
+     * @param fieldsWithType    An optional param that allows the caller to provide schema related info which might
+     *                          relate to data in the query, such as sort predicate types
+     * @param analyzeOnly       This optional param will instruct the query processing to only analyze the query and
+     *                          provide info but not actually execute the query.
+     * @param returnQuery       This optional param will cause the index query to be returned instead of run.
+     * @return
+     */
+    CandidateResults search(final SearchEdge searchEdge, final SearchTypes searchTypes, final ParsedQuery parsedQuery,
+                            final int limit, final int offset, final Map<String, Class> fieldsWithType,
+                            final boolean analyzeOnly, final boolean returnQuery);
+
 
     /**
      * Same as search, just iterates all documents that match the index edge exactly.
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 9b42da3..493a722 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -72,6 +72,8 @@
 
     String USERGRID_QUERYANALYZER_ENFORCE = "usergrid.queryanalyzer.enforce";
 
+    String DIRECT_QUERY_MAX_ITEMS = "direct.query.max.items";
+
 
 
 
@@ -238,4 +240,8 @@
     @Default("false")
     @Key( USERGRID_QUERYANALYZER_ENFORCE )
     boolean enforceQueryBreaker();
+
+    @Default("1000")
+    @Key( DIRECT_QUERY_MAX_ITEMS )
+    int directQueryMaxItems();
 }
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/TooManyDirectEntitiesException.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/TooManyDirectEntitiesException.java
new file mode 100644
index 0000000..b3f2052
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/TooManyDirectEntitiesException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.exceptions;
+
+
+/**
+ * Thrown when the user attempts to perform a "direct" operation with more than the max limit number of entities
+ */
+public class TooManyDirectEntitiesException extends QueryException {
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+    final private int numberItemsRequested;
+    final private int maxItemsAllowed;
+
+
+    public TooManyDirectEntitiesException(int numberItemsRequested, int maxItemsAllowed) {
+        super( "Exceeded maximum number of direct entities requested: "
+                + Integer.toString(numberItemsRequested) + " requested, limit is " + Integer.toString(maxItemsAllowed));
+        this.numberItemsRequested = numberItemsRequested;
+        this.maxItemsAllowed = maxItemsAllowed;
+    }
+
+
+    public int getNumberItemsRequested() {
+        return numberItemsRequested;
+    }
+
+
+    public int getMaxNumberItems() {
+        return maxItemsAllowed;
+    }
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index e3121e1..82be6ec 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -36,11 +36,9 @@
 import org.apache.usergrid.persistence.core.util.StringUtils;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
-import org.apache.usergrid.persistence.index.exceptions.IndexException;
-import org.apache.usergrid.persistence.index.exceptions.QueryAnalyzerException;
-import org.apache.usergrid.persistence.index.exceptions.QueryAnalyzerEnforcementException;
-import org.apache.usergrid.persistence.index.exceptions.QueryReturnException;
+import org.apache.usergrid.persistence.index.exceptions.*;
 import org.apache.usergrid.persistence.index.migration.IndexDataVersions;
+import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.ParsedQuery;
 import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
 import org.apache.usergrid.persistence.index.query.SortPredicate;
@@ -437,23 +435,39 @@
     public CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
                                     final int limit, final int offset, final Map<String, Class> fieldsWithType,
                                     final boolean analyzeOnly, final boolean returnQuery ) {
+        Preconditions.checkNotNull( query, "query cannot be null" );
+        final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query);
+
+        return search(searchEdge, searchTypes, parsedQuery, limit, offset, fieldsWithType, analyzeOnly, returnQuery);
+    }
+
+    public CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final ParsedQuery parsedQuery,
+                                    final int limit, final int offset, final Map<String, Class> fieldsWithType,
+                                    final boolean analyzeOnly, final boolean returnQuery ) {
 
         IndexValidationUtils.validateSearchEdge(searchEdge);
         Preconditions.checkNotNull(searchTypes, "searchTypes cannot be null");
-        Preconditions.checkNotNull( query, "query cannot be null" );
         Preconditions.checkArgument( limit > 0, "limit must be > 0" );
 
 
         SearchResponse searchResponse;
 
-        final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query);
-
         if ( parsedQuery == null ){
             throw new IllegalArgumentException("a null query string cannot be parsed");
         }
 
+        if (parsedQuery.isDirectQuery() && parsedQuery.getDirectQueryItemCount() > indexFig.directQueryMaxItems()) {
+            throw new TooManyDirectEntitiesException(parsedQuery.getDirectQueryItemCount(), indexFig.directQueryMaxItems());
+        }
+
         final QueryVisitor visitor = visitParsedQuery(parsedQuery);
 
+        List<Identifier> directIdentifiers = visitor.getDirectIdentifiers();
+        if (directIdentifiers != null && directIdentifiers.size() > 0) {
+            // this is a direct query
+            return buildCandidateResultsForDirectQuery(directIdentifiers, parsedQuery, searchTypes);
+        }
+
         boolean hasGeoSortPredicates = false;
 
         for (SortPredicate sortPredicate : parsedQuery.getSortPredicates() ){
@@ -644,7 +658,7 @@
 
 
     /**
-     * Parse the results and return the canddiate results
+     * Parse the results and return the candidate results
      */
     private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
                                            final int limit, final int from, boolean hasGeoSortPredicates ) {
@@ -677,6 +691,33 @@
         return candidateResults;
     }
 
+
+    /**
+     * Build CandidateResults from direct query
+     */
+    private CandidateResults buildCandidateResultsForDirectQuery(final List<Identifier> directIdentifiers,
+                                                                 final ParsedQuery query,
+                                                                 final SearchTypes searchTypes) {
+        Preconditions.checkArgument(searchTypes.getTypes().length > 0, "Search type required");
+        String entityType = searchTypes.getTypes()[0];
+
+        List<CandidateResult> candidates = new ArrayList<>(directIdentifiers.size());
+
+        for (Identifier id : directIdentifiers) {
+            CandidateResult candidateResult = null;
+            if (id.isUUID()) {
+                candidateResult = new CandidateResult(entityType, id.getUUID());
+            } else if (id.isName()) {
+                candidateResult = new CandidateResult(entityType, id.getName());
+            }
+            candidates.add(candidateResult);
+        }
+
+        return new CandidateResults(candidates, query.getSelectFieldMappings(), true);
+    }
+
+
+
     private List<CandidateResult> aggregateScrollResults(List<CandidateResult> candidates,
                                                          final SearchResponse searchResponse, final UUID markedVersion){
 
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
index 4dd0d24..7f08ee3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
@@ -19,9 +19,10 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import java.util.Stack;
-import java.util.UUID;
+import java.util.*;
 
+import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.index.query.tree.*;
 import org.elasticsearch.common.geo.GeoDistance;
 import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.index.query.BoolFilterBuilder;
@@ -44,17 +45,6 @@
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
 import org.apache.usergrid.persistence.index.exceptions.NoFullTextIndexException;
 import org.apache.usergrid.persistence.index.exceptions.NoIndexException;
-import org.apache.usergrid.persistence.index.query.tree.AndOperand;
-import org.apache.usergrid.persistence.index.query.tree.ContainsOperand;
-import org.apache.usergrid.persistence.index.query.tree.Equal;
-import org.apache.usergrid.persistence.index.query.tree.GreaterThan;
-import org.apache.usergrid.persistence.index.query.tree.GreaterThanEqual;
-import org.apache.usergrid.persistence.index.query.tree.LessThan;
-import org.apache.usergrid.persistence.index.query.tree.LessThanEqual;
-import org.apache.usergrid.persistence.index.query.tree.NotOperand;
-import org.apache.usergrid.persistence.index.query.tree.OrOperand;
-import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
-import org.apache.usergrid.persistence.index.query.tree.WithinOperand;
 
 import com.google.common.base.Optional;
 
@@ -79,6 +69,13 @@
 
     private final GeoSortFields geoSortFields = new GeoSortFields();
 
+    /**
+     * Query direct to C* bypassing ES
+     */
+    private final List<Identifier> directIdList = new ArrayList<>();
+
+
+
 
     @Override
     public void visit( AndOperand op ) throws IndexException {
@@ -323,6 +320,35 @@
 
 
     @Override
+    public void visit( DirectOperand op ) {
+        List<Literal> idList = op.getDirectIds();
+
+        Set<Identifier> idSet = new HashSet<>();
+
+        for (Literal literal : idList) {
+
+            Identifier identifier = null;
+            if (literal instanceof IdLiteral) {
+                String name = ((IdLiteral)literal).getValue();
+                identifier = Identifier.fromName(name);
+            } else if (literal instanceof UUIDLiteral) {
+                UUID uuid = ((UUIDLiteral)literal).getValue();
+                identifier = Identifier.fromUUID(uuid);
+            }
+            // should only allow IdLiteral or UUIDLiteral, ignore if other
+
+            if (identifier != null) {
+                // ignore if already seen
+                if (!idSet.contains(identifier)) {
+                    directIdList.add(identifier);
+                    idSet.add(identifier);
+                }
+            }
+        }
+    }
+
+
+    @Override
     public void visit( LessThan op ) throws NoIndexException {
         final String name = op.getProperty().getValue().toLowerCase();
         final Object value = op.getLiteral().getValue();
@@ -472,6 +498,12 @@
     }
 
 
+    @Override
+    public List<Identifier> getDirectIdentifiers() {
+        return directIdList;
+    }
+
+
     /**
      * Generate the field name term for the field name  for queries
      */
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java
index 1cb3ba2..4c0da5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java
@@ -34,6 +34,7 @@
 
 import org.apache.usergrid.persistence.index.SelectFieldMapping;
 import org.apache.usergrid.persistence.index.exceptions.QueryParseException;
+import org.apache.usergrid.persistence.index.query.tree.DirectOperand;
 import org.apache.usergrid.persistence.index.query.tree.Operand;
 
 
@@ -206,4 +207,17 @@
     public boolean isGeoQuery(){
         return getOriginalQuery().contains("location") && getOriginalQuery().contains("within");
     }
+
+    public boolean isDirectQuery() {
+        return rootOperand instanceof DirectOperand;
+    }
+
+    public int getDirectQueryItemCount() {
+        int count = 0;
+        if (rootOperand instanceof DirectOperand) {
+            DirectOperand root = (DirectOperand)rootOperand;
+            count = root.getChildCount();
+        }
+        return count;
+    }
 }
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/DirectOperand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/DirectOperand.java
new file mode 100644
index 0000000..796dc5e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/DirectOperand.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.query.tree;
+
+
+import org.antlr.runtime.Token;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+
+public class DirectOperand extends Operand {
+
+    public DirectOperand(Token t ) {
+        super( t );
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * org.apache.usergrid.persistence.query.tree.Operand#visit(org.apache.usergrid.persistence
+     * .query.tree.QueryVisitor)
+     */
+    @Override
+    public void visit( QueryVisitor visitor ) {
+        visitor.visit( this );
+    }
+
+
+    /**
+     * @param name
+     * @param index
+     */
+    public void setDirectId( String name, int index ) {
+        setChild( index, new IdLiteral( name ) );
+    }
+
+
+    /**
+     * @param uuid
+     * @param index
+     */
+    public void setDirectId(UUID uuid, int index ) {
+        setChild( index, new UUIDLiteral( uuid ) );
+    }
+
+    /**
+     * @param index
+     * @return
+     */
+    public Literal getDirectId( int index ) {
+        return ( Literal ) this.children.get( index );
+    }
+
+    /**
+     * @return
+     */
+    public List<Literal> getDirectIds() {
+        List<Literal> ids = new ArrayList<>();
+        for (Object child : this.children) {
+            ids.add((Literal) child);
+        }
+        return ids;
+    }
+
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/IdLiteral.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/IdLiteral.java
new file mode 100644
index 0000000..1defdd3
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/IdLiteral.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+
+package org.apache.usergrid.persistence.index.query.tree;
+
+
+import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.Token;
+
+
+public class IdLiteral extends Literal<String> {
+
+    private String value;
+
+
+    public IdLiteral(Token t ) {
+        super( t );
+        value = t.getText();
+    }
+
+
+    public IdLiteral(String value ) {
+        super( new ClassicToken( 0, value ) );
+        this.value = value;
+    }
+
+
+    public String getValue() {
+        return this.value;
+    }
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java
index 273f23f..6c191cf 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java
@@ -19,19 +19,16 @@
 package org.apache.usergrid.persistence.index.query.tree;
 
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
+import java.util.List;
 
 import org.apache.usergrid.persistence.index.exceptions.NoFullTextIndexException;
 import org.apache.usergrid.persistence.index.exceptions.NoIndexException;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
 import org.apache.usergrid.persistence.index.impl.GeoSortFields;
-import org.apache.usergrid.persistence.index.query.SortPredicate;
+import org.apache.usergrid.persistence.index.query.Identifier;
 
 import org.elasticsearch.index.query.FilterBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 
 import com.google.common.base.Optional;
 
@@ -103,6 +100,12 @@
      */
     void visit( GreaterThanEqual op ) throws NoIndexException;
 
+    /**
+     * @param op
+     * @throws NoIndexException
+     */
+    void visit( DirectOperand op );
+
 
     /**
      * Return any filters created during parsing
@@ -126,4 +129,9 @@
      * @return The GeoSortFields  null safe
      */
     GeoSortFields getGeoSorts();
+
+    /**
+     * Return list of identifiers for direct query
+     */
+    List<Identifier> getDirectIdentifiers();
 }
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
index bf8dd3c..1ba9f3c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
@@ -303,8 +303,11 @@
         if ( uuid == null ) {
             return 0;
         }
-        long t = uuid.timestamp();
-        return ( t - KCLOCK_OFFSET ) / KCLOCK_MULTIPLIER_L;
+        return getUnixTimestampInMillisFromUUIDTimestamp(uuid.timestamp());
+    }
+
+    public static long getUnixTimestampInMillisFromUUIDTimestamp( long uuidTimestamp ) {
+        return ( uuidTimestamp - KCLOCK_OFFSET ) / KCLOCK_MULTIPLIER_L;
     }
 
 
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java
index 380057f..c459e43 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java
@@ -681,9 +681,9 @@
         }
 
         assertEquals(
-                "NoViableAltException('!'@[1:1: Tokens : ( T__31 | T__32 | T__33 | T__34 | T__35 | T__36 | T__37 | "
-                        + "T__38 | T__39 | T__40 | LT | LTE | EQ | GT | GTE | BOOLEAN | AND | OR | NOT | ASC | DESC |"
-                        + " CONTAINS | WITHIN | OF | UUID | ID | LONG | FLOAT | STRING | WS );])",
+                "NoViableAltException('!'@[1:1: Tokens : ( T__32 | T__33 | T__34 | T__35 | T__36 | T__37 | T__38 | T__39 | T__40 |"
+		+" T__41 | LT | LTE | EQ | GT | GTE | BOOLEAN | AND | OR | NOT | ASC | DESC | CONTAINS | WITHIN | OF | DIRECT |"
+		+" UUID | ID | LONG | FLOAT | STRING | WS );])",
                 error );
     }
 }
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index de1671d..83b65de 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -682,7 +682,7 @@
 
     @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
-        boolean sendAsync = async.booleanValue();
+        boolean sendAsync = async == null || async.booleanValue();
         if (sendAsync) {
             sendMessageToLocalRegionAsync(body);
         } else {
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
index 531716a..fdbd399 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
@@ -51,7 +51,7 @@
     public static IndexConsistency get(String name) {
         IndexConsistency queueIndexingStrategy =  NAME_MAP.get(name);
         if (queueIndexingStrategy == null) {
-            return LATEST;
+            return STRICT;
         }
         return queueIndexingStrategy;
     }
diff --git a/stack/pom.xml b/stack/pom.xml
index c98c72d..438a6fd 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -114,10 +114,10 @@
         <junit-version>4.12</junit-version>
         <log4j-version>1.2.16</log4j-version>
         <org.springframework.version>3.2.13.RELEASE</org.springframework.version>
-        <shiro-version>1.2.4</shiro-version>
+        <shiro-version>1.3.2</shiro-version>
         <slf4j-version>1.7.2</slf4j-version>
         <snakeyaml-version>1.9</snakeyaml-version>
-        <tomcat-version>7.0.64</tomcat-version>
+        <tomcat-version>7.0.84</tomcat-version>
         <antlr.version>3.4</antlr.version>
         <tika.version>1.4</tika.version>
         <mockito.version>1.10.8</mockito.version>
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java
index 727d187..684e8e1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java
@@ -613,9 +613,16 @@
 
 
     public void setParams( Map<String, List<String>> params ) {
+        setParams(params, null);
+    }
+
+
+    // provide an ignore list to block custom params
+    public void setParams( Map<String, List<String>> params, List<String> ignoreList ) {
         Map<String, List<String>> q = new LinkedHashMap<>();
         for ( String k : params.keySet() ) {
             if (IGNORE_QP.contains(k.toLowerCase())) continue;
+            if (ignoreList != null && ignoreList.contains(k.toLowerCase())) continue;
             List<String> v = params.get( k );
             if ( v != null ) {
                 q.put( k, new ArrayList<>( v ) );
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index b8abe54..6a42826 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -134,9 +134,10 @@
     @JSONP
     @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
     public ApiResponse getAllApplications2( @Context UriInfo ui,
-                                                @QueryParam("callback") @DefaultValue("callback") String callback )
+                                            @QueryParam("deleted") @DefaultValue("false") Boolean deleted,
+                                            @QueryParam("callback") @DefaultValue("callback") String callback )
             throws URISyntaxException {
-        return getAllApplications( ui, false, callback );
+        return getAllApplications( ui, deleted, callback );
     }
 
 
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index 449eac9..8cd5e9e 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -287,26 +287,41 @@
     }
 
 
-    // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
-    // So system access only.
-    // TODO: use scheduler here to get around people sending a reindex call 30 times.
+
     @POST
     @Path("{itemName}/_reindex")
     @Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
-    @RequireSystemAccess
+    @RequireApplicationAccess
     @JSONP
     public ApiResponse executePostForReindexing(
-        @Context UriInfo ui, String body,
+        @Context UriInfo ui, final Map<String, Object> payload,
         @PathParam("itemName") PathSegment itemName,
         @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
 
         addItemToServiceContext( ui, itemName );
 
         IndexResource indexResource = new IndexResource(injector);
-        return indexResource.rebuildIndexesPost(
+        return indexResource.rebuildIndexCollectionPost(payload,
             services.getApplicationId().toString(),itemName.getPath(),false,callback );
     }
 
+    @GET
+    @Path("{itemName}/_reindex")
+    @Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse executeGetForReindexStatus(
+        @Context UriInfo ui, final Map<String, Object> payload,
+        @PathParam("itemName") PathSegment itemName,
+        @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+        addItemToServiceContext( ui, itemName );
+
+        IndexResource indexResource = new IndexResource(injector);
+        return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(),
+            callback );
+    }
+
 
     private CollectionDeleteService getCollectionDeleteService() {
         return injector.getInstance( CollectionDeleteService.class );
@@ -346,18 +361,17 @@
     private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
 
 
-        final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
+        final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection(request);
 
         final ApiResponse response = createApiResponse();
 
-        response.setAction( "clear collection" );
-        response.setProperty( "jobId", status.getJobId() );
-        response.setProperty( "status", status.getStatus() );
-        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
-        response.setProperty( "numberQueued", status.getNumberProcessed() );
+        response.setAction("clear collection");
+        response.setProperty("jobId", status.getJobId());
+        response.setProperty("status", status.getStatus());
+        response.setProperty("lastUpdatedEpoch", status.getLastUpdated());
+        response.setProperty("numberQueued", status.getNumberProcessed());
         response.setSuccess();
 
         return response;
     }
-
 }
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/TooManyDirectEntitiesExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/TooManyDirectEntitiesExceptionMapper.java
new file mode 100644
index 0000000..06226a5
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/TooManyDirectEntitiesExceptionMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.usergrid.rest.exceptions;
+
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.Provider;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+import org.apache.usergrid.persistence.index.exceptions.TooManyDirectEntitiesException;
+
+@Provider
+public class TooManyDirectEntitiesExceptionMapper extends AbstractExceptionMapper<TooManyDirectEntitiesException> {
+    @Override
+    public Response toResponse( final TooManyDirectEntitiesException e ) {
+        return toResponse( BAD_REQUEST, e );
+    }
+}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
index 79973c3..0eff6b7 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
@@ -23,6 +23,7 @@
 import org.apache.amber.oauth2.common.message.OAuthResponse;
 import org.apache.commons.lang.NullArgumentException;
 import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.corepersistence.service.ApplicationRestorePasswordService;
 import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.export.ExportService;
@@ -44,6 +45,7 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
+import org.apache.usergrid.security.shiro.utils.SubjectUtils;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
@@ -73,6 +75,7 @@
     private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class);
 
     public static final String CONFIRM_APPLICATION_IDENTIFIER = "confirm_application_identifier";
+    public static final String RESTORE_PASSWORD = "restore_password";
 
     //@Autowired
     //protected ExportService exportService;
@@ -465,8 +468,23 @@
             throw new IllegalArgumentException("Application ID not specified in request");
         }
 
+        ApplicationRestorePasswordService restorePasswordService = getApplicationRestorePasswordService();
+        if (!SubjectUtils.isServiceAdmin()) {
+            // require password if it exists
+            String storedRestorePassword = restorePasswordService.getApplicationRestorePassword(applicationId);
+            if (StringUtils.isNotEmpty(storedRestorePassword)) {
+                // must have matching password as query parameter
+                String suppliedRestorePassword = ui.getQueryParameters().getFirst(RESTORE_PASSWORD);
+                if (!storedRestorePassword.equals(suppliedRestorePassword)) {
+                    throw new IllegalArgumentException("Application cannot be restored without application password");
+                }
+            }
+        }
+
         management.restoreApplication( applicationId );
 
+        // not deleting password -- will be changed upon successful soft delete
+
         ApiResponse response = createApiResponse();
         response.setAction( "restore" );
         response.setApplication( emf.getEntityManager( applicationId ).getApplication() );
@@ -505,8 +523,23 @@
                 "Cannot delete application without supplying correct application name");
         }
 
+        String restorePassword = null;
+        ApplicationRestorePasswordService restorePasswordService = getApplicationRestorePasswordService();
+        if (SubjectUtils.isServiceAdmin()) {
+            restorePassword = ui.getQueryParameters().getFirst(RESTORE_PASSWORD);
+            if (StringUtils.isNotEmpty(restorePassword)) {
+                // save password, required for future undelete if not sysadmin
+                restorePasswordService.setApplicationRestorePassword(applicationId, restorePassword);
+            }
+        }
+
         management.deleteApplication( applicationId );
 
+        if (restorePassword == null) {
+            // clear restore password
+            restorePasswordService.removeApplicationRestorePassword(applicationId);
+        }
+
         if (logger.isTraceEnabled()) {
             logger.trace("ApplicationResource.delete() deleted appId = {}", applicationId);
         }
@@ -523,4 +556,8 @@
         return response;
     }
 
+    private ApplicationRestorePasswordService getApplicationRestorePasswordService() {
+        return injector.getInstance(ApplicationRestorePasswordService.class);
+    }
+
 }
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index be60177..ec86750 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -28,13 +28,16 @@
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.exception.ConflictException;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.index.utils.ConversionUtils;
 import org.apache.usergrid.persistence.index.utils.UUIDUtils;
 import org.apache.usergrid.rest.AbstractContextResource;
 import org.apache.usergrid.rest.ApiResponse;
 import org.apache.usergrid.rest.RootResource;
+import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess;
 import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+import org.apache.usergrid.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
@@ -182,26 +185,78 @@
         return executeResumeAndCreateResponse( payload, request, callback );
     }
 
+    @RequireOrganizationAccess
+    @GET
+    @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
+    @JSONP
+    @Produces({ MediaType.APPLICATION_JSON, "application/javascript" })
+    public ApiResponse rebuildIndexCollectionGet( @PathParam( "applicationId" ) final String applicationIdStr,
+                                        @PathParam( "collectionName" ) final String collectionName,
+                                        @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
 
-    @RequireSystemAccess
+
+        throws Exception {
+        if (logger.isTraceEnabled()) {
+            logger.trace("Getting re-index status for app: {}, collection: {}", applicationIdStr, collectionName);
+        }
+
+
+        ReIndexService.ReIndexStatus status = getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "get rebuild index status" );
+        response.setProperty( "collection", status.getCollectionName() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( "numberQueued", status.getNumberProcessed() );
+        response.setSuccess();
+
+        return response;
+    }
+
+    @RequireOrganizationAccess
     @POST
     @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
     @JSONP
     @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
-    public ApiResponse rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr,
-                                               @PathParam( "collectionName" ) final String collectionName,
-                                               @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
-                                               @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+    public ApiResponse rebuildIndexCollectionPost(final Map<String, Object> payload,
+                                                  @PathParam( "applicationId" ) final String applicationIdStr,
+                                                  @PathParam( "collectionName" ) final String collectionName,
+                                                  @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
+                                                  @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
         throws Exception {
 
+        ReIndexService.ReIndexStatus existingStatus =
+            getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
 
-        logger.info( "Rebuilding collection {} in  application {}", collectionName, applicationIdStr );
+        if(existingStatus.getStatus().equals(ReIndexService.Status.INPROGRESS)){
+            throw new ConflictException("Re-index for collection currently in progress");
+        }
+
+        logger.info( "Re-indexing collection {} in  application {}", collectionName, applicationIdStr );
 
         final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
 
+
         final ReIndexRequestBuilder request =
             createRequest().withApplicationId( appId ).withCollection( collectionName );
 
+        Map<String,Object> newPayload = payload;
+        if(newPayload == null ||  !payload.containsKey( UPDATED_FIELD )){
+            newPayload = new HashMap<>(1);
+            newPayload.put(UPDATED_FIELD,0);
+        }
+
+        Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number,
+            "Property \"updated\" in the payload must be a number in unix timestamp millis format" );
+
+        //add our updated timestamp to the request
+        if ( newPayload.containsKey( UPDATED_FIELD ) ) {
+            final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD));
+            request.withStartTimestamp( timestamp );
+        }
+
         return executeAndCreateResponse( request, callback );
     }
 
@@ -214,7 +269,6 @@
     public ApiResponse rebuildIndexesPut( final Map<String, Object> payload,
                                               @PathParam( "applicationId" ) final String applicationIdStr,
                                               @PathParam( "collectionName" ) final String collectionName,
-                                              @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
                                               @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
         throws Exception {
 
@@ -350,7 +404,15 @@
         final ApiResponse response = createApiResponse();
 
         response.setAction( "rebuild indexes" );
-        response.setProperty( "jobId", status.getJobId() );
+
+        if(StringUtils.isNotEmpty(status.getJobId())){
+            response.setProperty( "jobId", status.getJobId() );
+        }
+
+        if(StringUtils.isNotEmpty(status.getCollectionName())){
+            response.setProperty( "collection", status.getCollectionName() );
+        }
+
         response.setProperty( "status", status.getStatus() );
         response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
         response.setProperty( "numberQueued", status.getNumberProcessed() );
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java
index 28d6501..be71881 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java
@@ -25,6 +25,7 @@
 import org.apache.usergrid.rest.test.resource.model.ApiResponse;
 import org.apache.usergrid.rest.test.resource.model.QueryParameters;
 import org.apache.usergrid.rest.test.resource.model.Token;
+import org.apache.usergrid.corepersistence.index.ReIndexService.Status;
 import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -68,35 +69,51 @@
             .get(clientSetup.getSuperuserName(),clientSetup.getSuperuserPassword());
 
         QueryParameters queryParameters = new QueryParameters();
-        queryParameters.addParam( "access_token",superUserToken.getAccessToken());
+        queryParameters.addParam("access_token", superUserToken.getAccessToken());
         ApiResponse result = clientSetup.getRestClient()
-            .pathResource( "system/index/rebuild/" + clientSetup.getAppUuid() + "/StOrElaTloNs" )
-            .post( false, ApiResponse.class, null, queryParameters, true );
+            .pathResource("system/index/rebuild/" + clientSetup.getAppUuid() + "/StOrElaTloNs")
+            .post(false, ApiResponse.class, null, queryParameters, true);
 
         assertNotNull(result);
+        assertEquals(Status.STARTED.name(), result.getStatus());
 
         //try the reindex endpoint with all lowercase Characters
         queryParameters = new QueryParameters();
-        queryParameters.addParam( "access_token",clientSetup.getSuperuserToken().getAccessToken() );
+        queryParameters.addParam("access_token", superUserToken.getAccessToken());
         result = clientSetup.getRestClient()
-            .pathResource( "system/index/rebuild/"+clientSetup.getAppUuid()+"/storelatlons" )
-            .post( false, ApiResponse.class,null,queryParameters,true);
-        String status = result.getProperties().get("jobId").toString();
-
-        assertNotNull( result );
+            .pathResource("system/index/rebuild/" + clientSetup.getAppUuid() + "/storelatlons")
+            .post(false, ApiResponse.class, null, queryParameters, true);
+        
+        assertNotNull(result);
+        //at this point, this could return a result of the previous reindex, or if it has completed, it will create a new job
+        assertNotEquals(Status.UNKNOWN.name(), result.getStatus());
 
         WebTarget res = clientSetup.getRestClient()
-            .pathResource( "system/index/rebuild/" + result.getProperties()
-                .get( "jobId" ).toString() )
+            .pathResource("system/index/rebuild/" + clientSetup.getAppUuid() + "/storelatlons")
             .getTarget();
 
         HttpAuthenticationFeature feature = HttpAuthenticationFeature.basicBuilder()
             .credentials( "superuser", "superpassword" ).build();
 
         result = res.register( feature ).request().get( ApiResponse.class );
-
-        assertNotNull( result );
-        assertEquals(status,result.getProperties().get("jobId").toString());
+        assertNotNull(result);
+        
+        int retry = 0;
+        while(retry < 5 && !result.getStatus().equals(Status.COMPLETE.name())) {
+        	try {
+        		//hope reindex completes, if not, that's still ok
+				Thread.sleep(1000);
+			} catch (InterruptedException e) {
+				
+			}
+        	result = res.register( feature ).request().get( ApiResponse.class );
+        	retry++;
+        	assertNotNull(result);
+        }
+        
+        Map<String, Object> resultMap = result.getProperties();
+        assertNotNull( resultMap );
+        assertEquals(1,resultMap.get("numberQueued"));
 
 
     }
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java
index d49460c..61e2760 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java
@@ -24,6 +24,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import net.jcip.annotations.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -35,6 +37,7 @@
  *
  * @since 4.0
  */
+@NotThreadSafe
 public class AndOrQueryTest extends QueryTestBase {
     private static final Logger logger = LoggerFactory.getLogger(AndOrQueryTest.class);
 
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
index 3d0c9fb..e0ab95e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
@@ -110,6 +110,8 @@
 
 	UUID addApplicationToOrganization(UUID organizationId, Entity appInfo) throws Exception;
 
+	boolean deleteAdminUser( UUID userId ) throws Exception;
+
 	void deleteOrganizationApplication( UUID organizationId, UUID applicationId ) throws Exception;
 
 	void disableAdminUser( UUID userId ) throws Exception;
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index ab93563..7e114e5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -934,6 +934,36 @@
     }
 
 
+    // reverse doCreateAdmin + creation of user entity
+    @Override
+    public boolean deleteAdminUser( UUID userId ) throws Exception {
+
+        // make sure user is not attached to any orgs
+        BiMap<UUID, String> orgMap = getOrganizationsForAdminUser(userId);
+        if (!orgMap.isEmpty()) {
+            // cannot delete admin user that is attached to orgs
+            logger.info("Cannot delete admin user {} -- admin user is attached to {} orgs", userId, orgMap.size());
+            return false;
+        }
+
+        EntityRef userRef = new SimpleEntityRef(User.ENTITY_TYPE, userId);
+
+        // delete mongo password
+        deleteUserMongoPassword(smf.getManagementAppId(), userRef);
+
+        // delete user password
+        deleteUserPassword(smf.getManagementAppId(), userRef);
+
+        // delete user token
+        deleteUserToken(smf.getManagementAppId(), userRef);
+
+        // delete user entity
+        emf.getEntityManager(smf.getManagementAppId()).delete(userRef);
+
+        return true;
+    }
+
+
     @Override
     public UserInfo createAdminFromPrexistingPassword( UUID organizationId, User user, CredentialsInfo ci )
         throws Exception {
@@ -3283,6 +3313,12 @@
     }
 
 
+    /** Delete the user's password credentials info */
+    protected void deleteUserPassword( UUID appId, EntityRef owner ) throws Exception {
+        deleteCreds( appId, owner, USER_PASSWORD );
+    }
+
+
     /** read the user password credential's info */
     protected CredentialsInfo readUserPasswordCredentials( UUID appId, UUID ownerId, String ownerType )
         throws Exception {
@@ -3302,11 +3338,22 @@
     }
 
 
+    /** Delete the user's token */
+    protected void deleteUserToken( UUID appId, EntityRef owner ) throws Exception {
+        deleteCreds( appId, owner, USER_TOKEN );
+    }
+
+
     /** Write the mongo password */
     protected void writeUserMongoPassword( UUID appId, EntityRef owner, CredentialsInfo password ) throws Exception {
         writeCreds( appId, owner, password, USER_MONGO_PASSWORD );
     }
 
+    /** Delete the mongo password */
+    private void deleteUserMongoPassword( UUID appId, EntityRef owner) throws Exception {
+        deleteCreds( appId, owner, USER_MONGO_PASSWORD );
+    }
+
 
     /** Read the mongo password */
     protected CredentialsInfo readUserMongoPassword( UUID appId, UUID ownerId, String ownerType ) throws Exception {
@@ -3339,6 +3386,12 @@
     }
 
 
+    private void deleteCreds( UUID appId, EntityRef owner, String key ) throws Exception {
+        EntityManager em = emf.getEntityManager( appId );
+        em.removeFromDictionary(owner, DICTIONARY_CREDENTIALS, key);
+    }
+
+
     private Set<CredentialsInfo> readUserPasswordHistory( UUID appId, UUID ownerId ) throws Exception {
         EntityManager em = emf.getEntityManager( appId );
         Entity owner = em.get( new SimpleEntityRef("user", ownerId ));
diff --git a/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java b/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java
index 659520e..892a440 100644
--- a/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java
+++ b/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java
@@ -120,6 +120,7 @@
      * Create the org admin and application
      */
     protected void before( Description description ) throws Exception {
+    	logger.info( "Test {}: Starting with application", description.getDisplayName() );
         final String className = description.getClassName();
         final String methodName = description.getMethodName();
         final String uuidString = newUUIDString();
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java b/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java
index 6424dbd..d2932c2 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java
@@ -32,12 +32,14 @@
 import org.junit.Rule;
 import org.junit.Test;
 
+import net.jcip.annotations.NotThreadSafe;
+
 import java.util.*;
 
 import static org.apache.usergrid.TestHelper.*;
 import static org.junit.Assert.*;
 
-
+@NotThreadSafe
 public class OrganizationConfigIT {
 
     @Rule
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java
index 870b678..2268ea6 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java
@@ -59,6 +59,8 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Module;
 
+import net.jcip.annotations.NotThreadSafe;
+
 import static org.apache.usergrid.TestHelper.newUUIDString;
 import static org.apache.usergrid.TestHelper.uniqueApp;
 import static org.apache.usergrid.TestHelper.uniqueOrg;
@@ -74,6 +76,7 @@
  *
  *
  */
+@NotThreadSafe
 public class ExportServiceIT {
 
     private static final Logger logger = LoggerFactory.getLogger( ExportServiceIT.class );
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index b34b068..991ff7c 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -262,6 +262,12 @@
       <version>${aws.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.8.1</version>
+    </dependency>
+
       <dependency>
           <groupId>org.mockito</groupId>
           <artifactId>mockito-core</artifactId>
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java b/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java
new file mode 100644
index 0000000..911e43f
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.BiMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.services.assets.data.BinaryStore;
+import org.apache.usergrid.tools.export.ExportEntity;
+import org.apache.usergrid.utils.StringUtils;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.schedulers.Schedulers;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Delete all entities and connections of a Usergrid app.
+ */
+public class AppDeleter extends ExportingToolBase {
+    static final Logger logger = LoggerFactory.getLogger( AppDeleter.class );
+
+    private static final String ORGANIZATION_NAME = "organizationName";
+    private static final String APPLICATION_NAME = "applicationName";
+    private static final String DELETE_THREAD_COUNT = "deleteThreads";
+    private static final String PERFORM_DELETE = "performDelete";
+    private static final String LOG_EACH_ITEM = "logEachItem";
+
+    private static final String ACCESS_ID_PROPNAME = "AWS_ACCESS_KEY_ID";
+    private static final String SECRET_KEY_PROPNAME = "AWS_SECRET_KEY";
+    private static final String BUCKET_NAME_PROPNAME = "usergrid.binary.bucketname";
+
+    private static final String ALL_INDEXES = "*";
+    private static final String SCROLL_TIMEOUT = "5m";
+    private static final int SCROLL_SIZE = 10;
+
+    String applicationName;
+    String organizationName;
+
+    AtomicInteger entitiesFound = new AtomicInteger(0);
+    AtomicInteger entityDictionaryEntriesFound = new AtomicInteger(0);
+    AtomicInteger appDictionaryEntriesFound = new AtomicInteger(0);
+    AtomicInteger assetsFound = new AtomicInteger(0);
+    AtomicInteger esDocsFound = new AtomicInteger(0);
+    AtomicInteger orgAdminsFound = new AtomicInteger(0);
+
+    Scheduler deleteScheduler;
+    AmazonS3Client s3Client;
+    EsProvider esProvider;
+    IndexLocationStrategyFactory ilsf;
+
+    ObjectMapper mapper = new ObjectMapper();
+    Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
+    Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+
+    int deleteThreadCount = 10; // set via CLI option
+
+    BinaryStore binaryStore;
+
+    String logLineSeparator = "-------------------------------------------------------------------";
+
+
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option orgNameOption = OptionBuilder.hasArg().isRequired(true).withType("")
+            .withDescription( "Organization Name -" + ORGANIZATION_NAME ).create( ORGANIZATION_NAME );
+        options.addOption( orgNameOption );
+
+        Option appNameOption = OptionBuilder.hasArg().isRequired(false).withType("")
+            .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+        options.addOption( appNameOption );
+
+        Option performDeleteOption = OptionBuilder.hasArg().isRequired(false)
+                .withDescription("Perform Delete -" + PERFORM_DELETE).create(PERFORM_DELETE);
+        options.addOption( performDeleteOption );
+
+        Option deleteThreadsOption = OptionBuilder.hasArg().isRequired(false)
+                .withType("")
+                .withDescription( "Delete Threads -" + DELETE_THREAD_COUNT).create(DELETE_THREAD_COUNT);
+        options.addOption( deleteThreadsOption );
+
+        Option logEachItemOption = OptionBuilder.hasArg().isRequired(false)
+                .withDescription("Log each item -" + LOG_EACH_ITEM).create(LOG_EACH_ITEM);
+        options.addOption( logEachItemOption );
+
+        return options;
+    }
+
+
+    /**
+     * Tool entry point.
+     */
+    @Override
+    public void runTool(CommandLine line) throws Exception {
+
+        organizationName = line.getOptionValue( ORGANIZATION_NAME );
+        applicationName = line.getOptionValue( APPLICATION_NAME );
+        final boolean allApps = StringUtils.isEmpty(applicationName);
+
+        String performDeleteOption = line.getOptionValue(PERFORM_DELETE);
+        final boolean performDelete = StringUtils.isNotEmpty(performDeleteOption) && performDeleteOption.toLowerCase().equals("yes");
+
+        String logEachItemOption = line.getOptionValue(LOG_EACH_ITEM);
+        final boolean logEachItem = StringUtils.isNotEmpty(logEachItemOption) && logEachItemOption.toLowerCase().equals("yes");
+
+        if (StringUtils.isNotEmpty( line.getOptionValue(DELETE_THREAD_COUNT) )) {
+            try {
+                deleteThreadCount = Integer.parseInt( line.getOptionValue(DELETE_THREAD_COUNT) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + DELETE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        }
+
+        startSpring();
+
+        // S3 asset store
+        String accessId = (String)properties.get(ACCESS_ID_PROPNAME);
+        String secretKey = (String)properties.get(SECRET_KEY_PROPNAME);
+        String bucketName = (String)properties.get(BUCKET_NAME_PROPNAME);
+
+        Properties overrides = new Properties();
+        overrides.setProperty( "s3" + ".identity", accessId );
+        overrides.setProperty( "s3" + ".credential", secretKey );
+
+        AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);
+        ClientConfiguration clientConfig = new ClientConfiguration();
+        clientConfig.setProtocol(Protocol.HTTP);
+
+        s3Client = new AmazonS3Client(credentials, clientConfig);
+
+        // ES
+        ilsf = injector.getInstance(IndexLocationStrategyFactory.class);
+        esProvider = injector.getInstance(EsProvider.class);
+
+        ExecutorService deleteThreadPoolExecutor = Executors.newFixedThreadPool(deleteThreadCount);
+        deleteScheduler = Schedulers.from( deleteThreadPoolExecutor );
+
+        setVerbose( line );
+
+        logger.info(logLineSeparator);
+
+        boolean singleApp = false;
+        String matchingAppPrefix = organizationName + "/";
+        if (StringUtils.isNotEmpty(applicationName)) {
+            singleApp = true;
+            matchingAppPrefix += applicationName;
+            logger.info("APPLICATION:");
+        } else {
+            logger.info("APPLICATIONS FOR ORG " + organizationName + ":");
+        }
+
+        boolean foundApps = false;
+        Map<String, UUID> activeAppMap = new HashMap<>();
+        for (Map.Entry<String, UUID> entry : emf.getApplications().entrySet()) {
+            if (entry.getKey().startsWith(matchingAppPrefix)) {
+                foundApps = true;
+                logger.info("ACTIVE APP: {} - {}", entry.getKey(), entry.getValue().toString());
+                activeAppMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        Map<String, UUID> deletedAppMap = new HashMap<>();
+        for (Map.Entry<String, UUID> entry : emf.getDeletedApplications().entrySet()) {
+            if (entry.getKey().startsWith(matchingAppPrefix)) {
+                foundApps = true;
+                logger.info("DELETED APP: {} - {}", entry.getKey(), entry.getValue().toString());
+                deletedAppMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+        logger.info(logLineSeparator);
+
+        if (!foundApps) {
+            if (singleApp) {
+                throw new RuntimeException( "Cannot find application " + organizationName + "/" + applicationName );
+            } else {
+                throw new RuntimeException( "Cannot find applications for org " + organizationName );
+            }
+        }
+
+        for (String name : activeAppMap.keySet()) {
+            UUID applicationId = activeAppMap.get(name);
+            final EntityManager em = emf.getEntityManager( applicationId );
+            handleApp(applicationId, name, false, em, performDelete, bucketName, logEachItem);
+        }
+        for (String name : deletedAppMap.keySet()) {
+            UUID applicationId = deletedAppMap.get(name);
+            final EntityManager em = emf.getEntityManager( applicationId );
+            handleApp(applicationId, name, true, em, performDelete, bucketName, logEachItem);
+        }
+
+        if (!singleApp) {
+            // handle org
+            handleOrg(organizationName, performDelete);
+        }
+
+    }
+
+
+    private void handleOrg(String organizationName, boolean performDelete) throws Exception {
+        OrganizationInfo orgInfo = managementService.getOrganizationByName(organizationName);
+        UUID orgUUID = orgInfo.getUuid();
+
+        logger.info(logLineSeparator);
+        logger.info("ORGANIZATION: {}({})", organizationName, orgUUID);
+        logger.info(logLineSeparator);
+
+        if (performDelete) {
+            try {
+                String clientId = managementService.getClientIdForOrganization(orgUUID);
+                String oldClientSecret = managementService.getClientSecretForOrganization(orgUUID);
+                logger.info(logLineSeparator);
+                logger.info("OLD ORG CLIENT ID: {}", clientId);
+                logger.info("OLD ORG CLIENT SECRET: {}", oldClientSecret);
+                String newClientSecret = managementService.newClientSecretForOrganization(orgInfo.getUuid());
+                logger.info("NEW ORG CLIENT SECRET: {}", newClientSecret);
+                logger.info(logLineSeparator);
+            } catch (Exception e) {
+                logger.error("FAILED TO CHANGE CREDENTIALS FOR ORG " + organizationName + ": " + e.getMessage(), e);
+            }
+        }
+
+        List<UserInfo> userList = managementService.getAdminUsersForOrganization(orgInfo.getUuid());
+
+        logger.info(logLineSeparator);
+        logger.info("ORGANIZATION ADMINS: {}({})", organizationName, orgInfo.getUuid());
+        logger.info(logLineSeparator);
+        orgAdminsFound.set(0);
+        for (UserInfo user : userList) {
+            orgAdminsFound.incrementAndGet();
+            BiMap<UUID, String> adminOrgs = managementService.getOrganizationsForAdminUser(user.getUuid());
+            int numOrgs = adminOrgs.size();
+            logger.info("ORGADMIN: {} ({}) - number of other orgs: {}", user.getUsername(), user.getEmail(), numOrgs - 1);
+            if (performDelete) {
+                managementService.removeAdminUserFromOrganization(user.getUuid(), orgInfo.getUuid(), true);
+                if (numOrgs <= 1) {
+                    logger.info("ORGADMIN {} is in no other orgs -- deleting", user.getUsername());
+                    try {
+                        boolean success = managementService.deleteAdminUser(user.getUuid());
+                        if (!success) {
+                            logger.info("ORGADMIN {} - failed to delete", user.getUsername());
+                        }
+                    } catch (Exception e) {
+                        logger.info("ORGADMIN " + user.getUsername() + " - exception while deleting: " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+        logger.info(logLineSeparator);
+        logger.info("ORGANIZATION ADMINS {} DONE! OrgAdmins: {}", performDelete ? "DELETE" : "LIST", orgAdminsFound.get());
+        logger.info(logLineSeparator);
+
+    }
+
+
+    private void handleApp(UUID appId, String orgAppName, boolean deletedApp,
+                           EntityManager em, boolean performDelete, String bucketName,
+                           boolean logEachItem) {
+        logger.info(logLineSeparator);
+        logger.info("APPLICATION: {}({}){}", orgAppName, appId.toString(), deletedApp ? " - DELETED" : "");
+        logger.info(logLineSeparator);
+
+        if (performDelete) {
+            try {
+                String clientId = managementService.getClientIdForApplication(appId);
+                String oldClientSecret = managementService.getClientSecretForApplication(appId);
+                logger.info(logLineSeparator);
+                logger.info("OLD APP CLIENT ID: {}", clientId);
+                logger.info("OLD APP CLIENT SECRET: {}", oldClientSecret);
+                String newClientSecret = managementService.newClientSecretForApplication(appId);
+                logger.info("NEW APP CLIENT SECRET: {}", newClientSecret);
+                logger.info(logLineSeparator);
+            } catch (Exception e) {
+                logger.error("FAILED TO CHANGE CREDENTIALS FOR APP " + orgAppName + ": " + e.getMessage(), e);
+            }
+        }
+
+        logger.info(logLineSeparator);
+        logger.info("FINDING APP DICTIONARIES");
+        logger.info(logLineSeparator);
+        // check for entity dictionaries
+        try {
+            EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+
+            Application application = em.getApplication();
+            //logger.info("APP: {}", application.toString());
+
+            for ( String dictionary : rootEm.getDictionaries( application ) ) {
+                try {
+                    //logger.info("DICTIONARY NAME: {}", dictionary);
+                    Map<Object, Object> dictMap = rootEm.getDictionaryAsMap(application, dictionary, false);
+                    for (Object key : dictMap.keySet()) {
+                        appDictionaryEntriesFound.incrementAndGet();
+                        if (logEachItem) {
+                            logger.info("APP DICTIONARY {} ENTRY: ({})", dictionary, key.toString());
+                        }
+                    }
+                }
+                catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+        catch (Exception e) {
+            logger.error("APP DICTIONARY CHECK FOR APP " + orgAppName + " FAILED: " + e.getMessage(), e);
+        }
+        logger.info(logLineSeparator);
+        logger.info("APP DICTIONARIES {} DONE! App Dictionary Entries found: {}", performDelete ? "DELETE" : "LIST", appDictionaryEntriesFound.get());
+        logger.info(logLineSeparator);
+
+        logger.info(logLineSeparator);
+        logger.info("FINDING ENTITIES");
+        logger.info(logLineSeparator);
+        entitiesFound.set(0);
+        Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+
+        collectionsObservable.flatMap( collection -> {
+
+            return Observable.create( new EntityObservable( em, collection ) )
+                .doOnNext( new EntityDeleteAction(em, performDelete, logEachItem) ).subscribeOn(deleteScheduler);
+
+
+        } ).doOnCompleted( new EntityDeleteWrapUpAction(performDelete) ).toBlocking().lastOrDefault(null);
+
+
+        logger.info(logLineSeparator);
+        logger.info("FINDING ASSETS");
+        logger.info(logLineSeparator);
+        assetsFound.set(0);
+
+        ObjectListing listing = null;
+        try {
+            listing = s3Client.listObjects(bucketName, appId.toString() + "/");
+        }
+        catch (Exception e) {
+            logger.error("FAILED TO RETRIEVE ASSETS: ", e);
+        }
+        if (listing != null) {
+            for (S3ObjectSummary summary : listing.getObjectSummaries()) {
+                String assetKey = summary.getKey();
+                assetsFound.getAndIncrement();
+                if (logEachItem) {
+                    logger.info("ASSET: {}", assetKey);
+                }
+                if (performDelete) {
+                    try {
+                        s3Client.deleteObject(bucketName, assetKey);
+                    }
+                    catch (Exception e) {
+                        logger.error("FAILED TO DELETE ASSET: " + assetKey, e);
+                    }
+                }
+            }
+        }
+        logger.info(logLineSeparator);
+        logger.info("Asset {} DONE! Assets: {}", performDelete ? "DELETE" : "LIST", assetsFound.get());
+        logger.info(logLineSeparator);
+
+        // Elasticsearch docs
+        logger.info(logLineSeparator);
+        logger.info("FINDING ES DOCS");
+        logger.info(logLineSeparator);
+        esDocsFound.set(0);
+
+        ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(appId, "application"));
+        // IndexLocationStrategy strategy = ilsf.getIndexLocationStrategy(applicationScope);
+
+        QueryBuilder qb = QueryBuilders.matchQuery("applicationId", "appId(" + appId.toString() + ",application)");
+        SearchResponse scrollResponse = esProvider.getClient()
+            .prepareSearch(ALL_INDEXES)
+            .setScroll(SCROLL_TIMEOUT)
+            .setSearchType(SearchType.SCAN)
+            .setQuery(qb)
+            .setSize(SCROLL_SIZE)
+            .setNoFields()
+            .execute().actionGet();
+
+        //logger.info(scrollResponse.toString());
+
+        while (true) {
+            BulkRequestBuilder bulkRequest = null;
+            if (performDelete) {
+                bulkRequest = esProvider.getClient().prepareBulk();
+            }
+            boolean docsToDelete = false;
+            for (SearchHit hit : scrollResponse.getHits().getHits()) {
+                esDocsFound.getAndIncrement();
+                if (logEachItem) {
+                    logger.info("ES DOC: {}", hit.getId());
+                }
+                if (performDelete) {
+                    docsToDelete = true;
+                    bulkRequest.add(esProvider.getClient()
+                        .prepareDelete(hit.getIndex(), hit.getType(), hit.getId()));
+                }
+            }
+
+            if (docsToDelete) {
+                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+                if (bulkResponse.hasFailures()) {
+                    throw new RuntimeException(bulkResponse.buildFailureMessage());
+                }
+            }
+
+            scrollResponse = esProvider.getClient().prepareSearchScroll(scrollResponse.getScrollId())
+                .setScroll(SCROLL_TIMEOUT).execute().actionGet();
+
+            //logger.info(scrollResponse.toString());
+
+            if (scrollResponse.getHits().getHits().length == 0) {
+                break;
+            }
+        }
+        logger.info(logLineSeparator);
+        logger.info("ES Doc {} DONE! ES Docs: {}", performDelete ? "DELETE" : "LIST", esDocsFound.get());
+        logger.info(logLineSeparator);
+
+    }
+
+
+
+    // ----------------------------------------------------------------------------------------
+    // reading data
+
+
+    /**
+     * Emits collection names found in application.
+     */
+    private class CollectionsObservable implements Observable.OnSubscribe<String> {
+        EntityManager em;
+
+        public CollectionsObservable(EntityManager em) {
+            this.em = em;
+        }
+
+        public void call(Subscriber<? super String> subscriber) {
+
+            int count = 0;
+            try {
+                Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+
+                logger.debug( "Emitting {} collection names for application {}",
+                    collectionMetadata.size(), em.getApplication().getName() );
+
+                for ( String collection : collectionMetadata.keySet() ) {
+                    subscriber.onNext( collection );
+                    count++;
+                }
+
+            } catch (Exception e) {
+                subscriber.onError( e );
+            }
+
+            subscriber.onCompleted();
+            logger.info( "Completed. Read {} collection names", count );
+        }
+    }
+
+
+    /**
+     * Emits entities of collection.
+     */
+    private class EntityObservable implements Observable.OnSubscribe<ExportEntity> {
+        EntityManager em;
+        String collection;
+
+        public EntityObservable(EntityManager em, String collection) {
+            this.em = em;
+            this.collection = collection;
+        }
+
+        public void call(Subscriber<? super ExportEntity> subscriber) {
+
+            logger.info("Starting to fetch entities of collection {}", collection);
+
+            //subscriber.onStart();
+
+            try {
+                int count = 0;
+
+                Query query = new Query();
+                query.setLimit( MAX_ENTITY_FETCH );
+
+                Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+                while (results.size() > 0) {
+                    for (Entity entity : results.getEntities()) {
+                        try {
+                            Set<String> dictionaries = em.getDictionaries( entity );
+                            Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+                            for (String dictionary : dictionaries) {
+                                Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+                                if (dict.isEmpty()) {
+                                    continue;
+                                }
+                                dictionariesByName.put( dictionary, dict );
+                            }
+
+                            ExportEntity exportEntity = new ExportEntity(
+                                    organizationName,
+                                    applicationName,
+                                    entity,
+                                    dictionariesByName );
+
+                            subscriber.onNext( exportEntity );
+                            count++;
+
+                        } catch (Exception e) {
+                            logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
+                        }
+                    }
+                    if (results.getCursor() == null) {
+                        break;
+                    }
+                    query.setCursor( results.getCursor() );
+                    results = em.searchCollection( em.getApplicationRef(), collection, query );
+                }
+
+                subscriber.onCompleted();
+                logger.info("Completed collection {}. Read {} entities", collection, count);
+
+            } catch ( Exception e ) {
+                subscriber.onError(e);
+            }
+        }
+    }
+
+
+    // ----------------------------------------------------------------------------------------
+    // writing data
+
+
+    /**
+     * Delete entities.
+     */
+    private class EntityDeleteAction implements Action1<ExportEntity> {
+
+        EntityManager em;
+        boolean performDelete;
+        boolean logEachEntity;
+
+        public EntityDeleteAction(EntityManager em, boolean performDelete, boolean logEachEntity) {
+            this.em = em;
+            this.performDelete = performDelete;
+            this.logEachEntity = logEachEntity;
+        }
+
+        public void call(ExportEntity entity) {
+
+            try {
+                entitiesFound.getAndIncrement();
+                if (logEachEntity) {
+                    logger.info("ENTITY: {}", entity.getEntity().asId().toString());
+                }
+
+                // check for entity dictionaries
+                if (entity.getDictionaries().size() > 0) {
+                    for (String dictionaryName : entity.getDictionaries().keySet()) {
+                        Map<Object, Object> dictMap = em.getDictionaryAsMap(entity.getEntity(), dictionaryName);
+                        for (Object key : dictMap.keySet()) {
+                            entityDictionaryEntriesFound.incrementAndGet();
+                            if (logEachEntity) {
+                                logger.info("ENTITY DICTIONARY ENTRY ({}-{}): ({}): ({})",
+                                    entity.getEntity().asId().toString(),
+                                    dictionaryName, key.toString(),
+                                    dictMap.get(key).toString());
+                            }
+                        }
+                    }
+                }
+
+                if (performDelete) {
+                    em.delete(entity.getEntity());
+                }
+
+            } catch (IOException e) {
+                throw new RuntimeException("Error deleting entity (IOException): " + e.getMessage(), e);
+            } catch (Exception e) {
+                throw new RuntimeException("Error deleting entity: " + e.getMessage(), e);
+            }
+        }
+    }
+
+
+    private class EntityDeleteWrapUpAction implements Action0 {
+
+        boolean performDelete;
+
+        public EntityDeleteWrapUpAction(boolean performDelete) {
+            this.performDelete = performDelete;
+        }
+
+        @Override
+        public void call() {
+
+            logger.info(logLineSeparator);
+            logger.info("Entity {} DONE! Entities: {}", performDelete ? "DELETE" : "LIST", entitiesFound.get());
+            logger.info(logLineSeparator);
+        }
+    }
+}
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java b/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java
index 26b5f5f..bfe5edf 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java
@@ -23,23 +23,16 @@
 
 import com.google.common.base.Optional;
 import com.netflix.astyanax.MutationBatch;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
-import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
-import org.apache.usergrid.corepersistence.service.CollectionSearch;
 import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.*;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.index.utils.UUIDUtils;
 import org.apache.usergrid.persistence.model.entity.*;
-import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
 import org.apache.usergrid.utils.InflectionUtils;
 import static org.apache.commons.lang.StringUtils.isBlank;
@@ -63,7 +56,9 @@
 
     private static final String ENTITY_TYPE_ARG = "entityType";
 
-    private static final String REMOVE_CONNECTIONS_ARG = "removeConnections";
+    private static final String REMOVE_DUPLICATE_CONNECTIONS_ARG = "removeDuplicateConnections";
+
+    private static final String REMOVE_ORPHAN_CONNECTIONS_ARG = "removeOrphanConnections";
 
     private static final String LATEST_TIMESTAMP_ARG = "latestTimestamp";
 
@@ -95,11 +90,17 @@
 
         options.addOption( collectionOption );
 
-        Option removeConnectionsOption =
-                OptionBuilder.withArgName(REMOVE_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove orphaned connections" )
-                        .create(REMOVE_CONNECTIONS_ARG);
+        Option removeOrphanConnectionsOption =
+            OptionBuilder.withArgName(REMOVE_ORPHAN_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove orphaned connections" )
+                .create(REMOVE_ORPHAN_CONNECTIONS_ARG);
 
-        options.addOption( removeConnectionsOption );
+        options.addOption( removeOrphanConnectionsOption );
+
+        Option removeDuplicateConnectionsOption =
+                OptionBuilder.withArgName(REMOVE_DUPLICATE_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove duplicate connections" )
+                        .create(REMOVE_DUPLICATE_CONNECTIONS_ARG);
+
+        options.addOption( removeDuplicateConnectionsOption );
 
         Option earliestTimestampOption =
                 OptionBuilder.withArgName(EARLIEST_TIMESTAMP_ARG).hasArg().isRequired( false ).withDescription( "earliest timestamp to delete" )
@@ -137,7 +138,8 @@
 
         String applicationOption = line.getOptionValue(APPLICATION_ARG);
         String entityTypeOption = line.getOptionValue(ENTITY_TYPE_ARG);
-        String removeConnectionsOption = line.getOptionValue(REMOVE_CONNECTIONS_ARG);
+        String removeOrphanConnectionsOption = line.getOptionValue(REMOVE_ORPHAN_CONNECTIONS_ARG);
+        String removeDuplicateConnectionsOption = line.getOptionValue(REMOVE_DUPLICATE_CONNECTIONS_ARG);
         String earliestTimestampOption = line.getOptionValue(EARLIEST_TIMESTAMP_ARG);
         String latestTimestampOption = line.getOptionValue(LATEST_TIMESTAMP_ARG);
         String secondsInPastOption = line.getOptionValue(SECONDS_IN_PAST_ARG);
@@ -152,7 +154,8 @@
         }
         String entityType = entityTypeOption;
 
-        final boolean removeOrphans = !isBlank(removeConnectionsOption) && removeConnectionsOption.toLowerCase().equals("yes");
+        final boolean removeOrphans = !isBlank(removeOrphanConnectionsOption) && removeOrphanConnectionsOption.toLowerCase().equals("yes");
+        final boolean removeDuplicates = !isBlank(removeDuplicateConnectionsOption) && removeDuplicateConnectionsOption.toLowerCase().equals("yes");
 
         if (!isBlank(secondsInPastOption) && !isBlank(latestTimestampOption)) {
             throw new RuntimeException("Can't specify both latest timestamp and seconds in past options.");
@@ -222,32 +225,63 @@
             new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                 Optional.absent(), false );
 
+        Set<UUID> uuidSet = new HashSet<>();
+
         gm.loadEdgesFromSource(search).map(markedEdge -> {
 
             UUID uuid = markedEdge.getTargetNode().getUuid();
+            long edgeTimestamp = markedEdge.getTimestamp();
+            String edgeType = markedEdge.getType();
+            boolean duplicate = uuidSet.contains(uuid);
+            if (!duplicate) {
+                uuidSet.add(uuid);
+            }
             try {
                     EntityRef entityRef = new SimpleEntityRef(entityType, uuid);
                     org.apache.usergrid.persistence.Entity retrieved = em.get(entityRef);
 
                     long timestamp = 0;
+                    DateFormat df = new SimpleDateFormat();
+                    df.setTimeZone(TimeZone.getTimeZone("GMT"));
                     String dateString = "NOT TIME-BASED";
                     if (UUIDUtils.isTimeBased(uuid)){
                         timestamp = UUIDUtils.getTimestampInMillis(uuid);
                         Date uuidDate = new Date(timestamp);
-                        DateFormat df = new SimpleDateFormat();
-                        df.setTimeZone(TimeZone.getTimeZone("GMT"));
                         dateString = df.format(uuidDate) + " GMT";
                     }
+                    Date uuidEdgeDate = new Date(UUIDUtils.getUnixTimestampInMillisFromUUIDTimestamp(edgeTimestamp));
+                    String edgeDateString = df.format(uuidEdgeDate) + " GMT";
+
 
                     if ( retrieved != null ){
 
-                        logger.info("{} - {} - entity data found", uuid, dateString);
+                        if (duplicate) {
+                            if (removeDuplicates) {
+                                logger.info("DUPLICATE ENTITY (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
+                                    uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
+                                try {
+                                    MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID());
+                                    logger.info("BATCH: {}", batch);
+                                    batch.execute();
+                                } catch (Exception e) {
+                                    logger.error("{} - exception while trying to remove orphaned connection, {}", uuid, e.getMessage());
+                                }
+                            } else {
+                                logger.info("DUPLICATE ENTITY (WON'T REMOVE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
+                                    uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
+                            }
+
+                        } else {
+                            logger.info("ENTITY: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
+                                uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
+                        }
+
+
                     }else{
                         if (removeOrphans && timestamp >= earliestTimestamp && timestamp <= latestTimestamp) {
-                            logger.info("{} - {} - entity data NOT found, REMOVING", uuid, dateString);
+                            logger.info("NOT FOUND (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
+                                uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
                             try {
-                                //em.removeItemFromCollection(headEntity, collectionName, entityRef );
-                                logger.info("entityRef.asId(): {}", entityRef.asId());
                                 MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID());
                                 logger.info("BATCH: {}", batch);
                                 batch.execute();
@@ -255,9 +289,11 @@
                                 logger.error("{} - exception while trying to remove orphaned connection, {}", uuid, e.getMessage());
                             }
                         } else if (removeOrphans) {
-                            logger.info("{} - {} ({}) - entity data NOT found, not removing because timestamp not in range", uuid, dateString, timestamp);
+                            logger.info("NOT FOUND (TIMESTAMP OUT OF RANGE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
+                                uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
                         } else {
-                            logger.info("{} - {} ({}) - entity data NOT found", uuid, dateString, timestamp);
+                            logger.info("NOT FOUND: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
+                                uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
                         }
                     }
                 } catch (Exception e) {
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index 821d011..b07d09d 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@ -17,378 +17,195 @@
 package org.apache.usergrid.tools;
 
 
+import static org.apache.usergrid.management.AccountCreationProps.PROPERTIES_USERGRID_BINARY_UPLOADER;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.usergrid.persistence.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.usergrid.management.OrganizationInfo;
-import org.apache.usergrid.management.UserInfo;
-import org.apache.usergrid.tools.bean.ExportOrg;
-import org.apache.usergrid.utils.JsonUtils;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilder;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.services.assets.BinaryStoreFactory;
+import org.apache.usergrid.services.assets.data.BinaryStore;
+import org.apache.usergrid.tools.bean.ExportOrg;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Optional;
 import com.google.common.collect.BiMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
 
 
 public class Export extends ExportingToolBase {
 
     static final Logger logger = LoggerFactory.getLogger( Export.class );
+    public static final String LAST_ID = "lastId";
+    
+    
+    @Autowired
+    private BinaryStoreFactory binaryStoreFactory;
 
     JsonFactory jsonFactory = new JsonFactory();
-
+    
+    private AllEntityIdsObservable allEntityIdsObs;
+    private SimpleEdge lastEdge = null;
+    
+    //TODO : Add blocking queues for these executors where appropriate
+    private ExecutorService orgAppCollParallelizer = Executors.newFixedThreadPool(3);
+    private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
+	private ExecutorService enitityMemberFetcher = Executors.newFixedThreadPool(10);
+	private ExecutorService assetsFetcher = Executors.newFixedThreadPool(10);
+	
 
     @Override
-    public void runTool( CommandLine line ) throws Exception {
-        startSpring();
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+  
+    	Options options = super.createOptions();
+    	
+    	Option lastId = OptionBuilder.withArgName( LAST_ID ).hasArg()
+                .withDescription( "Last Entity Id to resume from" ).create( LAST_ID );
+    	options.addOption( lastId);
+    	
+    	return options;
+    }
 
+    
+    @Override
+    public void runTool( CommandLine line ) throws Exception {
+    	
+        startSpring();
         setVerbose( line );
 
-        // ExportDataCreator dataCreator = new ExportDataCreator(emf,
-        // managementService);
-        // dataCreator.createTestData();
-
-        applyOrgId( line );
+	    Gson gson = new GsonBuilder().create(); 
+        
+        this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
+        applyExportParams(line);
         prepareBaseOutputFileName( line );
+        
+        
+        if(lastEdgeJson != null) {
+        	JSONObject lastEdgeJsonObj = new JSONObject(lastEdgeJson);
+        	UUID uuid = UUID.fromString(lastEdgeJsonObj.getJSONObject("sourceNode").getString("uuid"));
+        	Id sourceId = new SimpleId(uuid, lastEdgeJsonObj.getJSONObject("sourceNode").getString("type"));
+        	uuid = UUID.fromString(lastEdgeJsonObj.getJSONObject("targetNode").getString("uuid"));
+        	Id targetId = new SimpleId(uuid, lastEdgeJsonObj.getJSONObject("targetNode").getString("type"));
+        	lastEdge = new SimpleEdge(sourceId, lastEdgeJsonObj.getString("type"), targetId, lastEdgeJsonObj.getLong("timestamp"));
+        }
+        
         outputDir = createOutputParentDir();
         logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+        
 
         // Export organizations separately.
         exportOrganizations();
+        
+        logger.info("Finished export waiting for threads to end.");
 
-        // Loop through the organizations
-        Map<UUID, String> organizations = getOrgs();
-        for ( Entry<UUID, String> organization : organizations.entrySet() ) {
+		while(true) {
+			try {
+				//Spinning to prevent program execution from ending.
+				//Need to replace with some kind of countdown latch or task tracker
+				Thread.sleep(10000);
+			} catch (InterruptedException e) {
+				logger.error("Exception while waiting for export to complete.",e);
+			}
+		}
 
-            if ( organization.equals( properties.getProperty( "usergrid.test-account.organization" ) ) ) {
-                // Skip test data from being exported.
-                continue;
-            }
-
-            exportApplicationsForOrg( organization );
-        }
     }
-
-
-    private Map<UUID, String> getOrgs() throws Exception {
-        // Loop through the organizations
-        Map<UUID, String> organizationNames = null;
-
-        if ( orgId == null ) {
-            organizationNames = managementService.getOrganizations();
-        }
-
-        else {
-            OrganizationInfo info = managementService.getOrganizationByUuid( orgId );
-
-            if ( info == null ) {
-                logger.error( "Organization info is null!" );
-                System.exit( 1 );
-            }
-
-            organizationNames = new HashMap<UUID, String>();
-            organizationNames.put( orgId, info.getName() );
-        }
-
-
-        return organizationNames;
-    }
-
-
-    private void exportApplicationsForOrg( Entry<UUID, String> organization ) throws Exception {
-        logger.info( "" + organization );
-
-        // Loop through the applications per organization
-        BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( organization.getKey() );
-        for ( Entry<UUID, String> application : applications.entrySet() ) {
-
-            logger.info( application.getValue() + " : " + application.getKey() );
-
-            // Get the JSon serializer.
-            com.fasterxml.jackson.core.JsonGenerator jg =
-                getJsonGenerator( createOutputFile( "application", application.getValue() ) );
-
-            // load the dictionary
-            EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
-
-            Entity appEntity = rootEm.get( new SimpleEntityRef( "application", application.getKey()));
-
-            Map<String, Object> dictionaries = new HashMap<String, Object>();
-
-            for ( String dictionary : rootEm.getDictionaries( appEntity ) ) {
-                Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary );
-
-                // nothing to do
-                if ( dict.isEmpty() ) {
-                    continue;
-                }
-
-                dictionaries.put( dictionary, dict );
-            }
-
-            EntityManager em = emf.getEntityManager( application.getKey() );
-
-            // Get application
-            Entity nsEntity = em.get( new SimpleEntityRef( "application", application.getKey()));
-
-            Set<String> collections = em.getApplicationCollections();
-
-            // load app counters
-
-            Map<String, Long> entityCounters = em.getApplicationCounters();
-
-            nsEntity.setMetadata( "organization", organization );
-            nsEntity.setMetadata( "dictionaries", dictionaries );
-            // counters for collections
-            nsEntity.setMetadata( "counters", entityCounters );
-            nsEntity.setMetadata( "collections", collections );
-
-            jg.writeStartArray();
-            jg.writeObject( nsEntity );
-
-            // Create a GENERATOR for the application collections.
-            JsonGenerator collectionsJg =
-                getJsonGenerator( createOutputFile( "collections", application.getValue() ) );
-            collectionsJg.writeStartObject();
-
-            Map<String, Object> metadata = em.getApplicationCollectionMetadata();
-            echo( JsonUtils.mapToFormattedJsonString( metadata ) );
-
-            // Loop through the collections. This is the only way to loop
-            // through the entities in the application (former namespace).
-            for ( String collectionName : metadata.keySet() ) {
-
-                Query query = new Query();
-                query.setLimit( MAX_ENTITY_FETCH );
-                query.setResultsLevel( Query.Level.ALL_PROPERTIES );
-
-                Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query );
-
-                while ( entities.size() > 0 ) {
-
-                    for ( Entity entity : entities ) {
-                        // Export the entity first and later the collections for
-                        // this entity.
-                        jg.writeObject( entity );
-                        echo( entity );
-
-                        saveCollectionMembers( collectionsJg, em, application.getValue(), entity );
-                    }
-
-                    //we're done
-                    if ( entities.getCursor() == null ) {
-                        break;
-                    }
-
-
-                    query.setCursor( entities.getCursor() );
-
-                    entities = em.searchCollection( em.getApplicationRef(), collectionName, query );
-                }
-            }
-
-            // Close writer for the collections for this application.
-            collectionsJg.writeEndObject();
-            collectionsJg.close();
-
-            // Close writer and file for this application.
-            jg.writeEndArray();
-            jg.close();
-        }
-    }
-
-
-    /**
-     * Serialize and save the collection members of this <code>entity</code>
-     *
-     * @param em Entity Manager
-     * @param application Application name
-     * @param entity entity
-     */
-    private void saveCollectionMembers( JsonGenerator jg, EntityManager em, String application, Entity entity )
-            throws Exception {
-
-        Set<String> collections = em.getCollections( entity );
-
-        // Only create entry for Entities that have collections
-        if ( ( collections == null ) || collections.isEmpty() ) {
-            return;
-        }
-
-        jg.writeFieldName( entity.getUuid().toString() );
-        jg.writeStartObject();
-
-        for ( String collectionName : collections ) {
-
-            jg.writeFieldName( collectionName );
-            // Start collection array.
-            jg.writeStartArray();
-
-            Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Query.Level.IDS, false );
-
-            List<UUID> entityIds = collectionMembers.getIds();
-
-            if ( ( entityIds != null ) && !entityIds.isEmpty() ) {
-                for ( UUID childEntityUUID : entityIds ) {
-                    jg.writeObject( childEntityUUID.toString() );
-                }
-            }
-
-            // End collection array.
-            jg.writeEndArray();
-        }
-
-        // Write connections
-        saveConnections( entity, em, jg );
-
-        // Write dictionaries
-        saveDictionaries( entity, em, jg );
-
-        // End the object if it was Started
-        jg.writeEndObject();
-    }
-
-
-    /** Persists the connection for this entity. */
-    private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
-
-        jg.writeFieldName( "dictionaries" );
-        jg.writeStartObject();
-
-        Set<String> dictionaries = em.getDictionaries( entity );
-        for ( String dictionary : dictionaries ) {
-
-            Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
-
-            // nothing to do
-            if ( dict.isEmpty() ) {
-                continue;
-            }
-
-            jg.writeFieldName( dictionary );
-
-            jg.writeStartObject();
-
-            for ( Entry<Object, Object> entry : dict.entrySet() ) {
-                jg.writeFieldName( entry.getKey().toString() );
-                jg.writeObject( entry.getValue() );
-            }
-
-            jg.writeEndObject();
-        }
-        jg.writeEndObject();
-    }
-
-
-    /** Persists the connection for this entity. */
-    private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
-
-        jg.writeFieldName( "connections" );
-        jg.writeStartObject();
-
-        Set<String> connectionTypes = em.getConnectionTypes( entity );
-        for ( String connectionType : connectionTypes ) {
-
-            jg.writeFieldName( connectionType );
-            jg.writeStartArray();
-
-            Results results = em.getTargetEntities(
-                    entity, connectionType, null, Query.Level.IDS );
-
-            List<ConnectionRef> connections = results.getConnections();
-
-            for ( ConnectionRef connectionRef : connections ) {
-                jg.writeObject( connectionRef.getTargetRefs().getUuid() );
-            }
-
-            jg.writeEndArray();
-        }
-        jg.writeEndObject();
-    }
-
-  /*-
-   * Set<String> collections = em.getCollections(entity);
-   * for (String collection : collections) {
-   *   Results collectionMembers = em.getCollection(
-   *    entity, collection, null,
-   *    MAX_ENTITY_FETCH, Level.IDS, false);
-   *    write entity_id : { "collectionName" : [ids]
-   *  }
-   * }
-   *
-   *
-   *   {
-   *     entity_id :
-   *       { collection_name :
-   *         [
-   *           collected_entity_id,
-   *           collected_entity_id
-   *         ]
-   *       },
-   *     f47ac10b-58cc-4372-a567-0e02b2c3d479 :
-   *       { "activtites" :
-   *         [
-   *           f47ac10b-58cc-4372-a567-0e02b2c3d47A,
-   *           f47ac10b-58cc-4372-a567-0e02b2c3d47B
-   *         ]
-   *       }
-   *   }
-   *
-   * http://jackson.codehaus.org/1.8.0/javadoc/org/codehaus/jackson/JsonGenerator.html
-   *
-   *
-   *-
-   * List<ConnectedEntityRef> connections = em.getConnections(entityId, query);
-   */
-
-
+    
+    
+    
     private void exportOrganizations() throws Exception, UnsupportedEncodingException {
 
+		for (Entry<UUID, String> organizationName : getOrgs().entrySet()) {
 
-        for ( Entry<UUID, String> organizationName : getOrgs().entrySet() ) {
+			// Let's skip the test entities.
+			if (organizationName.equals(properties.getProperty("usergrid.test-account.organization"))) {
+				continue;
+			}
 
-            // Let's skip the test entities.
-            if ( organizationName.equals( properties.getProperty( "usergrid.test-account.organization" ) ) ) {
-                continue;
-            }
+			OrganizationInfo orgInfo = managementService.getOrganizationByUuid(organizationName.getKey());
+			logger.info("Exporting Organization: " + orgInfo.getName());
 
-            OrganizationInfo acc = managementService.getOrganizationByUuid( organizationName.getKey() );
-            logger.info( "Exporting Organization: " + acc.getName() );
+			ExportOrg exportOrg = new ExportOrg(orgInfo);
 
-            ExportOrg exportOrg = new ExportOrg( acc );
+			List<UserInfo> users = managementService.getAdminUsersForOrganization(organizationName.getKey());
 
-            List<UserInfo> users = managementService.getAdminUsersForOrganization( organizationName.getKey() );
+			for (UserInfo user : users) {
+				exportOrg.addAdmin(user.getUsername());
+			}
 
-            for ( UserInfo user : users ) {
-                exportOrg.addAdmin( user.getUsername() );
-            }
+			File orgDir = createOrgDir(orgInfo.getName());
 
-            // One file per Organization.
-            saveOrganizationInFile( exportOrg );
-        }
-    }
+			// One file per Organization.
+			saveOrganizationMetadata(orgDir, exportOrg);
 
+			exportApplicationsForOrg(orgDir, organizationName.getKey(), organizationName.getValue());
+		}
+	}
 
     /**
      * Serialize an Organization into a json file.
+     * @param orgDir 
      *
      * @param acc OrganizationInfo
      */
-    private void saveOrganizationInFile( ExportOrg acc ) {
+    private void saveOrganizationMetadata( File orgDir, ExportOrg acc ) {
+    	
         try {
 
-            File outFile = createOutputFile( "organization", acc.getName() );
+            File outFile = createOutputFile( orgDir, "organization", acc.getName() );
             com.fasterxml.jackson.core.JsonGenerator jg = getJsonGenerator( outFile );
             jg.writeObject( acc );
             jg.close();
@@ -399,32 +216,503 @@
     }
 
 
-    public void streamOutput( File file, List<Entity> entities ) throws Exception {
-        JsonFactory jsonFactory = new JsonFactory();
-        // or, for data binding,
-        // org.codehaus.jackson.mapper.MappingJsonFactory
-        JsonGenerator jg = jsonFactory.createJsonGenerator( file, JsonEncoding.UTF8 );
-        // or Stream, Reader
+    private Map<UUID, String> getOrgs() throws Exception {
+        // Loop through the organizations
+        Map<UUID, String> organizationNames = null;
+
+        if ( orgId == null && (orgName == null || orgName.trim().equals(""))) {
+            organizationNames = managementService.getOrganizations();
+        }
+        else {
+            OrganizationInfo info = null;
+            
+            if( orgId != null ) {
+            	info = managementService.getOrganizationByUuid( orgId );
+            }
+            else  {
+            	info = managementService.getOrganizationByName( orgName );
+            }
+
+            if ( info == null ) {
+                logger.error( "Organization info is null!" );
+                System.exit( 1 );
+            }
+
+            organizationNames = new HashMap<UUID, String>();
+            organizationNames.put( info.getUuid(), info.getName() );
+        }
+
+        return organizationNames;
+    }
+    
+    private void exportApplicationsForOrg(File orgDir, UUID orgId, String orgName ) throws Exception {
+        
+    	logger.info("Exporting applications for {} : {} ",orgId, orgName);
+
+        // Loop through the applications per organization
+        BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( orgId );
+        
+        if ( applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+        	//export all apps as appId or name is not provided
+        	
+        	Observable.from(applications.entrySet())
+        	.subscribeOn(Schedulers.from(orgAppCollParallelizer))
+        	.subscribe(appEntry -> {
+        		UUID appId = appEntry.getKey();
+	        	String appName = appEntry.getValue().split("/")[1];
+	        	try {
+					exportApplication(orgDir, appId, appName);
+				} catch (Exception e) {
+					logger.error("There was an exception exporting application {} : {}",appName, appId, e);
+				}
+        	});
+        	
+        }
+        else {
+        	
+        	UUID appId = applicationId;
+        	String appName = applicationName;
+        	
+        	if( applicationId != null ) {
+            	appName = applications.get(appId);
+            }
+            else  {
+            	appId = applications.inverse().get(orgName+'/'+appName);
+            }
+	        
+        	try {
+				exportApplication(orgDir, appId, appName);
+			} catch (Exception e) {
+				logger.error("There was an exception exporting application {} : {}",appName, appId, e);
+			}
+
+        }
+    }
+    
+    private void exportApplication(File orgDir, UUID appId, String appName) throws Exception {
+    
+    	
+    	logger.info( "Starting application export for {} : {} ",appName, appId );
+    	File appDir = createApplicationDir(orgDir, appName);
+    	
+    	JsonGenerator jg =
+                getJsonGenerator( createOutputFile( appDir, "application", appName) );
+
+        // load the dictionary
+        EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+
+        Entity appEntity = rootEm.get( new SimpleEntityRef( "org_application", appId));
+
+        Map<String, Object> dictionaries = new HashMap<String, Object>();
+        
+        for ( String dictionary : rootEm.getDictionaries( appEntity ) ) {
+            Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary );
+
+            // nothing to do
+            if ( dict.isEmpty() ) {
+                continue;
+            }
+
+            dictionaries.put( dictionary, dict );
+        }
+
+        EntityManager em = emf.getEntityManager( appId);
+
+        // Get application
+        Entity nsEntity = em.get( new SimpleEntityRef( "application", appId));
+
+        Set<String> collections = em.getApplicationCollections();
+
+        // load app counters
+
+        Map<String, Long> entityCounters = em.getApplicationCounters();
+
+        //nsEntity.setMetadata( "organization", orgName );
+        nsEntity.setMetadata( "dictionaries", dictionaries );
+        // counters for collections
+        nsEntity.setMetadata( "counters", entityCounters );
+        nsEntity.setMetadata( "collections", collections );
 
         jg.writeStartArray();
-        for ( Entity entity : entities ) {
-            jg.writeObject( entity );
-        }
-        jg.writeEndArray();
-
+        jg.writeObject( nsEntity );
         jg.close();
-    }
+        
+        if ( collNames == null || collNames.length <= 0) {
+        	//export all collections as collection names are not provided
+        	
+        	Observable.from(collections)
+        	.subscribeOn(Schedulers.from(orgAppCollParallelizer))
+        	.subscribe(collectionName -> {
+        		exportCollection(appDir, appId, collectionName, em);
+        	});
+        	
+        }
+        else {
+        	Observable.from(collNames)
+        	.subscribeOn(Schedulers.from(orgAppCollParallelizer))
+        	.subscribe(collectionName -> {
+        		if(collections.contains(collectionName)) {
+        			exportCollection(appDir, appId, collectionName, em);
+        		}
+        	});
+        }
+		
+	}
 
-    // to generate the activities and user relationship, follow this:
+	private void exportCollection(File appDir, UUID appId, String collectionName, EntityManager em) {
+		File collectionDir = createCollectionDir(appDir, collectionName);
+		extractEntityIdsForCollection(collectionDir, appId, collectionName);
+	}
+	
+	private void extractEntityIdsForCollection(File collectionDir, UUID applicationId, String collectionName) {
+		
+		AtomicInteger batch = new AtomicInteger(1);
+		
+		final EntityManager rootEm = emf.getEntityManager(applicationId);
+		final Gson gson = new GsonBuilder().create();
+		ManagerCache managerCache = injector.getInstance(ManagerCache.class);
+		ExportRequestBuilder builder = new ExportRequestBuilderImpl().withApplicationId(applicationId);
+		final ApplicationScope appScope = builder.getApplicationScope().get();
+		GraphManager gm = managerCache.getGraphManager(appScope);
+		EntityCollectionManagerFactory entityCollectionManagerFactory = injector
+				.getInstance(EntityCollectionManagerFactory.class);
+		final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
 
-    // write field name (id)
-    // write start object
-    // write field name (collection name)
-    // write start array
-    // write object/string
-    // write another object
-    // write end array
-    // write end object
-    // ...... more objects
-    //
+		ExecutorService entityIdWriter = Executors.newFixedThreadPool(1);
+		allEntityIdsObs
+		.getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),Optional.fromNullable(CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),(this.lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
+		.buffer(1000)
+		.finallyDo(()-> {
+			entityIdWriter.shutdown();
+			logger.info("Finished fetching entity ids for {}. Shutting down entity id writer executor ", collectionName);
+			while(!entityIdWriter.isTerminated()) {
+				try {
+					entityIdWriter.awaitTermination(10, TimeUnit.SECONDS);
+				} catch (InterruptedException e) {
+				}
+			}
+			logger.info("Entity id writer executor terminated after shutdown for {}", collectionName);
+		})
+		.subscribe(edges -> {
+			
+			logger.info("For collection {}" , collectionName);
+			Integer batchId = batch.getAndIncrement();
+			logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
+			Observable.just(edges)
+			.subscribeOn(Schedulers.from(entityIdWriter)) 
+			.subscribe(edgeScopes -> {
+
+				List<UUID> entityIds = new ArrayList<UUID>(1000);
+	
+				for (EdgeScope edgeScope : edgeScopes) {
+					// write to file
+					Id entityId = edgeScope.getEdge().getTargetNode();
+					if (entityId != null) {
+						entityIds.add(entityId.getUuid());
+					} else {
+						edgeScopes.remove(edgeScope);
+					}
+				}
+				// extract name for this batch
+				try {
+					writeEntityIdsBatch(collectionDir, edgeScopes, batchId, collectionName);
+					String type = edgeScopes.get(0).getEdge().getTargetNode().getType();
+					
+					Observable.just(entityIds)
+					.subscribeOn(Schedulers.from(entityFetcher)) // change to
+					.subscribe(entIds -> {
+						
+						// get entities here
+						logger.info("entIds count {} for type {}", entIds.size(), type);
+						Results entities = rootEm.getEntities(entIds, type);
+						int size = entities.getEntities().size();
+						logger.info("Got {} entities.", size);
+						
+						if(!skipConnections || !skipDictionaries || !skipAssets) {
+							
+							ConnectableObservable<Results> entityObs = Observable.just(entities)
+									.publish();
+							entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
+	
+							
+							// fetch and write connections
+							if(!skipConnections) {
+								entityObs.subscribe(entity -> {
+									fetchConnections(gm, ecm, entity, collectionDir, collectionName, batchId, gson);
+	
+								});
+							}
+							// fetch and write dictionaries
+							if(!skipDictionaries) {
+								entityObs.subscribe(entity -> {
+									fetchDictionaries(collectionDir, collectionName, rootEm,
+										entity, gson, batchId);
+								});
+							}
+							
+							if(!skipAssets) {
+								File assetsDir = createDir(collectionDir.getAbsolutePath(), "files"); 
+								entityObs.subscribe(entity -> {
+									try {
+										fetchAssets(assetsDir, applicationId, collectionName, batchId, entities);
+									} catch (Exception e) {
+										logger.error("Exception while trying to fetch assets for app {}, collection {}, batch {} ",
+												applicationId, collectionName, batchId, e);
+									}
+								});
+							}
+							entityObs.connect();
+						}
+						writeEntities(collectionDir, entities, batchId, collectionName, gson);
+					});
+	
+				} catch (Exception e) {
+					logger.error("There was an error writing entity ids to file for "
+							+ edgeScopes.get(0).getEdge(), e);
+					// since entity id writing has failed, we need to see how we can not exit the
+					// whole program
+					System.exit(0);
+				}
+			});
+
+			logger.info("Finished fetching details for collection {} for batch {}", collectionName, batchId);
+		});
+		logger.info("Exiting extractEntityIdsForCollection() method.");
+	}
+
+	private void fetchAssets(File assetsDir, UUID applicationId, String collectionName, Integer batchId,
+			Results entities) throws Exception {
+
+		List<Entity> entitiesWithAssets = new ArrayList<>();
+
+		for (Entity e : entities.getEntities()) {
+			if (e.getProperty("file-metadata") != null) {
+				entitiesWithAssets.add(e);
+			}
+		}
+
+		if (!entitiesWithAssets.isEmpty()) {
+
+			writeAssets(assetsDir, collectionName, batchId, entitiesWithAssets);
+
+			ConnectableObservable<Entity> entityAssets = Observable.from(entitiesWithAssets).publish();
+			entityAssets.subscribeOn(Schedulers.from(assetsFetcher));
+			entityAssets.subscribe(e -> {
+				// Write code to fetch these assets from entity store.
+				BinaryStore binaryStore = null;
+				try {
+					binaryStore = binaryStoreFactory
+							.getBinaryStore(properties.getProperty(PROPERTIES_USERGRID_BINARY_UPLOADER));
+				} catch (Exception e2) {
+					logger.error("Except on while trying to get binary store for property {}, ", properties.getProperty(PROPERTIES_USERGRID_BINARY_UPLOADER), e2 );
+				}
+
+						File file = new File(assetsDir + "/" + collectionName + "_assets_" + e.getUuid());
+						try (InputStream in = binaryStore.read(applicationId, e);
+								OutputStream out = new BufferedOutputStream(new FileOutputStream(file));) {
+
+							int read = -1;
+
+							while ((read = in.read()) != -1) {
+								out.write(read);
+							}
+
+						} catch (Exception e1) {
+							logger.error("Exception while to write assets file for entity {}", e.getUuid(), e1);
+						}
+
+			});
+			entityAssets.connect();
+		}
+	}
+
+	private void writeAssets(final File collectionDir, final String collectionName, final Integer batchId,
+			List<Entity> entitiesWithAssets2) {
+
+		try (BufferedWriter assetsWriter = new BufferedWriter(
+				new FileWriter(new File(collectionDir + "/" + collectionName + "_assets_" + batchId + ".json")));) {
+			for (Entity e : entitiesWithAssets2) {
+				JSONObject object = new JSONObject();
+				object.put("uuid", e.getUuid());
+				object.put("type", e.getType());
+				object.put("file-metadata", e.getProperty("file-metadata"));
+				object.put("file", (e.getProperty("file") != null) ? e.getProperty("file") : null);
+				assetsWriter.write(object.toString());
+				assetsWriter.newLine();
+			}
+		} catch (Exception ex) {
+			logger.error("Exception while trying to write entities collection {} batch {}", collectionName, batchId,
+					ex);
+		}
+	}
+
+	private void fetchDictionaries(File collectionDir, String collectionName, final EntityManager rootEm,
+			Results entity, Gson gson, Integer batchId) {
+		
+		//TODO : still using JsonGenerator 
+		JsonGenerator jgDictionaries = null;
+		try {
+			jgDictionaries = getJsonGenerator(new File(collectionDir + "/" + collectionName + "_" + "dictionaries_" + batchId));
+
+			for (Entity et : entity.getEntities()) {
+				Set<String> dictionaries;
+				try {
+					dictionaries = rootEm.getDictionaries(et);
+					
+					jgDictionaries.writeStartArray();
+					if (dictionaries != null && !dictionaries.isEmpty()) {
+						for (String dictionary : dictionaries) {
+							Map<Object, Object> dict = rootEm.getDictionaryAsMap(et, dictionary);
+							if (dict != null && dict.isEmpty()) {
+								continue;
+							}
+							
+							jgDictionaries.writeStartObject();
+							jgDictionaries.writeObjectField(dictionary, dict);
+							jgDictionaries.writeEndObject();
+						}
+						jgDictionaries.writeEndArray();
+					}
+				} catch (Exception e) {
+					logger.error("Exception while trying to fetch dictionaries.", e);
+				}
+			}
+		} catch (Exception e) {
+			logger.error("Exception while trying to fetch dictionaries.", e);
+		} finally {
+			if (jgDictionaries != null) {
+				try {
+					jgDictionaries.close();
+				} catch (IOException e) {
+					logger.error("Exception while trying to close dictionaries writer.", e);
+				}
+			}
+		}
+	}
+
+	private void fetchConnections(GraphManager gm, final EntityCollectionManager ecm, Results entity,
+			File collectionDir, String collectionName, Integer batchId, Gson gson) {
+		
+		try(BufferedWriter bufferedWriter = new BufferedWriter(
+				new FileWriter(new File(collectionDir + "/" + collectionName + "_" + "connections_" + batchId)));){
+			
+			for (Entity et : entity.getEntities()) {
+				
+				List<ConnectionPojo> connections = new ArrayList<>();
+				
+				SimpleId id = new SimpleId();
+				id.setType(et.getType());
+				id.setUuid(et.getUuid());
+
+				gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(id))
+						.flatMap(emittedEdgeType -> {
+							logger.debug("loading edges of type {} from node {}", emittedEdgeType, id);
+							return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(id, emittedEdgeType,
+									Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
+						}).map(markedEdge -> {
+
+							if (!markedEdge.isDeleted() && !markedEdge.isTargetNodeDeleted()
+									&& markedEdge.getTargetNode() != null) {
+
+								// doing the load to just again make sure bad
+								// connections are not exported
+								org.apache.usergrid.persistence.model.entity.Entity en = ecm
+										.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+								if (en != null) {
+
+									try {
+										
+										ConnectionPojo connectionPojo = new ConnectionPojo();
+										connectionPojo.setRelationship(CpNamingUtils
+														.getConnectionNameFromEdgeName(markedEdge.getType()));
+										connectionPojo.setSourceNodeUUID(markedEdge.getSourceNode().getUuid().toString());
+										connectionPojo.setTargetNodeUUID(markedEdge.getTargetNode().getUuid().toString());
+
+										connections.add(connectionPojo);
+										
+									} catch (Exception e) {
+										logger.error("Exception while trying process connection entity", e);
+									}
+								} else {
+									logger.warn(
+											"Exported connection has a missing target node, not creating connection in export. Edge: {}",
+											markedEdge);
+								}
+							}
+							return null;
+
+						}).toBlocking().lastOrDefault(null);
+				
+				for(ConnectionPojo c : connections) {
+					bufferedWriter.write(gson.toJson(c));
+					bufferedWriter.newLine();
+				}
+			}
+		}catch (Exception e) {
+			logger.error("Exception while trying to write connection to file.", e);
+		}
+		logger.info("Finished fetching details for collection {} batch {}", collectionName, batchId);
+	}
+	
+	
+	class ConnectionPojo {
+		private String sourceNodeUUID;
+		private String relationship;
+		private String targetNodeUUID;
+		public String getSourceNodeUUID() {
+			return sourceNodeUUID;
+		}
+		public void setSourceNodeUUID(String sourceNodeUUID) {
+			this.sourceNodeUUID = sourceNodeUUID;
+		}
+		public String getRelationship() {
+			return relationship;
+		}
+		public void setRelationship(String relationship) {
+			this.relationship = relationship;
+		}
+		public String getTargetNodeUUID() {
+			return targetNodeUUID;
+		}
+		public void setTargetNodeUUID(String targetNodeUUID) {
+			this.targetNodeUUID = targetNodeUUID;
+		}
+		
+	}
+
+	private void writeEntities(File collectionDir, Results entities, Integer batchId, String collectionName, Gson gson) {
+		logger.info("Started writing entities for collection {} batch {} ", collectionName, batchId);
+		
+		try(BufferedWriter bufferedWriter = new BufferedWriter(
+				new FileWriter(new File(collectionDir + "/" + collectionName + "_data_" + batchId + ".json")));) {
+			
+			logger.info("Got count {} entities for file writing", entities.getEntities().size());
+			for(Entity e : entities.getEntities()) {
+				bufferedWriter.write(gson.toJson(e));
+				bufferedWriter.newLine();
+			}
+			
+		} catch (Exception e) {
+			logger.error("Exception while trying to write entities collection {} batch {}", collectionName, batchId, e);
+		}
+		
+		logger.info("Finised writing entities for collection {} batch {} ", collectionName, batchId);
+	}
+
+	private void writeEntityIdsBatch(File collectionDir, List<EdgeScope> edgeScopes, Integer batchId,
+			String collectionName) throws Exception {
+		logger.info("Started writing ids for collection {} batch {} ", collectionName, batchId);
+		try (BufferedWriter bufferedWriter = new BufferedWriter(
+				new FileWriter(new File(collectionDir + "/" + collectionName + "_" + batchId)));) {
+			for (EdgeScope es : edgeScopes) {
+				bufferedWriter.write(es.getEdge().toString());
+				bufferedWriter.newLine();
+			}
+		} catch (Exception e) {
+			logger.error("Exception while tryign to write entity ids for collection {} batch {}", collectionName, batchId, e);
+		}
+		logger.info("Finished writing ids for collection {} batch {} ", collectionName, batchId);
+	}
+
 }
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c46f4b7..53e5ef3 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -293,8 +293,8 @@
                         try {
 
                             ExportConnection connection = new ExportConnection(
-                                    applicationName,
                                     organizationName,
+                                    applicationName,
                                     connectionType,
                                     exportEntity.getEntity().getUuid(),
                                     connectedEntity.getUuid());
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 6fb0911..5865d15 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -26,11 +26,15 @@
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.services.assets.BinaryStoreFactory.Provider;
 import org.apache.usergrid.utils.ConversionUtils;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -39,7 +43,6 @@
 /**
  * Base class for ToolBase implementations that write output to an output directory and file.
  *
- * @author zznate
  */
 public abstract class ExportingToolBase extends ToolBase {
 
@@ -49,9 +52,31 @@
 
     /** Output dir option: -outputDir */
     protected static final String OUTPUT_DIR = "outputDir";
+    protected static final String ORG_ID = "orgid";
+    protected static final String ORG_NAME = "orgName";
+    protected static final String APP_ID = "appId";
+    protected static final String APP_NAME = "appName";
+    protected static final String COLL_NAMES = "collNames";
+    protected static final String APPEND_TIMESTAMP = "appendTimestamp";
+    protected static final String SKIP_CONN = "skipConnections";
+    protected static final String SKIP_DICT = "skipDictionaries";
+    protected static final String LAST_EDGE = "lastEdge";
+    protected static final String SKIP_ASSETS = "skipAssets";
+    protected static final String FIELD_TYPE = "fieldType";
+    protected static final String COLLECTION_NAME = "collectionName";
 
     protected String baseOutputDirName = "export";
     protected UUID orgId;
+    protected String orgName;
+    protected UUID applicationId;
+    protected String applicationName;
+    protected String[] collNames;
+    protected String fieldType;
+    protected boolean skipConnections = false;
+    protected boolean skipDictionaries = false;
+    protected boolean skipAssets = false;
+    protected String lastEdgeJson = null;
+    
     JsonFactory jsonFactory = new JsonFactory();
     protected long startTime = System.currentTimeMillis();
 
@@ -62,11 +87,41 @@
 
         Options options = super.createOptions();
 
-        Option outputDir = OptionBuilder.hasArg().withDescription( "output file name -outputDir" ).create( OUTPUT_DIR );
-        Option orgId = OptionBuilder.hasArg().withDescription( "Use a specific organization -orgId" ).create( "orgId" );
-
+        Option outputDir = OptionBuilder.hasArg().withDescription( "Output file name -outputDir" ).create( OUTPUT_DIR );
+        Option orgId = OptionBuilder.hasArg().withDescription( "Use a specific organization -orgId" ).create( ORG_ID );
+        Option appId = OptionBuilder.hasArg().withDescription( "Use a specific application -appId (Needs -orgId or -orgName)" ).create( APP_ID );
+        Option orgName = OptionBuilder.hasArg().withDescription( "Use a specific organization name -orgName" ).create( ORG_NAME );
+        Option appName = OptionBuilder.hasArg().withDescription( "Use a specific application name -appName (Needs -orgId or -orgName)" ).create( APP_NAME );
+        Option collNames = OptionBuilder.hasArg().withDescription( "Export list of comma separated collections -collNames (Needs -orgId or -orgName and -appId or -appName)" ).create( COLL_NAMES );
+        Option appendTimestamp = OptionBuilder.withDescription( "Attach timestamp to output directory -appendTimestamp" ).create( APPEND_TIMESTAMP );
+        Option skipConns = OptionBuilder.withDescription( "Skip exporting connections for entities -skipConnections" ).create( SKIP_CONN );
+        Option skipDicts = OptionBuilder.withDescription( "Skip exporting dictionaries for entities -skipDictionaries" ).create( SKIP_DICT );
+        Option lastEdge = OptionBuilder.hasArg().withDescription( "Last Edge from previous run to resume export -lastEdge" ).create( LAST_EDGE );
+        Option skipAssets = OptionBuilder.withDescription( "Skip exporting assets for entities -skipAssets" ).create( SKIP_ASSETS );
+        Option awsKey =  OptionBuilder.hasArg().withDescription( "AWS access key -awsKey" ).create( AWS_KEY );
+        Option storeType = OptionBuilder.hasArg().withDescription( "Binary store type -storeType (aws, google, local)" ).create( STORE_TYPE );
+        Option awsId = OptionBuilder.hasArg().withDescription( "AWS access id -awsId" ).create( AWS_ID );
+        Option bucketName = OptionBuilder.hasArg().withDescription( "Binary storage bucket name -bucketName" ).create( BINARY_BUCKET_NAME );
+        Option fieldType = OptionBuilder.hasArg().withDescription( "Field type for unique value check -fieldType" ).create( FIELD_TYPE );
+        Option collectionName = OptionBuilder.hasArg().withDescription( "Collection name for unique value check -collectionName" ).create( COLLECTION_NAME );
+        
         options.addOption( outputDir );
         options.addOption( orgId );
+        options.addOption( appId );
+        options.addOption( collNames );
+        options.addOption( appName );
+        options.addOption( orgName );
+        options.addOption( appendTimestamp );
+        options.addOption( skipConns );
+        options.addOption( skipDicts );
+        options.addOption( lastEdge );
+        options.addOption( skipAssets );
+        options.addOption( awsKey );
+        options.addOption( awsId );
+        options.addOption( bucketName );
+        options.addOption( storeType );
+        options.addOption(fieldType);
+        options.addOption(collectionName);
 
         return options;
     }
@@ -75,10 +130,15 @@
     protected void prepareBaseOutputFileName( CommandLine line ) {
 
         boolean hasOutputDir = line.hasOption( OUTPUT_DIR );
+        boolean appendTimestamp = line.hasOption( APPEND_TIMESTAMP );
 
         if ( hasOutputDir ) {
             baseOutputDirName = line.getOptionValue( OUTPUT_DIR );
         }
+        
+        if(appendTimestamp) {
+        	baseOutputDirName = baseOutputDirName + "_"+startTime;
+        }
     }
 
 
@@ -87,6 +147,71 @@
             orgId = ConversionUtils.uuid( line.getOptionValue( "orgId" ) );
         }
     }
+    
+    
+    protected void applyExportParams( CommandLine line ) {
+    	
+        if ( line.hasOption( ORG_ID ) ) {
+            orgId = ConversionUtils.uuid( line.getOptionValue( ORG_ID ) );
+        }
+        else if ( line.hasOption( ORG_NAME ) ) {
+            orgName = line.getOptionValue( ORG_NAME ) ;
+        }
+        
+        if ( line.hasOption( APP_ID ) ) {
+            applicationId = ConversionUtils.uuid( line.getOptionValue( APP_ID ) );
+        }
+        else if ( line.hasOption( APP_NAME ) ) {
+            applicationName = line.getOptionValue( APP_NAME ) ;
+        }
+        if ( line.hasOption( COLL_NAMES ) ) {
+            collNames = line.getOptionValue( COLL_NAMES ).split(",");
+        }
+        if(line.hasOption( COLLECTION_NAME )) {
+        	collNames = new String[] {line.getOptionValue( COLLECTION_NAME )};
+        }
+        skipConnections = line.hasOption( SKIP_CONN );
+        skipDictionaries = line.hasOption( SKIP_DICT );
+        
+        if(line.hasOption(LAST_EDGE)) {
+        	lastEdgeJson = line.getOptionValue(LAST_EDGE);
+        }
+        
+        skipAssets = line.hasOption( SKIP_ASSETS );
+        
+        if(line.hasOption( FIELD_TYPE )) {
+        	fieldType = line.getOptionValue( FIELD_TYPE );
+        }
+    }
+    
+	protected void validateOptions(CommandLine line) throws MissingOptionException {
+		if ((line.hasOption(APP_ID) || line.hasOption(APP_NAME))
+				&& !(line.hasOption(ORG_ID) || line.hasOption(ORG_NAME))) {
+			throw new MissingOptionException("-orgId or -orgName is required if you pass -appId or -appName");
+		}
+		if (line.hasOption(COLL_NAMES) && !(line.hasOption(APP_ID) || line.hasOption(APP_NAME))) {
+			throw new MissingOptionException(
+					"[-appId or -appName] and [-orgId or -orgName] are required if you pass -collNames");
+		}
+
+		if (!line.hasOption(SKIP_ASSETS)) {
+			if (line.hasOption(STORE_TYPE)) {
+				String storeType = line.getOptionValue(STORE_TYPE);
+				if (storeType.equals(Provider.aws.toString())) {
+					if (!line.hasOption(AWS_ID) || !line.hasOption(AWS_KEY) || !line.hasOption(BINARY_BUCKET_NAME)) {
+						throw new MissingOptionException(
+								"[-awsId and -awsKey and -bucketName] are required if you pass -storeType as aws");
+					}
+				} else if (storeType.equals(Provider.google.toString())) {
+					if (!line.hasOption(BINARY_BUCKET_NAME)) {
+						throw new MissingOptionException("[-bucketName] is required if you pass -storeType as google");
+					}
+				}
+			} else {
+				throw new MissingOptionException("[-storeType] is required if you do not pass -skipAssets");
+			}
+		}
+	}
 
 
     /**
@@ -116,10 +241,24 @@
     protected File createOutputFile( File parent, String type, String name ) {
         return new File( parent, prepareOutputFileName( type, name ) );
     }
+    
+    protected File createOrgDir( String orgName ) {
+        return createDir( outputDir.getAbsolutePath(), orgName);
+    }
+    
+    protected File createApplicationDir( File orgDir, String applicationName ) {
+        return createDir( orgDir.getAbsolutePath(), applicationName);
+    }
 
 
-    protected File createCollectionsDir( String applicationName ) {
-        return createDir( String.format( "%s/%s.applicationName.collections", outputDir, applicationName ) );
+    protected File createCollectionDir( File appDir, String collectionName ) {
+        return createDir( appDir.getAbsolutePath(), collectionName) ;
+    }
+    
+    
+    protected File createDir( String baseDir, String dirName ) {
+        
+        return createDir((baseDir!=null && !baseDir.trim().equals(""))?(baseDir + File.separator +dirName): dirName);
     }
 
 
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 62636ea..e2711bb 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@ -71,9 +71,11 @@
     protected CassandraService cass;
 
     protected Injector injector;
-
-
-
+    
+    protected static final String AWS_KEY = "awsKey";
+    protected static final String AWS_ID = "awsId";
+    protected static final String BINARY_BUCKET_NAME = "bucketName";
+    protected static final String STORE_TYPE = "storeType";
 
     public void startTool( String[] args ) {
         startTool( args, true );
@@ -93,6 +95,15 @@
         if ( line == null ) {
             return;
         }
+        
+        
+        try {
+            validateOptions(line);
+        }
+        catch ( MissingOptionException exp ) {
+            printCliHelp( "Required or dependent options are missing.  Reason: " + exp.getMessage() );
+        }
+        
 
         // notification queue listener not needed for tools
         System.setProperty("usergrid.notifications.listener.run", "false");
@@ -105,11 +116,44 @@
 
         if ( line.hasOption( "host" ) ) {
             System.setProperty( "cassandra.url", line.getOptionValue( "host" ) );
+        }
+
+        if ( line.hasOption( "eshost" ) ) {
             System.setProperty( "elasticsearch.hosts", line.getOptionValue( "eshost" ) );
+        }
+
+        if ( line.hasOption( "escluster" ) ) {
             System.setProperty( "elasticsearch.cluster_name", line.getOptionValue( "escluster" ) );
+        }
+
+        if ( line.hasOption( "ugcluster" ) ) {
             System.setProperty( "usergrid.cluster_name", line.getOptionValue( "ugcluster" )  );
         }
 
+        if ( line.hasOption( "appkeyspace" ) ) {
+            System.setProperty( "cassandra.keyspace.application", line.getOptionValue( "appkeyspace" ) );
+        }
+
+        if ( line.hasOption( "lockskeyspace" ) ) {
+            System.setProperty( "cassandra.lock.keyspace", line.getOptionValue( "lockskeyspace" ) );
+        }
+
+        if(line.hasOption(AWS_ID)) {
+        	System.setProperty("AWS_ACCESS_KEY_ID", line.getOptionValue( AWS_ID ));
+        }
+        
+        if(line.hasOption(AWS_KEY)) {
+        	System.setProperty("AWS_SECRET_KEY", line.getOptionValue( AWS_KEY ));
+        }
+       
+        if(line.hasOption(BINARY_BUCKET_NAME)) {
+        	System.setProperty("usergrid.binary.bucketname", line.getOptionValue( BINARY_BUCKET_NAME ));
+        }
+        
+        if(line.hasOption( STORE_TYPE )) {
+        	System.setProperty("usergrid.binary.uploader", line.getOptionValue( STORE_TYPE ));
+        }
+
         try {
             runTool( line );
         }
@@ -153,6 +197,12 @@
         Option remoteOption = OptionBuilder
             .withDescription( "Use remote Cassandra instance" ).create( "remote" );
 
+        Option ugAppKeyspace = OptionBuilder.withArgName( "appkeyspace" ).hasArg()
+            .withDescription( "Usergrid Application keyspace" ).create( "appkeyspace" );
+
+        Option ugLocksKeyspace = OptionBuilder.withArgName( "lockskeyspace" ).hasArg()
+            .withDescription( "Usergrid Locks keyspace" ).create( "lockskeyspace" );
+
         Option verbose = OptionBuilder
             .withDescription( "Print on the console an echo of the content written to the file" )
             .create( VERBOSE );
@@ -163,10 +213,17 @@
         options.addOption( esClusterOption );
         options.addOption( ugClusterOption );
         options.addOption( remoteOption );
+        options.addOption( ugAppKeyspace );
+        options.addOption( ugLocksKeyspace );
         options.addOption( verbose );
 
         return options;
     }
+    
+    
+    protected void validateOptions(CommandLine line) throws MissingOptionException {
+    	
+    }
 
 
     public void startEmbedded() throws Exception {
@@ -278,7 +335,10 @@
             "   cassandra.connections: {}\n" +
             "   usergrid.notifications.listener.run: {}\n" +
             "   usergrid.push.worker_count: {}\n" +
-            "   usergrid.scheduler.enabled: {}\n",
+            "   usergrid.scheduler.enabled: {}\n" +
+            "   cassandra.readcl: {}\n" +
+            "   usergrid.read.cl: {}\n" +
+            "   usergrid.binary.uploader: {}\n",
             properties.get("cassandra.url"),
             properties.get("cassandra.datacenter.local"),
             properties.get("cassandra.username"),
@@ -290,7 +350,11 @@
             properties.get("cassandra.connections"),
             properties.get("usergrid.notifications.listener.run"),
             properties.get("usergrid.push.worker_count"),
-            properties.get("usergrid.scheduler.enabled")
+            properties.get("usergrid.scheduler.enabled"),
+            properties.get("cassandra.readcl"),
+            properties.get("usergrid.read.cl"),
+            properties.get("usergrid.binary.uploader")
+            
         );
     }
 
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
new file mode 100644
index 0000000..04915be
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+public class UniqueValueRepairer extends ExportingToolBase {
+
+	static final Logger logger = LoggerFactory.getLogger(UniqueValueRepairer.class);
+
+	JsonFactory jsonFactory = new JsonFactory();
+	public static final String LAST_ID = "lastId";
+
+	public static final String FIND_MISSING_UNIQUE_VALUES = "findMissingUniqueValues";
+	public static final String FIX_MISSING_VALUES = "fixUniqueValues";
+
+	private boolean findMissingUniqueValues = false;
+	private boolean fixMissingValue = false;
+
+	private AllEntityIdsObservable allEntityIdsObs;
+	private SimpleEdge lastEdge = null;
+
+	private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
+	private ExecutorService uniqueValueChecker = Executors.newFixedThreadPool(50);
+
+	private Session session;
+	private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+	private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+	@Override
+	@SuppressWarnings("static-access")
+	public Options createOptions() {
+
+		Options options = super.createOptions();
+
+		Option findMissingUniqueValues = OptionBuilder
+				.withDescription("Find entities with missing unique value entry  -findMissingUniqueValues")
+				.create(FIND_MISSING_UNIQUE_VALUES);
+		Option fixMissingUniqueValueEntries = OptionBuilder
+				.withDescription("Fix entities with missing unique value entry  -fixUniqueValues")
+				.create(FIX_MISSING_VALUES);
+
+		options.addOption(findMissingUniqueValues);
+		options.addOption(fixMissingUniqueValueEntries);
+
+		return options;
+	}
+
+	@Override
+	public void runTool(CommandLine line) throws Exception {
+
+		startSpring();
+		setVerbose(line);
+
+		this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
+		applyInputParams(line);
+
+		mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
+		uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
+		session = injector.getInstance(Session.class);
+
+		startEntityScan();
+
+		logger.info("Finished checking entities. Waiting for threads to complete execution.");
+
+		while (true) {
+			try {
+				// Spinning to prevent program execution from ending.
+				// Need to replace with some kind of countdown latch or task tracker
+				Thread.sleep(10000);
+			} catch (InterruptedException e) {
+				logger.error("Exception while waiting for unique check to complete.", e);
+			}
+		}
+	}
+
+	private void startEntityScan() throws Exception, UnsupportedEncodingException {
+
+		for (Entry<UUID, String> organizationName : getOrgs().entrySet()) {
+
+			// Let's skip the test entities.
+			if (organizationName.equals(properties.getProperty("usergrid.test-account.organization"))) {
+				continue;
+			}
+			fetchApplicationsForOrgs(organizationName.getKey(), organizationName.getValue());
+		}
+	}
+
+	private Map<UUID, String> getOrgs() throws Exception {
+		// Loop through the organizations
+		Map<UUID, String> organizationNames = null;
+
+		if (orgId == null && (orgName == null || orgName.trim().equals(""))) {
+			organizationNames = managementService.getOrganizations();
+		} else {
+			OrganizationInfo info = null;
+
+			if (orgId != null) {
+				info = managementService.getOrganizationByUuid(orgId);
+			} else {
+				info = managementService.getOrganizationByName(orgName);
+			}
+
+			if (info == null) {
+				logger.error("Organization info is null!");
+				System.exit(1);
+			}
+
+			organizationNames = new HashMap<UUID, String>();
+			organizationNames.put(info.getUuid(), info.getName());
+		}
+
+		return organizationNames;
+	}
+
+	private void fetchApplicationsForOrgs(UUID orgId, String orgName) throws Exception {
+
+		logger.info("Fetch applications for {} : {} ", orgId, orgName);
+
+		// Loop through the applications per organization
+		BiMap<UUID, String> applications = managementService.getApplicationsForOrganization(orgId);
+
+		if (applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+			// export all apps as appId or name is not provided
+
+			Observable.from(applications.entrySet()).subscribe(appEntry -> {
+				UUID appId = appEntry.getKey();
+				String appName = appEntry.getValue().split("/")[1];
+				try {
+					fetchApplications(appId, appName);
+				} catch (Exception e) {
+					logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+				}
+			});
+
+		} else {
+
+			UUID appId = applicationId;
+			String appName = applicationName;
+
+			if (applicationId != null) {
+				appName = applications.get(appId);
+			} else {
+				appId = applications.inverse().get(orgName + '/' + appName);
+			}
+
+			try {
+				fetchApplications(appId, appName);
+			} catch (Exception e) {
+				logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+			}
+
+		}
+	}
+
+	private void fetchApplications(UUID appId, String appName) throws Exception {
+
+		logger.info("Fetching application for {} : {} ", appName, appId);
+
+		EntityManager em = emf.getEntityManager(appId);
+
+		Set<String> collections = em.getApplicationCollections();
+
+		if (collNames == null || collNames.length <= 0) {
+			logger.info("Please pass collection name ( -collectionName testCollection ) ");
+		} else {
+			Observable.from(collNames).subscribe(collectionName -> {
+				if (collections.contains(collectionName)) {
+					fetchCollections(appId, collectionName, em);
+				}
+			});
+		}
+
+	}
+
+	private void fetchCollections(UUID appId, String collectionName, EntityManager em) {
+		extractEntitiesForCollection(appId, collectionName);
+	}
+
+	private void extractEntitiesForCollection(UUID applicationId, String collectionName) {
+
+		AtomicInteger batch = new AtomicInteger(1);
+
+		final EntityManager rootEm = emf.getEntityManager(applicationId);
+
+		ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
+		allEntityIdsObs
+				.getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
+						Optional.fromNullable(
+								CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
+						(lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
+				.buffer(1000).finallyDo(() -> {
+					edgeScopeFetcher.shutdown();
+					logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
+							collectionName);
+					while (!edgeScopeFetcher.isTerminated()) {
+						try {
+							edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS);
+						} catch (InterruptedException e) {
+						}
+					}
+					logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
+				}).subscribe(edges -> {
+
+					logger.info("For collection {}", collectionName);
+					Integer batchId = batch.getAndIncrement();
+					logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
+					Observable.just(edges).subscribeOn(Schedulers.from(edgeScopeFetcher)).subscribe(edgeScopes -> {
+
+						List<UUID> entityIds = new ArrayList<UUID>(1000);
+
+						for (EdgeScope edgeScope : edgeScopes) {
+							Id entityId = edgeScope.getEdge().getTargetNode();
+							if (entityId != null) {
+								entityIds.add(entityId.getUuid());
+							} else {
+								edgeScopes.remove(edgeScope);
+							}
+						}
+						try {
+							String type = edgeScopes.get(0).getEdge().getTargetNode().getType();
+
+							Observable.just(entityIds).subscribeOn(Schedulers.from(entityFetcher)) // change to
+									.subscribe(entIds -> {
+
+										logger.info("Fetched {} entity id's of type {} for batch ID {}", entIds.size(),
+												type, batchId);
+										Results entities = rootEm.getEntities(entIds, type);
+										logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(),
+												type, batchId);
+										try {
+
+											ConnectableObservable<Entity> entityObs = Observable
+													.from(entities.getEntities()).publish();
+											entityObs.subscribeOn(Schedulers.from(uniqueValueChecker));
+											entityObs.subscribe(t -> {
+												logger.info("Fetched entity with UUID : {}", t.getUuid());
+												if (findMissingUniqueValues) {
+													String fieldValue = null;
+													//We can search entity with UUID or name/email based on the entity type. 
+													//This mapping between unique value field(name/email etc) and UUID,
+													//is stored in unique value table. This can either be name / email or any other type.
+													//This value is being passed as field type. 
+										            //The code below takes the parameter and retrieves the value of the field using the getter method. 
+													if (fieldType == null || fieldType.equals("")
+															|| fieldType.equals("name")) {
+														fieldType = "name";
+														fieldValue = t.getName();
+													} else {
+														try {
+															Method method = t.getClass()
+																	.getMethod("get"
+																			+ fieldType.substring(0, 1).toUpperCase()
+																			+ fieldType.substring(1));
+															fieldValue = (String) method.invoke(t);
+														} catch (Exception e1) {
+															logger.error(
+																	"Exception while trying to fetch field value of type {} for entity {} batch {}",
+																	fieldType, t.getUuid(), batchId, e1);
+														}
+													}
+													try {
+														if (fieldValue != null) {
+
+															Entity e = rootEm.getUniqueEntityFromAlias(t.getType(),
+																	fieldValue, false);
+
+															if (e == null) {
+																logger.info(
+																		"No entity found for field type {} and field value {} but exists for UUID {}",
+																		fieldType, fieldValue, t.getUuid());
+																if (fixMissingValue) {
+																	logger.info(
+																			"Trying to repair unique value mapping for {} ",
+																			t.getUuid());
+																	UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy
+																			.load(new ApplicationScopeImpl(new SimpleId(
+																					applicationId, "application")),
+																					ConsistencyLevel
+																							.valueOf(System.getProperty(
+																									"usergrid.read.cl",
+																									"LOCAL_QUORUM")),
+																					t.getType(),
+																					Collections.singletonList(
+																							new StringField(fieldType,
+																									fieldValue)),
+																					false);
+
+																	ApplicationScope applicationScope = new ApplicationScopeImpl(
+																			new SimpleId(applicationId, "application"));
+																	com.google.common.base.Optional<MvccEntity> entity = mvccEntitySerializationStrategy
+																			.load(applicationScope, new SimpleId(
+																					t.getUuid(), t.getType()));
+
+																	if (!entity.isPresent()
+																			|| !entity.get().getEntity().isPresent()) {
+																		throw new RuntimeException(
+																				"Unable to update unique value index because supplied UUID "
+																						+ t.getUuid()
+																						+ " does not exist");
+																	}
+																	logger.info("Delete unique value: {}",
+																			uniqueValueSet.getValue(fieldType));
+																	try {
+																		session.execute(uniqueValueSerializationStrategy
+																				.deleteCQL(applicationScope,
+																						uniqueValueSet
+																								.getValue(fieldType)));
+																	} catch (Exception ex) {
+																		logger.error(
+																				"Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
+																				t.getUuid(), ex);
+																	}
+
+																	UniqueValue newUniqueValue = new UniqueValueImpl(
+																			new StringField(fieldType, fieldValue),
+																			entity.get().getId(),
+																			entity.get().getVersion());
+																	logger.info("Writing new unique value: {}",
+																			newUniqueValue);
+																	session.execute(uniqueValueSerializationStrategy
+																			.writeCQL(applicationScope, newUniqueValue,
+																					-1));
+																}
+
+															} else {
+																logger.info(
+																		"Found entity {} for field type {} and field value {}",
+																		e.getUuid(), fieldType, fieldValue);
+															}
+														} else {
+															logger.info("No value found for field {} for entity {}",
+																	fieldType, t.getUuid());
+														}
+													} catch (Exception e) {
+														logger.error(
+																"Error while checking unique values for batch id : {} for entity {}",
+																batchId, t.getUuid(), e);
+													}
+												}
+											});
+											entityObs.connect();
+
+										} catch (Exception e) {
+											logger.error(
+													"Error while checking unique values for batch id : {} for collection {}",
+													batchId, collectionName, e);
+										}
+									});
+
+						} catch (Exception e) {
+							logger.error("Exception while traversing entities " + edgeScopes.get(0).getEdge(), e);
+							System.exit(0);
+						}
+					});
+					logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
+				});
+		logger.info("Exiting extractEntitiesForCollection() method.");
+	}
+
+	protected void applyInputParams(CommandLine line) {
+
+		if (line.hasOption(ORG_ID)) {
+			orgId = ConversionUtils.uuid(line.getOptionValue(ORG_ID));
+		} else if (line.hasOption(ORG_NAME)) {
+			orgName = line.getOptionValue(ORG_NAME);
+		}
+
+		if (line.hasOption(APP_ID)) {
+			applicationId = ConversionUtils.uuid(line.getOptionValue(APP_ID));
+		} else if (line.hasOption(APP_NAME)) {
+			applicationName = line.getOptionValue(APP_NAME);
+		}
+		if (line.hasOption(COLL_NAMES)) {
+			collNames = line.getOptionValue(COLL_NAMES).split(",");
+		}
+		if (line.hasOption(COLLECTION_NAME)) {
+			collNames = new String[] { line.getOptionValue(COLLECTION_NAME) };
+		}
+		findMissingUniqueValues = line.hasOption(FIND_MISSING_UNIQUE_VALUES);
+		fixMissingValue = line.hasOption(FIX_MISSING_VALUES);
+
+	}
+}