Makes index rebuild parallel.
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index e117a73..7eb9f94 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -2751,7 +2751,7 @@
*/
public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception {
- CpWalker walker = new CpWalker( po.getWriteDelayTime() );
+ CpWalker walker = new CpWalker( );
walker.walkCollections( this, application, new CpVisitor() {
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index ff631ed..8a9eed5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -34,7 +34,7 @@
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
-
+import rx.schedulers.Schedulers;
/**
@@ -44,26 +44,25 @@
private static final Logger logger = LoggerFactory.getLogger( CpWalker.class );
- private final long writeDelayMs;
/**
* Wait the set amount of time between successive writes.
- * @param writeDelayMs
+ * @param
*/
- public CpWalker(final long writeDelayMs){
- this.writeDelayMs = writeDelayMs;
+ public CpWalker(){
+
}
- public void walkCollections( final CpEntityManager em, final EntityRef start,
+ public void walkCollections( final CpEntityManager em, final EntityRef start,
final CpVisitor visitor ) throws Exception {
doWalkCollections( em, new SimpleId( start.getUuid(), start.getType() ), visitor );
}
- private void doWalkCollections(
+ private void doWalkCollections(
final CpEntityManager em, final Id applicationId, final CpVisitor visitor ) {
final ApplicationScope applicationScope = em.getApplicationScope();
@@ -89,45 +88,45 @@
logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
- return gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
- applicationId,
- edgeType,
- Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING,
- null ) );
+ return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( applicationId, edgeType, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ) );
}
- } ).doOnNext( new Action1<Edge>() {
-
+ } )
+ //process our edges in parallel for as much efficiency as possible
+ .parallel( new Func1<Observable<Edge>, Observable<Edge>>() {
@Override
- public void call( Edge edge ) {
+ public Observable<Edge> call( final Observable<Edge> edgeObservable ) {
+ //visit and update the entity
+ return edgeObservable.doOnNext( new Action1<Edge>() {
- logger.info( "Re-indexing edge {}", edge );
+ @Override
+ public void call( Edge edge ) {
- EntityRef targetNodeEntityRef = new SimpleEntityRef(
- edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
+ logger.info( "Re-indexing edge {}", edge );
- Entity entity;
- try {
- entity = em.get( targetNodeEntityRef );
- }
- catch ( Exception ex ) {
- logger.error( "Error getting sourceEntity {}:{}, continuing",
- targetNodeEntityRef.getType(), targetNodeEntityRef.getUuid() );
- return;
- }
+ EntityRef targetNodeEntityRef =
+ new SimpleEntityRef( edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
+
+ Entity entity;
+ try {
+ entity = em.get( targetNodeEntityRef );
+ }
+ catch ( Exception ex ) {
+ logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
+ targetNodeEntityRef.getUuid() );
+ return;
+ }
- String collName = CpNamingUtils.getCollectionName( edge.getType() );
+ String collName = CpNamingUtils.getCollectionName( edge.getType() );
- visitor.visitCollectionEntry( em, collName, entity );
-
- try {
- Thread.sleep( writeDelayMs );
- }
- catch ( InterruptedException e ) {
- throw new RuntimeException( "Unable to wait" );
- }
+ visitor.visitCollectionEntry( em, collName, entity );
+ }
+ } );
}
- } ).toBlocking().lastOrDefault( null ); // end foreach on edges
+ }, Schedulers.io() )
+
+ //wait for it to complete
+ .toBlocking().lastOrDefault( null ); // end foreach on edges
}
}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
index 5cbc499..acce2d8 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
@@ -117,22 +117,32 @@
final UUID appId = UUIDUtils.tryExtractUUID(applicationIdStr);
ApiResponse response = createApiResponse();
- response.setAction( "rebuild indexes" );
+ response.setAction( "rebuild indexes started" );
+
+ final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+
+ @Override
+ public void onProgress( final EntityRef entity ) {
+ logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
+ }
- final EntityManager em = emf.getEntityManager( appId );
+ @Override
+ public long getWriteDelayTime() {
+ return delay;
+ }
+ };
- final Set<String> collectionNames = em.getApplicationCollections();
final Thread rebuild = new Thread() {
@Override
public void run() {
- for ( String collectionName : collectionNames )
-
-
- {
- rebuildCollection( appId, collectionName, delay );
+ try {
+ emf.rebuildApplicationIndexes( appId, po );
+ }
+ catch ( Exception e ) {
+ logger.error( "Unable to re-index application" );
}
}
};