// blob: c3a7e65ac498b3a4180556c3a22abcfd362c97e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.corepersistence.migration;
import java.util.HashSet;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.Injector;
import com.netflix.astyanax.Keyspace;
import rx.functions.Action1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
 * Integration test for {@link GraphShardVersionMigration}.
 *
 * <p>The test records the edge types returned for every entity in the system, runs the
 * migration, marks the migration as complete, and then verifies that every edge type recorded
 * before the migration is still returned afterwards.</p>
 */
public class GraphShardVersionMigrationIT extends AbstractCoreIT {

    private Injector injector;
    private GraphShardVersionMigration graphShardVersionMigration;
    private Keyspace keyspace;
    private MigrationManager migrationManager;
    private ManagerCache managerCache;
    private DataMigrationManager dataMigrationManager;
    private MigrationInfoSerialization migrationInfoSerialization;


    /**
     * Callbacks invoked by {@link #visitAllEdgeTypes(EdgeTypeVisitor)} while walking the entities.
     */
    private interface EdgeTypeVisitor {

        /** Called once for every edge type returned by {@code getEdgeTypesFromSource} for the entity. */
        void onSourceEdgeType( Id entityId, String edgeType );

        /** Called once for every edge type returned by {@code getEdgeTypesToTarget} for the entity. */
        void onTargetEdgeType( Id entityId, String edgeType );

        /** Called once per entity, after both of its edge type streams have been drained. */
        void onEntityVisited( Id entityId );
    }


    /**
     * Looks up every collaborator used by the test from the injector returned by {@link CpSetup}.
     */
    @Before
    public void setup() {
        injector = CpSetup.getInjector();
        graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
        keyspace = injector.getInstance( Keyspace.class );
        migrationManager = injector.getInstance( MigrationManager.class );
        managerCache = injector.getInstance( ManagerCache.class );
        dataMigrationManager = injector.getInstance( DataMigrationManager.class );
        migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
    }


    /**
     * Writes two sets of entities, snapshots their edge types, runs the migration and verifies
     * that no previously seen edge type is missing once the new version is reported as current.
     *
     * @throws Throwable if any of the persistence calls fail
     */
    @Test
    public void testIdMapping() throws Throwable {

        assertEquals( "version 2 expected", 2, graphShardVersionMigration.getVersion() );

        // Drop the migration info column family so no "new version" marker is left behind;
        // the data written below is then meant to be stored in the "old data" format.
        keyspace.dropColumnFamily( MigrationInfoSerializationImpl.CF_MIGRATION_INFO );

        //create the column families again
        migrationManager.migrate();

        final EntityManager newAppEm = app.getEntityManager();

        final String type1 = "type1thing";
        final String type2 = "type2thing";
        final int size = 10;

        final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
        final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );

        // Every id written above; ids are removed as the first walk encounters them,
        // so anything left over was never visited.
        final Set<Id> allEntities = new HashSet<>();
        allEntities.addAll( type1Identities );
        allEntities.addAll( type2Identities );

        final TestProgressObserver progressObserver = new TestProgressObserver();

        //used to validate 1.0 types, and 2.0 types
        final Multimap<Id, String> sourceTypes = HashMultimap.create( 10000, 10 );
        final Multimap<Id, String> targetTypes = HashMultimap.create( 10000, 10 );

        //read everything in previous version format and put it into our types.
        visitAllEdgeTypes( new EdgeTypeVisitor() {
            @Override
            public void onSourceEdgeType( final Id entityId, final String edgeType ) {
                sourceTypes.put( entityId, edgeType );
            }


            @Override
            public void onTargetEdgeType( final Id entityId, final String edgeType ) {
                targetTypes.put( entityId, edgeType );
            }


            @Override
            public void onEntityVisited( final Id entityId ) {
                allEntities.remove( entityId );
            }
        } );

        //perform the migration
        graphShardVersionMigration.migrate( progressObserver );

        assertEquals( "All newly saved entities should have been encountered", 0, allEntities.size() );
        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );

        //write the status and version, then invalidate the cache so it appears
        migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
        migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
        dataMigrationManager.invalidate();

        assertEquals( "New version saved, and we should get new implementation",
                graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion() );

        //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
        visitAllEdgeTypes( new EdgeTypeVisitor() {
            @Override
            public void onSourceEdgeType( final Id entityId, final String edgeType ) {
                sourceTypes.remove( entityId, edgeType );
            }


            @Override
            public void onTargetEdgeType( final Id entityId, final String edgeType ) {
                targetTypes.remove( entityId, edgeType );
            }


            @Override
            public void onEntityVisited( final Id entityId ) {
                // nothing to track on the verification pass
            }
        } );

        assertEquals( "All source types migrated", 0, sourceTypes.size() );
        assertEquals( "All target types migrated", 0, targetTypes.size() );
    }


    /**
     * Walks every entity emitted by {@link AllEntitiesInSystemObservable} and reports its edge
     * types to the visitor. For each entity the source edge types are drained first, then the
     * target edge types, and finally {@link EdgeTypeVisitor#onEntityVisited(Id)} is invoked.
     * Each stream is consumed with a blocking call, so this method returns only after the whole
     * walk has finished.
     *
     * @param visitor receives the edge types and the per-entity completion callback
     */
    private void visitAllEdgeTypes( final EdgeTypeVisitor visitor ) {
        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
            .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
                @Override
                public void call( final AllEntitiesInSystemObservable.EntityData entity ) {

                    final GraphManager gm = managerCache.getGraphManager( entity.applicationScope );
                    final Id entityId = entity.entityId;

                    // Get our edge types from the source
                    gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( entityId, null, null ) )
                      .doOnNext( new Action1<String>() {
                          @Override
                          public void call( final String edgeType ) {
                              visitor.onSourceEdgeType( entityId, edgeType );
                          }
                      } ).toBlocking().lastOrDefault( null );

                    // Get the edge types to the target
                    gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( entityId, null, null ) )
                      .doOnNext( new Action1<String>() {
                          @Override
                          public void call( final String edgeType ) {
                              visitor.onTargetEdgeType( entityId, edgeType );
                          }
                      } ).toBlocking().lastOrDefault( null );

                    visitor.onEntityVisited( entityId );
                }
            } ).toBlocking().lastOrDefault( null );
    }
}