Merge branch 'fixOrphanedEdges'
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a0748e6..909c073 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -27,6 +27,7 @@
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
 import org.apache.usergrid.corepersistence.migration.DeDupConnectionDataMigration;
 import org.apache.usergrid.corepersistence.pipeline.PipelineModule;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadRepairFig;
 import org.apache.usergrid.corepersistence.rx.impl.*;
 import org.apache.usergrid.corepersistence.service.*;
 import org.apache.usergrid.locking.guice.LockModule;
@@ -160,6 +161,8 @@
 
         install( new GuicyFigModule( EntityManagerFig.class ) );
 
+        install( new GuicyFigModule( ReadRepairFig.class ) );
+
         install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
 
         install( new GuicyFigModule( ServiceSchedulerFig.class ) );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
index 3f6e26d..cc82bd8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -23,6 +23,11 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,11 +57,17 @@
     private static final Logger logger = LoggerFactory.getLogger( EntityLoadVerifyFilter.class );
 
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
+    private final ReadRepairFig readRepairFig;
 
 
     @Inject
-    public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+    public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                   final GraphManagerFactory graphManagerFactory,
+                                   final ReadRepairFig readRepairFig) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
+        this.readRepairFig = readRepairFig;
     }
 
 
@@ -64,8 +75,9 @@
     public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
 
 
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
         final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
 
         //it's more efficient to make 1 network hop to get everything, then drop our results if required
         final Observable<FilterResult<Entity>> entityObservable =
@@ -80,9 +92,10 @@
                               .flatMap( ids -> entityCollectionManager.load( ids ) );
 
 
-                //now we have a collection, validate our canidate set is correct.
-
-                return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+                //now we have a collection, validate our candidate set is correct.
+                GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
+                return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
+                    entitySet, bufferedIds, readRepairFig ) )
                                           .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
                         entityCollector -> Observable.from( entityCollector.getResults() ) );
             } );
@@ -102,12 +115,20 @@
 
         private final List<FilterResult<Id>> candidateResults;
         private final EntitySet entitySet;
+        private final GraphManager graphManager;
+        private final ApplicationScope applicationScope;
+        private final ReadRepairFig readRepairFig;
 
 
-        public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+        public EntityVerifier( final ApplicationScope applicationScope, final GraphManager graphManager,
+                               final EntitySet entitySet, final List<FilterResult<Id>> candidateResults,
+                               final ReadRepairFig readRepairFig) {
+            this.applicationScope = applicationScope;
+            this.graphManager = graphManager;
             this.entitySet = entitySet;
             this.candidateResults = candidateResults;
             this.results = new ArrayList<>( entitySet.size() );
+            this.readRepairFig = readRepairFig;
         }
 
 
@@ -137,11 +158,42 @@
 
             //doesn't exist warn and drop
             if ( entity == null || !entity.getEntity().isPresent() ) {
-                logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
-                    + "  Ignoring since this could be a region sync issue", candidateId );
+
+                // look for orphaned edges
+                String edgeTypeName = CpNamingUtils.getEdgeTypeFromCollectionName(Schema.defaultCollectionName(candidateId.getType()));
+                final SearchByEdge searchByEdge =
+                    new SimpleSearchByEdge( applicationScope.getApplication(), edgeTypeName, candidateId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.absent() );
+
+                int edgesDeleted = 0;
+                List<MarkedEdge> edgeList = graphManager.loadEdgeVersions(searchByEdge).toList().toBlocking().last();
+                if (edgeList.size() > 0) {
+                    MarkedEdge firstEdge = edgeList.get(0);
+                    long currentTimestamp = CpNamingUtils.createGraphOperationTimestamp();
+                    long edgeTimestamp = firstEdge.getTimestamp();
+                    long timestampDiff = currentTimestamp - edgeTimestamp;
+                    long orphanDelaySecs = readRepairFig.getEdgeOrphanDelaySecs();
+                    // timestamps are in 100 nanoseconds, convert from seconds
+                    long allowedDiff = orphanDelaySecs * 1000L * 1000L * 10L;
+                    if (timestampDiff > allowedDiff) {
+                        // edges must be orphans, delete edges
+                        for (MarkedEdge edge: edgeList) {
+                            graphManager.markEdge(edge).toBlocking().lastOrDefault(null);
+                            edgesDeleted++;
+                        }
+                        graphManager.deleteEdge(firstEdge).toBlocking().lastOrDefault(null);
+                    }
+                }
+
+                if (edgesDeleted > 0) {
+                    logger.warn("Read graph edge and received candidate with entityId {} (application {}), yet was not found in cassandra."
+                        + "  Deleted at least {} edges.", candidateId, applicationScope.getApplication().getUuid().toString(), edgesDeleted);
+                } else {
+                    logger.warn("Read graph edge and received candidate with entityId {} (application {}), yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue", candidateId, applicationScope.getApplication().getUuid().toString());
+                }
 
 
-                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
 
                 return;
             }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
new file mode 100644
index 0000000..2f3e6e4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+/**
+ * Configuration for read repair: how long to wait before edges that point at a missing entity are deleted as orphans.
+ */
+@FigSingleton
+public interface ReadRepairFig extends GuicyFig {
+    // Delay in seconds; EntityLoadVerifyFilter deletes such edges only when the first loaded edge is older than this.
+    @Key( "usergrid.edge.orphan.delete.delay.secs" )
+    @Default( "86400" ) // 86400 seconds = 1 day
+    long getEdgeOrphanDelaySecs();
+
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
index c5f07b4..5812c6f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
@@ -141,8 +141,9 @@
         //sort by the edge timestamp
         srb.addSort( SortBuilders.fieldSort( IndexingUtils.EDGE_TIMESTAMP_FIELDNAME ).order( SortOrder.DESC ) );
 
+        // removing secondary sort by entity ID -- takes ES resources and provides no benefit
         //sort by the entity id if our times are equal
-        srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) );
+        //srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) );
 
         return;
     }