blob: d9ee87eedf4346b6f916cf2b9bdd9cbbf6b624fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.corepersistence.index;
import com.google.inject.Inject;
import net.jcip.annotations.NotThreadSafe;
import org.apache.usergrid.corepersistence.TestIndexModule;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueActor;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.impl.EsRunner;
import org.apache.usergrid.persistence.index.impl.IndexOperation;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import rx.Observable;
import rx.schedulers.Schedulers;
import java.util.*;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.junit.Assert.*;
@RunWith( EsRunner.class )
@UseModules( { TestIndexModule.class } )
@NotThreadSafe//anything that changes the system version state is not safe to be run concurrently
public class IndexServiceTest {
@Inject
public IndexService indexService;
@Inject
public GraphManagerFactory graphManagerFactory;
@Inject
public IndexProducer indexProducer;
@Inject
public EntityCollectionManagerFactory entityCollectionManagerFactory;
@Inject
public EntityIndexFactory entityIndexFactory;
@Inject
public IndexLocationStrategyFactory indexLocationStrategyFactory;
@Inject
public IndexFig indexFig;
public GraphManager graphManager;
public ApplicationScope applicationScope;
@Inject
ActorSystemManager actorSystemManager;
@Inject
UniqueValuesService uniqueValuesService;
private static Map<Integer, Boolean> startedAkka = new HashMap<>();
protected synchronized void initAkka(
int port, ActorSystemManager actorSystemManager, UniqueValuesService uniqueValuesService ) {
if ( startedAkka.get(port) == null ) {
actorSystemManager.registerRouterProducer( uniqueValuesService );
actorSystemManager.start( "localhost", port, "us-east" );
actorSystemManager.waitForClientActor();
startedAkka.put( port, true );
}
}
@Before
public void setup() {
applicationScope = getApplicationScope( UUIDGenerator.newTimeUUID() );
graphManager = graphManagerFactory.createEdgeManager( applicationScope );
initAkka( 2555, actorSystemManager, uniqueValuesService );
}
@Test
public void testSingleIndexFromSource() {
final Entity entity = new Entity( createId( "test" ), UUIDGenerator.newTimeUUID() );
entity.setField( new StringField( "string", "foo" ) );
final Edge collectionEdge = createCollectionEdge( applicationScope.getApplication(), "tests", entity.getId() );
//write the edge
graphManager.writeEdge( collectionEdge ).toBlocking().last();
//index the edge
final Observable<IndexOperationMessage> indexed = indexService.indexEntity( applicationScope, entity );
//real users should never call to blocking, we're not sure what we'll get
final IndexOperationMessage results = indexed.toBlocking().last();
indexProducer.put(results).subscribe();
final Set<IndexOperation> indexRequests = results.getIndexRequests();
//ensure our value made it to the index request
final IndexOperation indexRequest = indexRequests.iterator().next();
assertNotNull( indexRequest );
}
// @Test( timeout = 60000 )
@Test( )
public void testSingleCollectionConnection() throws InterruptedException {
ApplicationScope applicationScope =
new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
final Entity testEntity = new Entity( createId( "thing" ), UUIDGenerator.newTimeUUID() );
testEntity.setField( new StringField( "string", "foo" ) );
//write the entity before indexing
final EntityCollectionManager collectionManager =
entityCollectionManagerFactory.createCollectionManager( applicationScope );
collectionManager.write( testEntity, null ).toBlocking().last();
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
//create our collection edge
final Edge collectionEdge =
CpNamingUtils.createCollectionEdge( applicationScope.getApplication(), "things", testEntity.getId() );
graphManager.writeEdge( collectionEdge ).toBlocking().last();
final Id connectingId = createId( "connecting" );
final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
final Edge connectionSearch = graphManager.writeEdge( edge ).toBlocking().last();
//now index
final int batches = indexService.indexEntity( applicationScope, testEntity )
.flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
assertEquals(1, batches);
final EntityIndex EntityIndex =
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
//query until the collection edge is available
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
final CandidateResults collectionResults = getResults( EntityIndex, collectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1);
assertEquals( 1, collectionResults.size() );
// with collection versioning, empty versions are included
assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(collectionResults.get(0).getId().getType()));
assertEquals(testEntity.getId().getUuid(), collectionResults.get(0).getId().getUuid());
//query until the connection edge is available
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
final CandidateResults connectionResults = getResults( EntityIndex, connectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, connectionResults.size() );
// with collection versioning, empty versions are included
assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(collectionResults.get(0).getId().getType()));
assertEquals(testEntity.getId().getUuid(), collectionResults.get(0).getId().getUuid());
}
/**
* Tests that when we have large connections, we batch appropriately
* @throws InterruptedException
*/
// @Test( timeout = 60000 )
@Test( )
public void testConnectingIndexingBatches() throws InterruptedException {
ApplicationScope applicationScope =
new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
final Entity testEntity = new Entity( createId( "thing" ), UUIDGenerator.newTimeUUID() );
testEntity.setField( new StringField( "string", "foo" ) );
//write the entity before indexing
final EntityCollectionManager collectionManager =
entityCollectionManagerFactory.createCollectionManager( applicationScope );
collectionManager.write( testEntity, null ).toBlocking().last();
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
//create our collection edge
final Edge collectionEdge =
CpNamingUtils.createCollectionEdge( applicationScope.getApplication(), "things", testEntity.getId() );
graphManager.writeEdge( collectionEdge ).toBlocking().last();
/**
* Write 10k edges 10 at a time in parallel
*/
// final int edgeCount = indexFig.getIndexBatchSize()*2;
final int edgeCount = 100;
final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
final Id connectingId = createId( "connecting" );
final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
return graphManager.writeEdge( edge ).subscribeOn( Schedulers.io() );
}).toList().toBlocking().last();
assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
//get the first and last edge
final Edge connectionSearch = connectionSearchEdges.get( 0 );
final Edge lastSearch = connectionSearchEdges.get( edgeCount - 1 );
//now index
final int batches = indexService.indexEntity( applicationScope, testEntity )
.flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
//take our edge count + 1 and divided by batch sizes
final int expectedSize = ( int ) Math.ceil( ( (double)edgeCount + 1 ) / indexFig.getIndexBatchSize() );
assertEquals(expectedSize, batches);
final EntityIndex EntityIndex =
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
//query until it's available
final CandidateResults collectionResults = getResults( EntityIndex, collectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, collectionResults.size() );
// with collection versioning, empty versions are included
assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(collectionResults.get(0).getId().getType()));
assertEquals(testEntity.getId().getUuid(), collectionResults.get(0).getId().getUuid());
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
//query until it's available
final CandidateResults connectionResults = getResults( EntityIndex, connectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, connectionResults.size() );
assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(collectionResults.get(0).getId().getType()));
assertEquals(testEntity.getId().getUuid(), collectionResults.get(0).getId().getUuid());
final SearchEdge lastConnectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( lastSearch );
//query until it's available
final CandidateResults lastConnectionResults = getResults( EntityIndex, lastConnectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, lastConnectionResults.size() );
assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(lastConnectionResults.get(0).getId().getType()));
assertEquals(testEntity.getId().getUuid(), lastConnectionResults.get(0).getId().getUuid());
}
/**
*This test must do the following steps.
*1. Delete the connecting edge
*2. Run the deleteIndexEdge using the search edge that gets returned from the delete call
*3. Run queries to make sure that the collection entity still exists while the connection search edge is gone.
* @throws InterruptedException
*/
@Test
public void testDeleteSingleConnectingEdge() throws InterruptedException {
ApplicationScope applicationScope =
new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
final EntityIndex EntityIndex =
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
final Entity testEntity = new Entity( createId( "thing" ), UUIDGenerator.newTimeUUID() );
testEntity.setField( new StringField( "string", "foo" ) );
//write entity
final Edge connectionSearch =
createTestEntityAndReturnConnectionEdge( applicationScope,graphManager,testEntity );
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
//ensure that no edges remain
CandidateResults connectionResultsEmpty = EntityIndex.search( connectionSearchEdge,
SearchTypes.fromTypes( "thing" ),"select *",10,0, false );
assertEquals(1,connectionResultsEmpty.size());
//step 1
//(We need to mark then delete things in the graph manager.)
final Edge toBeMarkedEdge = graphManager.markEdge( connectionSearch ).toBlocking().firstOrDefault( null );
final Edge toBeDeletedEdge = graphManager.deleteEdge( toBeMarkedEdge ).toBlocking().firstOrDefault( null );
//step 2
IndexOperationMessage indexOperationMessage =
indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ).toBlocking().lastOrDefault(
null );
assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
indexProducer.put(indexOperationMessage).toBlocking().last();
Thread.sleep(1000); // wait for the operation to flush at Elasticsearch
//ensure that no edges remain
connectionResultsEmpty = EntityIndex.search( connectionSearchEdge,
SearchTypes.fromTypes( "thing" ),"select *",10,0, false );
assertEquals(0,connectionResultsEmpty.size());
}
@Test
public void testDeleteMultipleConnectingEdges() throws InterruptedException {
ApplicationScope applicationScope =
new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
final EntityIndex EntityIndex =
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
final Entity testEntity = new Entity( createId( "thing" ), UUIDGenerator.newTimeUUID() );
testEntity.setField( new StringField( "string", "foo" ) );
//write entity
Edge collectionEdge = createEntityandCollectionEdge( applicationScope, graphManager, testEntity );
//Write multiple connection edges
final int edgeCount = 5;
final List<MarkedEdge>
connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator();
//query until results are available for collections
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
getResults( EntityIndex, collectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
for(int i = 0; i < edgeCount; i++) {
//query until results are available for connections
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearchEdges.get( i ) );
getResults( EntityIndex, connectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
1 );
}
for(Edge connectionSearch:connectionSearchEdges) {
//step 1
final Edge toBeMarkedEdge = graphManager.markEdge( connectionSearch ).toBlocking().firstOrDefault( null );
final Edge toBeDeletedEdge = graphManager.deleteEdge( toBeMarkedEdge ).toBlocking().first();
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
//step 2
IndexOperationMessage indexOperationMessage =
indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ) .flatMap(mesage ->indexProducer.put(mesage)).toBlocking().lastOrDefault( null );
//not sure if this is still valid.
assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
//ensure that no edges remain
final CandidateResults connectionResultsEmpty = EntityIndex.search( connectionSearchEdge,
SearchTypes.fromTypes( "things" ),"select *",10,0, false );
assertEquals(0,connectionResultsEmpty.size());
}
}
/**
* Refactor into two methods . Should only have one responsiblitiy.
* @param applicationScope
* @param graphManager
* @return
*/
private Edge createTestEntityAndReturnConnectionEdge( final ApplicationScope applicationScope,
final GraphManager graphManager,
final Entity testEntity) {
final EntityCollectionManager collectionManager =
entityCollectionManagerFactory.createCollectionManager( applicationScope );
final EntityIndex EntityIndex =
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final Edge collectionEdge =
createEntityandCollectionEdge( applicationScope, graphManager, testEntity );
//create our connection edge.
final Id connectingId = createId( "connecting" );
final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
final Edge connectionSearch = graphManager.writeEdge( connectionEdge ).toBlocking().last();
//now index
indexService.indexEntity( applicationScope, testEntity)
.flatMap(mesage ->indexProducer.put(mesage)).count().toBlocking().last();
//query until results are available for collections
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
getResults( EntityIndex, collectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
1 );
//query until results are available for connections
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
getResults( EntityIndex, connectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
1 );
return connectionSearch;
}
/**
* Creates an entity along with the corresponding collection edge.
* @param applicationScope
* @param graphManager
* @param testEntity
* @return
*/
private Edge createEntityandCollectionEdge( final ApplicationScope applicationScope,
final GraphManager graphManager, final Entity testEntity) {
final EntityCollectionManager collectionManager =
entityCollectionManagerFactory.createCollectionManager( applicationScope );
collectionManager.write( testEntity, null ).toBlocking().last();
//create our collection edge
final Edge collectionEdge =
CpNamingUtils.createCollectionEdge( applicationScope.getApplication(), testEntity.getId().getType(),
testEntity.getId() );
graphManager.writeEdge( collectionEdge ).toBlocking().last();
return collectionEdge;
}
private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
final int edgeCount ) {
final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
//create our connection edge.
final Id connectingId = createId( "connecting" );
final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
return graphManager.writeEdge( connectionEdge ).subscribeOn( Schedulers.io() );
}, 20).toList().toBlocking().last();
assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
return connectionSearchEdges;
}
private CandidateResults getResults( final EntityIndex EntityIndex,
final SearchEdge searchEdge, final SearchTypes searchTypes,
final int expectedSize ) {
final int attempts = 100;
String ql = "select *";
for ( int i = 0; i < attempts; i++ ) {
final CandidateResults candidateResults =
EntityIndex.search( searchEdge, searchTypes, ql , 100, 0, false );
if ( candidateResults.size() == expectedSize ) {
return candidateResults;
}
try {
Thread.sleep( 100 );
}
catch ( InterruptedException e ) {
//swallow
}
}
fail( "Could not find candidates of size " + expectedSize + "after " + attempts + " attempts" );
//we'll never reach this, required for compile
return null;
}
}