Merge branch 'fixOrphanedEdges'
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a0748e6..909c073 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -27,6 +27,7 @@
import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
import org.apache.usergrid.corepersistence.migration.DeDupConnectionDataMigration;
import org.apache.usergrid.corepersistence.pipeline.PipelineModule;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadRepairFig;
import org.apache.usergrid.corepersistence.rx.impl.*;
import org.apache.usergrid.corepersistence.service.*;
import org.apache.usergrid.locking.guice.LockModule;
@@ -160,6 +161,8 @@
install( new GuicyFigModule( EntityManagerFig.class ) );
+ install( new GuicyFigModule( ReadRepairFig.class ) );
+
install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
install( new GuicyFigModule( ServiceSchedulerFig.class ) );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
index 3f6e26d..cc82bd8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -23,6 +23,11 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,11 +57,17 @@
private static final Logger logger = LoggerFactory.getLogger( EntityLoadVerifyFilter.class );
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final GraphManagerFactory graphManagerFactory;
+ private final ReadRepairFig readRepairFig;
@Inject
- public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+ public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final GraphManagerFactory graphManagerFactory,
+ final ReadRepairFig readRepairFig) {
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.graphManagerFactory = graphManagerFactory;
+ this.readRepairFig = readRepairFig;
}
@@ -64,8 +75,9 @@
public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
//it's more efficient to make 1 network hop to get everything, then drop our results if required
final Observable<FilterResult<Entity>> entityObservable =
@@ -80,9 +92,10 @@
.flatMap( ids -> entityCollectionManager.load( ids ) );
- //now we have a collection, validate our canidate set is correct.
-
- return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+ //now we have a collection, validate our candidate set is correct.
+ GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
+ return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
+ entitySet, bufferedIds, readRepairFig ) )
.doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
entityCollector -> Observable.from( entityCollector.getResults() ) );
} );
@@ -102,12 +115,20 @@
private final List<FilterResult<Id>> candidateResults;
private final EntitySet entitySet;
+ private final GraphManager graphManager;
+ private final ApplicationScope applicationScope;
+ private final ReadRepairFig readRepairFig;
- public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+ public EntityVerifier( final ApplicationScope applicationScope, final GraphManager graphManager,
+ final EntitySet entitySet, final List<FilterResult<Id>> candidateResults,
+ final ReadRepairFig readRepairFig) {
+ this.applicationScope = applicationScope;
+ this.graphManager = graphManager;
this.entitySet = entitySet;
this.candidateResults = candidateResults;
this.results = new ArrayList<>( entitySet.size() );
+ this.readRepairFig = readRepairFig;
}
@@ -137,11 +158,42 @@
//doesn't exist warn and drop
if ( entity == null || !entity.getEntity().isPresent() ) {
- logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
- + " Ignoring since this could be a region sync issue", candidateId );
+
+                // look for orphaned edges (NOTE(review): uses Optional.absent() below — verify Optional is imported in this file)
+ String edgeTypeName = CpNamingUtils.getEdgeTypeFromCollectionName(Schema.defaultCollectionName(candidateId.getType()));
+ final SearchByEdge searchByEdge =
+ new SimpleSearchByEdge( applicationScope.getApplication(), edgeTypeName, candidateId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.absent() );
+
+ int edgesDeleted = 0;
+ List<MarkedEdge> edgeList = graphManager.loadEdgeVersions(searchByEdge).toList().toBlocking().last();
+ if (edgeList.size() > 0) {
+ MarkedEdge firstEdge = edgeList.get(0);
+ long currentTimestamp = CpNamingUtils.createGraphOperationTimestamp();
+ long edgeTimestamp = firstEdge.getTimestamp();
+ long timestampDiff = currentTimestamp - edgeTimestamp;
+ long orphanDelaySecs = readRepairFig.getEdgeOrphanDelaySecs();
+                    // timestamps are in units of 100 nanoseconds; convert the allowed delay from seconds
+ long allowedDiff = orphanDelaySecs * 1000L * 1000L * 10L;
+ if (timestampDiff > allowedDiff) {
+ // edges must be orphans, delete edges
+ for (MarkedEdge edge: edgeList) {
+ graphManager.markEdge(edge).toBlocking().lastOrDefault(null);
+ edgesDeleted++;
+ }
+ graphManager.deleteEdge(firstEdge).toBlocking().lastOrDefault(null);
+ }
+ }
+
+ if (edgesDeleted > 0) {
+ logger.warn("Read graph edge and received candidate with entityId {} (application {}), yet was not found in cassandra."
+ + " Deleted at least {} edges.", candidateId, applicationScope.getApplication().getUuid().toString(), edgesDeleted);
+ } else {
+ logger.warn("Read graph edge and received candidate with entityId {} (application {}), yet was not found in cassandra."
+ + " Ignoring since this could be a region sync issue", candidateId, applicationScope.getApplication().getUuid().toString());
+ }
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
return;
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
new file mode 100644
index 0000000..2f3e6e4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+/**
+ * Configuration settings for read repair of orphaned graph edges.
+ */
+@FigSingleton
+public interface ReadRepairFig extends GuicyFig {
+
+ @Key( "usergrid.edge.orphan.delete.delay.secs" )
+ @Default( "86400" ) // 1 day
+ long getEdgeOrphanDelaySecs();
+
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
index c5f07b4..5812c6f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
@@ -141,8 +141,9 @@
//sort by the edge timestamp
srb.addSort( SortBuilders.fieldSort( IndexingUtils.EDGE_TIMESTAMP_FIELDNAME ).order( SortOrder.DESC ) );
+ // removing secondary sort by entity ID -- takes ES resources and provides no benefit
//sort by the entity id if our times are equal
- srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) );
+ //srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) );
return;
}