blob: e9d9e6208d647b2ea0f752d44cef41eb4eee525f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.persistence;
import java.util.*;
import net.jcip.annotations.NotThreadSafe;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.codahale.metrics.MetricRegistry;
import com.google.inject.Injector;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@NotThreadSafe
public class RebuildIndexTest extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger( RebuildIndexTest.class );
private static final MetricRegistry registry = new MetricRegistry();
private static final int ENTITIES_TO_INDEX = 1000;
/**
 * Runs before every test and emits a single debug line marking the start of the test.
 */
@Before
public void startReporting() {
    // The logger discards the message on its own when debug is disabled,
    // so no explicit level check is required for a constant message.
    logger.debug( "Starting metrics reporting" );
}
/**
 * Runs after every test and emits a single debug line marking the end of the test.
 */
@After
public void printReport() {
    final String message = "Printing metrics report";
    logger.debug( message );
}
/**
 * Verifies that a re-index request restricted to one collection restores only that collection.
 *
 * <p>Steps: create {@code ENTITIES_TO_INDEX} entities alternating between the "catherder" and
 * "catshepard" types, confirm both collections are searchable, delete the application index,
 * confirm both collections are empty in search, rebuild with {@code withCollection("catherders")},
 * and finally assert that "catherders" is fully searchable again while "catshepards" still
 * returns nothing.</p>
 *
 * @throws Exception if application setup, entity creation, searching or re-indexing fails
 */
@Test( timeout = 120000 )
public void rebuildOneCollectionIndex() throws Exception {

    logger.info( "Started rebuildOneCollectionIndex()" );

    String rand = RandomStringUtils.randomAlphanumeric( 5 );
    final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );

    final EntityManager em = setup.getEmf().getEntityManager( appId );

    final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );

    // ----------------- create a bunch of entities

    Map<String, Object> entityMap = new HashMap<String, Object>() {{
        put( "key1", 1000 );
        put( "key2", 2000 );
        put( "key3", "Some value" );
    }};

    int herderCount = 0;
    int shepardCount = 0;
    for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
        try {
            entityMap.put( "key", i );

            // even iterations create catherders, odd iterations create catshepards
            if ( i % 2 == 0 ) {
                em.create( "catherder", entityMap );
                herderCount++;
            }
            else {
                em.create( "catshepard", entityMap );
                shepardCount++;
            }
        }
        catch ( Exception ex ) {
            throw new RuntimeException( "Error creating entity", ex );
        }

        if ( i % 10 == 0 ) {
            logger.info( "Created {} entities", i );
        }
    }

    logger.info( "Created {} entities", ENTITIES_TO_INDEX );
    app.waitForQueueDrainAndRefreshIndex(5000);

    // ----------------- test that we can read them, should work fine

    logger.debug( "Read the data" );
    readData( em, "catherders", herderCount, 0 );
    readData( em, "catshepards", shepardCount, 0 );

    // ----------------- delete the application index

    logger.debug( "Deleting apps" );
    deleteIndex( em.getApplicationId() );

    // ----------------- the collections created above must no longer be searchable

    logger.debug( "Reading data, should fail this time " );

    // check the collections this test actually populated; an unrelated collection
    // name would return zero results regardless of whether the delete worked
    readData( em, "catherders", 0, 0 );
    readData( em, "catshepards", 0, 0 );

    // ----------------- rebuild index for catherders only

    logger.debug( "Preparing to rebuild all indexes" );

    final ReIndexRequestBuilder builder =
        reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );

    ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );

    // JUnit 4 takes the message first and the value under test second
    assertNotNull( "JobId is present", status.getJobId() );

    logger.info( "Submitted index rebuild job {}", status.getJobId() );

    waitForRebuild( status, reIndexService );

    logger.info( "Rebuilt index" );

    app.waitForQueueDrainAndRefreshIndex(10000);

    // ----------------- test that we can read the catherder collection and not the catshepard

    readData( em, "catherders", herderCount, 0 );
    readData( em, "catshepards", 0, 0 );
}
/**
 * Verifies that a full application re-index restores search over a collection and its connections.
 *
 * <p>Steps: create three "cat" entities and {@code ENTITIES_TO_INDEX} "testType" entities, each
 * connected to all three cats via "herds"; confirm the "testtypes" collection is searchable with
 * three connections per entity; delete the application index and confirm nothing is returned;
 * rebuild the index for the whole application; confirm the original counts are back.</p>
 *
 * @throws Exception if application setup, entity creation or searching fails
 */
@Test( timeout = 240000 )
public void rebuildIndex() throws Exception {

    logger.info( "Started rebuildIndex()" );

    String rand = RandomStringUtils.randomAlphanumeric( 5 );
    String orgName = "org_" + rand;
    String appName = "app_" + rand;
    final UUID appId = setup.createApplication(orgName,appName);
    logger.info("orgName={} appName={} appId={}", orgName, appName, appId.toString());

    final EntityManager em = setup.getEmf().getEntityManager( appId );

    final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );

    // ----------------- create a bunch of entities

    Map<String, Object> entityMap = new HashMap<String, Object>() {{
        put( "key1", 1000 );
        put( "key2", 2000 );
        put( "key3", "Some value" );
    }};
    Map<String, Object> cat1map = new HashMap<String, Object>() {{
        put( "name", "enzo" );
        put( "color", "orange" );
    }};
    Map<String, Object> cat2map = new HashMap<String, Object>() {{
        put( "name", "marquee" );
        put( "color", "grey" );
    }};
    Map<String, Object> cat3map = new HashMap<String, Object>() {{
        put( "name", "bertha" );
        put( "color", "tabby" );
    }};

    Entity cat1 = em.create( "cat", cat1map );
    Entity cat2 = em.create( "cat", cat2map );
    Entity cat3 = em.create( "cat", cat3map );

    for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
        try {
            entityMap.put( "key", i );
            final Entity entity = em.create( "testType", entityMap );

            // every testType entity herds all three cats
            em.createConnection( entity, "herds", cat1 );
            em.createConnection( entity, "herds", cat2 );
            em.createConnection( entity, "herds", cat3 );
        }
        catch ( Exception ex ) {
            throw new RuntimeException( "Error creating entity", ex );
        }

        if ( i % 10 == 0 ) {
            logger.info( "Created {} entities", i );
        }
    }

    logger.info( "Created {} entities", ENTITIES_TO_INDEX );
    app.waitForQueueDrainAndRefreshIndex(15000);

    // ----------------- test that we can read them, should work fine

    logger.debug( "Read the data" );
    final String collectionName = "testtypes";
    readData( em, collectionName, ENTITIES_TO_INDEX, 3 );

    // ----------------- delete the application index

    logger.debug( "Deleting app index" );

    deleteIndex( em.getApplicationId() );
    app.waitForQueueDrainAndRefreshIndex();

    // deleting the system app index would interfere with other concurrently running tests
    //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );

    // ----------------- test that we can read them, should fail

    logger.debug( "Reading data, should fail this time " );

    readData( em, collectionName, 0, 0 );

    // ----------------- rebuild index

    logger.debug( "Preparing to rebuild all indexes" );

    try {

        final ReIndexRequestBuilder builder =
            reIndexService.getBuilder().withApplicationId( em.getApplicationId() );

        ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );

        // JUnit 4 takes the message first and the value under test second
        assertNotNull( "JobId is present", status.getJobId() );

        logger.info( "Submitted index rebuild job {}", status.getJobId() );

        waitForRebuild( status, reIndexService );

        logger.info( "Rebuilt index" );
    }
    catch ( Exception ex ) {
        logger.error( "Error rebuilding index", ex );
        fail( "Error rebuilding index: " + ex );
    }

    // ----------------- test that we can read them

    app.waitForQueueDrainAndRefreshIndex(15000);
    readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
}
/**
 * Verifies that a full application re-index restores both property and location queries.
 *
 * <p>Steps: create three grey "cat" entities, each with a "location" map of latitude/longitude;
 * confirm a {@code color='grey'} query returns all three; delete the application index and
 * confirm the query returns nothing; rebuild the index for the whole application; confirm the
 * colour query returns three again and that a "within 100" query centred on the second cat's
 * coordinates returns exactly one result.</p>
 *
 * @throws Exception if application setup, entity creation or searching fails
 */
@Test( timeout = 120000 )
public void rebuildIndexGeo() throws Exception {

    logger.info( "Started rebuildIndexGeo()" );

    String rand = RandomStringUtils.randomAlphanumeric( 5 );
    final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );

    final EntityManager em = setup.getEmf().getEntityManager( appId );

    final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );

    // ----------------- create the entities

    Map<String, Object> cat1map = new HashMap<String, Object>() {{
        put( "name", "enzo" );
        put( "color", "grey" );
        put("location", new LinkedHashMap<String, Object>() {{
            put("latitude", -35.746369);
            put("longitude", 150.952183);
        }});
    }};

    // coordinates of the second cat; reused below as the centre of the geo query
    final double lat = -34.746369;
    final double lon = 152.952183;
    Map<String, Object> cat2map = new HashMap<String, Object>() {{
        put( "name", "marquee" );
        put( "color", "grey" );
        put("location", new LinkedHashMap<String, Object>() {{
            put("latitude", lat);
            put("longitude", lon);
        }});
    }};
    Map<String, Object> cat3map = new HashMap<String, Object>() {{
        put( "name", "bertha" );
        put( "color", "grey" );
        put("location", new LinkedHashMap<String, Object>() {{
            put("latitude", -33.746369);
            put("longitude", 150.952183);
        }});
    }};

    em.create( "cat", cat1map );
    em.create( "cat", cat2map );
    em.create( "cat", cat3map );

    // only the three cats above are created in this test
    logger.info( "Created {} entities", 3 );
    app.waitForQueueDrainAndRefreshIndex(5000);

    // ----------------- test that we can read them, should work fine

    logger.debug( "Read the data" );
    final String collectionName = "cats";
    Query q = Query.fromQL( "select * where color='grey'" ).withLimit( 1000 );
    Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 3 );
    assertEquals( 3, results.size() );

    // ----------------- delete the application index

    logger.debug( "Deleting app index" );

    deleteIndex( em.getApplicationId() );

    // deleting the system app index would interfere with other concurrently running tests
    //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );

    // ----------------- test that we can read them, should fail

    logger.debug( "Reading data, should fail this time " );

    results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 0 );
    assertEquals( 0, results.size() );

    // ----------------- rebuild index

    logger.debug( "Preparing to rebuild all indexes" );

    try {

        final ReIndexRequestBuilder builder =
            reIndexService.getBuilder().withApplicationId( em.getApplicationId() );

        ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );

        // JUnit 4 takes the message first and the value under test second
        assertNotNull( "JobId is present", status.getJobId() );

        logger.info( "Submitted index rebuild job {}", status.getJobId() );

        waitForRebuild( status, reIndexService );

        logger.info( "Rebuilt index" );
    }
    catch ( Exception ex ) {
        logger.error( "Error rebuilding index", ex );
        fail( "Error rebuilding index: " + ex );
    }

    // ----------------- test that we can read them

    app.waitForQueueDrainAndRefreshIndex(5000);
    results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 3 );
    assertEquals( 3, results.size() );

    q = Query.fromQL("select * where location within 100 of "+lat+", "+lon);
    results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 1 );
    assertEquals( 1, results.size() );
}
/**
 * Verifies a re-index request that carries a start timestamp.
 *
 * <p>Steps: create two "thing" entities; confirm both are searchable; delete the application
 * index and confirm none are; rebuild with {@code withStartTimestamp} set to the second entity's
 * modified time; assert that exactly one entity is searchable afterwards. (Presumably the
 * timestamp restricts re-indexing to entities modified at or after it - confirm against
 * {@code ReIndexRequestBuilder}.)</p>
 *
 * @throws Exception if application setup, entity creation or searching fails
 */
@Test( timeout = 120000 )
public void rebuildUpdatedSince() throws Exception {

    logger.info( "Started rebuildUpdatedSince()" );

    String rand = RandomStringUtils.randomAlphanumeric( 5 );
    final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );

    final EntityManager em = setup.getEmf().getEntityManager( appId );

    final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );

    // ----------------- create the entities

    Map<String, Object> entityData = new HashMap<String, Object>() {{
        put( "key1", 1000 );
    }};

    final Entity firstEntity = em.create( "thing", entityData );
    final Entity secondEntity = em.create( "thing", entityData);

    app.waitForQueueDrainAndRefreshIndex(5000);

    // ----------------- test that we can read them, should work fine

    logger.debug( "Read the data" );
    final String collectionName = "things";
    countEntities( em, collectionName, 2 );

    // ----------------- delete the application index

    logger.debug( "Deleting app index" );

    deleteIndex( em.getApplicationId() );

    // deleting the system app index would interfere with other concurrently running tests
    //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );

    // ----------------- test that we can read them, should fail

    logger.debug( "Reading data, should fail this time " );

    countEntities( em, collectionName, 0);

    // ----------------- rebuild index

    final long firstUpdatedTimestamp = firstEntity.getModified();
    final long secondUpdatedTimestamp = secondEntity.getModified();

    // the final count of one relies on the two entities having distinct modified times
    assertTrue( "second should be updated after first", firstUpdatedTimestamp < secondUpdatedTimestamp );

    try {

        logger.debug( "Preparing to rebuild all indexes with timestamp {}", secondUpdatedTimestamp );

        // set our update timestamp
        final ReIndexRequestBuilder builder =
            reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withStartTimestamp(
                secondUpdatedTimestamp );

        ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );

        // JUnit 4 takes the message first and the value under test second
        assertNotNull( "JobId is present", status.getJobId() );

        logger.info( "Submitted index rebuild job {}", status.getJobId() );

        waitForRebuild( status, reIndexService );

        logger.info( "Rebuilt index" );
    }
    catch ( Exception ex ) {
        logger.error( "Error rebuilding index", ex );
        fail( "Error rebuilding index: " + ex );
    }

    // ----------------- test that we can read them

    app.waitForQueueDrainAndRefreshIndex(5000);
    countEntities( em, collectionName, 1 );
}
/**
 * Polls the re-index service once per second until the job reports {@code COMPLETE}.
 *
 * <p>There is no internal deadline; the loop ends only on completion or when the thread is
 * interrupted while sleeping.</p>
 *
 * @param status the status returned when the rebuild was submitted; supplies the job id to poll
 * @param reIndexService the service queried for the job's current status
 * @throws InterruptedException if the thread is interrupted between polls
 */
private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
    throws InterruptedException {

    boolean finished = false;

    while ( !finished ) {
        try {
            final ReIndexService.ReIndexStatus current = reIndexService.getStatus( status.getJobId() );
            finished = current.getStatus() == ReIndexService.Status.COMPLETE;
        }
        catch ( IllegalArgumentException ignored ) {
            // swallow. Thrown if our job can't be found, i.e. it hasn't been recorded yet
        }

        if ( !finished ) {
            Thread.sleep( 1000 );
        }
    }
}
/**
 * Deletes the search index data of one application and then waits for the queue to drain and
 * the index to refresh.
 *
 * @param appUuid UUID of the application whose index is deleted
 */
private void deleteIndex( UUID appUuid ) {

    final Injector guice = SpringResource.getInstance().getBean( Injector.class );
    final IndexLocationStrategyFactory strategyFactory = guice.getInstance( IndexLocationStrategyFactory.class );
    final EntityIndexFactory indexFactory = guice.getInstance( EntityIndexFactory.class );

    // build the application scope that identifies which index to address
    final Id applicationId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
    final ApplicationScope applicationScope = new ApplicationScopeImpl( applicationId );

    final EntityIndex applicationIndex =
        indexFactory.createEntityIndex( strategyFactory.getIndexLocationStrategy( applicationScope ) );

    // block until the delete observable has finished emitting
    applicationIndex.deleteApplication().toBlocking().lastOrDefault( null );

    app.waitForQueueDrainAndRefreshIndex();
}
/**
 * Pages through every entity of a collection matching {@code key1=1000} and checks each one.
 *
 * <p>For each entity it asserts that "key2" equals 2000 and that a "herds" connection query
 * returns {@code expectedConnections} results. After the last page it asserts that the total
 * number of entities seen equals {@code expectedEntities}.</p>
 *
 * @param em entity manager of the application under test
 * @param collectionName collection to search
 * @param expectedEntities number of entities the search must return in total
 * @param expectedConnections number of "herds" targets each entity must have
 * @return the number of entities read
 * @throws Exception if a search fails
 */
private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
    throws Exception {

    app.waitForQueueDrainAndRefreshIndex();

    final Query query = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );

    // the first page uses the consistent search; follow-up pages use the plain cursor search
    Results page = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, query, expectedEntities );

    int seen = 0;
    for ( ; ; ) {

        for ( Entity entity : page.getEntities() ) {

            assertEquals( 2000, entity.getProperty( "key2" ) );

            final Query herdsQuery = Query.fromQL( "select *" ).setConnectionType( "herds" );
            final Results herded = em.searchTargetEntities( entity, herdsQuery );
            assertEquals( expectedConnections, herded.size() );

            if ( seen % 100 == 0 ) {
                logger.info( "read {} entities", seen );
            }
            seen++;
        }

        if ( !page.hasCursor() ) {
            break;
        }

        logger.info( "Counted {} : query again with cursor", seen );
        query.setCursor( page.getCursor() );
        page = em.searchCollection( em.getApplicationRef(), collectionName, query );
    }

    assertEquals( "Did not get expected entities", expectedEntities, seen );
    return seen;
}
/**
 * Counts the entities of a collection matching {@code key1=1000}, following cursors across
 * pages, and asserts that the total equals {@code expectedEntities}.
 *
 * @param em entity manager of the application under test
 * @param collectionName collection to search
 * @param expectedEntities number of entities the search must return in total
 * @return the number of entities counted
 * @throws Exception if a search fails
 */
private int countEntities( EntityManager em, String collectionName, int expectedEntities)
    throws Exception {

    app.waitForQueueDrainAndRefreshIndex();

    final Query query = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );

    // the first page uses the consistent search; follow-up pages use the plain cursor search
    Results page = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, query, expectedEntities );
    int total = page.size();

    while ( page.hasCursor() ) {
        logger.info( "Counted {} : query again with cursor", total );
        query.setCursor( page.getCursor() );
        page = em.searchCollection( em.getApplicationRef(), collectionName, query );
        total += page.size();
    }

    assertEquals( "Did not get expected entities", expectedEntities, total );
    return total;
}
}