blob: 218c695193a0b0b56ad8b46ed37c811f3c7984d0 [file] [log] [blame]
/*
* Copyright 2014 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.corepersistence;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
import org.apache.usergrid.corepersistence.service.CollectionSearch;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.ConnectionSearch;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.Query.Level;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.graph.*;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.queue.settings.IndexConsistency;
import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import rx.Observable;
import java.util.*;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
import static org.apache.usergrid.persistence.Schema.*;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
import static org.apache.usergrid.utils.MapUtils.addMapSet;
/**
* Implement good-old Usergrid RelationManager with the new-fangled Core Persistence API.
*/
public class CpRelationManager implements RelationManager {
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
// Supplies the poll timeout and sleep interval read by searchCollectionConsistent().
private final EntityManagerFig entityManagerFig;
// Source of the GraphManager used for every edge read/write in this class.
private ManagerCache managerCache;
// Entity manager used to validate, load, create and delete entities; cast to CpEntityManager for load().
private EntityManager em;
// Application this manager operates in; also compared against the head entity's uuid to detect "head is the application".
private UUID applicationId;
// Entity whose collections/connections are managed; re-assigned from em.validate(...) by several methods.
private EntityRef headEntity;
// Core Persistence form of the head entity, loaded once in the constructor.
private org.apache.usergrid.persistence.model.entity.Entity cpHeadEntity;
// Scope derived from applicationId via CpNamingUtils.getApplicationScope(...).
private final ApplicationScope applicationScope;
// Receives queueNewEdge / queueDeleteEdge events after graph edges are written or marked.
private final AsyncEventService indexService;
// Stored by the constructor; not referenced in the visible portion of this class.
private final CollectionSettingsFactory collectionSettingsFactory;
// Executes collection searches built in searchCollection().
private final CollectionService collectionService;
// Executes connection searches built in searchTargetEntities().
private final ConnectionService connectionService;
public CpRelationManager( final ManagerCache managerCache,
final AsyncEventService indexService, final CollectionService collectionService,
final ConnectionService connectionService,
final EntityManager em,
final EntityManagerFig entityManagerFig, final UUID applicationId,
final CollectionSettingsFactory collectionSettingsFactory,
final EntityRef headEntity) {
Assert.notNull( em, "Entity manager cannot be null" );
Assert.notNull( applicationId, "Application Id cannot be null" );
Assert.notNull( headEntity, "Head entity cannot be null" );
Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
Assert.notNull( indexService, "indexService cannot be null" );
Assert.notNull( collectionService, "collectionService cannot be null" );
Assert.notNull( connectionService, "connectionService cannot be null" );
this.entityManagerFig = entityManagerFig;
// TODO: this assert should not be failing
//Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" );
this.em = em;
this.applicationId = applicationId;
this.headEntity = headEntity;
this.managerCache = managerCache;
this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
this.collectionService = collectionService;
this.connectionService = connectionService;
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading head entity {}:{} from app {}",
headEntity.getType(), headEntity.getUuid(), applicationScope
);
}
Id entityId = new SimpleId( headEntity.getUuid(), headEntity.getType() );
this.cpHeadEntity = ( ( CpEntityManager ) em ).load( entityId );
// commented out because it is possible that CP entity has not been created yet
Assert.notNull( cpHeadEntity, String
.format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) );
this.indexService = indexService;
this.collectionSettingsFactory = collectionSettingsFactory;
}
/**
 * Collects the edge types that leave the head entity, searched with the edge-type prefix
 * derived from {@code collectionName}.
 *
 * @param collectionName collection whose edge-type prefix is used for the search
 * @return the matching edge types; empty when none are found
 */
@Override
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {

    final GraphManager graph = managerCache.getGraphManager( applicationScope );
    final String prefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );

    if ( logger.isTraceEnabled() ) {
        logger.trace("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}",
            prefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() );
    }

    final SimpleSearchEdgeType search = new SimpleSearchEdgeType( cpHeadEntity.getId(), prefix, null );

    // Block until the stream completes and hand back the accumulated set.
    return graph.getEdgeTypesFromSource( search )
        .collect( () -> new HashSet<String>(), ( found, edgeType ) -> found.add( edgeType ) )
        .toBlocking()
        .last();
}
/**
 * Groups every container of the head entity by owner type and owner uuid.
 *
 * @return owner type -> owner uuid -> names of the relations through which that owner
 *         contains the head entity
 */
@Override
public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception {

    // TODO: do we need to restrict this to edges prefixed with "owns"?
    final Map<EntityRef, Set<String>> containers = getContainers();

    final Map<String, Map<UUID, Set<String>>> ownersByType = new LinkedHashMap<>();
    for ( Map.Entry<EntityRef, Set<String>> container : containers.entrySet() ) {
        final EntityRef owner = container.getKey();
        for ( String relation : container.getValue() ) {
            MapUtils.addMapMapSet( ownersByType, owner.getType(), owner.getUuid(), relation );
        }
    }
    return ownersByType;
}
/**
 * Convenience overload: delegates to {@link #getContainers(int, String, String)} with no
 * limit ({@code -1}), no edge-type restriction and no source-type restriction.
 */
private Map<EntityRef, Set<String>> getContainers() {
return getContainers( -1, null, null );
}
/**
 * Finds the entities that have an edge pointing at the head entity, together with the
 * relation names (derived from the edge types) through which they point at it.
 *
 * <p>The limit is applied to raw edges before the source-type filter, so fewer than
 * {@code limit} containers may come back. Everything is accumulated in memory.
 *
 * @param limit Max number of edges to consider; any value below zero means unlimited
 * @param edgeType Edge type, edge type prefix or null to allow any edge type
 * @param fromEntityType Only consider edges from entities of this type; null accepts all
 * @return source entity -> relation names, in encounter order
 */
Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {

    final GraphManager gm = managerCache.getGraphManager( applicationScope );

    // One edge stream per incoming edge type, newest edges first.
    Observable<Edge> incoming =
        gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
          .flatMap( type -> gm.loadEdgesToTarget(
              new SimpleSearchByEdgeType( cpHeadEntity.getId(), type, Long.MAX_VALUE,
                  SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) );

    if ( limit > -1 ) {
        incoming = incoming.take( limit );
    }

    return incoming
        .filter( edge -> {
            final String sourceType = edge.getSourceNode().getType();
            final boolean accepted = fromEntityType == null || fromEntityType.equals( sourceType );
            if ( !accepted && logger.isDebugEnabled() ) {
                logger.debug("Ignoring edge from entity type {}", sourceType);
            }
            return accepted;
        } )
        .collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( containers, edge ) -> {
            final Id source = edge.getSourceNode();
            addMapSet( containers, new SimpleEntityRef( source.getType(), source.getUuid() ),
                getNameFromEdgeType( edge.getType() ) );
        } )
        .toBlocking()
        .last();
}
/**
 * Tells whether at least one edge version of the given connection type exists from the
 * head entity to {@code entity}.
 *
 * @param connectionType connection type to look for
 * @param entity candidate target of the connection
 * @return true when the graph returns at least one edge version
 */
@Override
public boolean isConnectionMember( String connectionType, EntityRef entity ) throws Exception {

    final Id targetId = new SimpleId( entity.getUuid(), entity.getType() );

    if ( logger.isTraceEnabled() ) {
        logger.trace("isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}",
            connectionType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() );
    }

    final GraphManager graph = managerCache.getGraphManager( applicationScope );
    final Id sourceId = new SimpleId( headEntity.getUuid(), headEntity.getType() );

    final Observable<MarkedEdge> versions = graph.loadEdgeVersions(
        CpNamingUtils.createEdgeFromConnectionType( sourceId, connectionType, targetId ) );

    // Only the first emission matters; null signals an empty stream.
    final MarkedEdge firstVersion = versions.toBlocking().firstOrDefault( null );
    return firstVersion != null;
}
/**
 * Tells whether at least one edge version for the named collection exists from the head
 * entity to {@code entity}.
 *
 * @param collectionName collection to look in
 * @param entity candidate member
 * @return true when the graph returns at least one edge version
 */
@SuppressWarnings( "unchecked" )
@Override
public boolean isCollectionMember( String collectionName, EntityRef entity ) throws Exception {

    final Id memberId = new SimpleId( entity.getUuid(), entity.getType() );

    if ( logger.isTraceEnabled() ) {
        logger.trace("isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}",
            collectionName, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() );
    }

    final GraphManager graph = managerCache.getGraphManager( applicationScope );
    final Id ownerId = new SimpleId( headEntity.getUuid(), headEntity.getType() );

    final Observable<MarkedEdge> versions = graph.loadEdgeVersions(
        CpNamingUtils.createEdgeFromCollectionName( ownerId, collectionName, memberId ) );

    // Only the first emission matters; null signals an empty stream.
    final MarkedEdge firstVersion = versions.toBlocking().firstOrDefault( null );
    return firstVersion != null;
}
/**
 * Lists the relation names obtained from every edge type that leaves the head entity.
 *
 * @return names produced by {@code getNameFromEdgeType} for each outgoing edge type
 */
@Override
public Set<String> getCollections() throws Exception {

    final Set<String> names = new HashSet<String>();
    final GraphManager graph = managerCache.getGraphManager( applicationScope );

    final Observable<String> edgeTypes =
        graph.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );

    // Drain the stream through its blocking iterator.
    for ( Iterator<String> cursor = edgeTypes.toBlocking().getIterator(); cursor.hasNext(); ) {
        names.add( getNameFromEdgeType( cursor.next() ) );
    }
    return names;
}
/**
 * Returns one page of the named collection, optionally starting after (or before, when
 * reversed) the instant encoded in a time-based UUID.
 *
 * <p>The start position is expressed as a filter on the {@code created} field. The UUID
 * timestamp is a different measure than {@code created}, so the boundary is approximate.
 *
 * @param collectionName collection to read
 * @param startResult time-based UUID marking the page boundary, or null to start from the beginning
 * @param count page size, applied as the query limit
 * @param resultsLevel not read by this method
 * @param reversed when true, results are reversed and the boundary filter is {@code created <}
 * @return the page produced by {@link #searchCollection(String, Query)}
 * @throws UnsupportedOperationException if {@code startResult} is not a version-1 UUID
 *         (raised by {@link UUID#timestamp()})
 */
@Override
public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
                              boolean reversed ) throws Exception {

    // Milliseconds from the Unix epoch to 1582-10-15T00:00:00Z, the origin of version-1 UUID
    // timestamps. A fixed constant avoids building a Calendar on every call and, more
    // importantly, avoids Calendar.getInstance(TimeZone) picking a non-Gregorian calendar
    // under some default locales, which would silently shift the boundary.
    final long uuidEpochMillis = -12219292800000L;

    final String ql;
    if ( startResult != null ) {
        // UUID timestamps count 100-nanosecond ticks; 10,000 ticks per millisecond.
        final long time = ( startResult.timestamp() / 10000L ) + uuidEpochMillis;
        if ( !reversed ) {
            ql = "select * where created > " + time;
        } else {
            ql = "select * where created < " + time;
        }
    } else {
        ql = "select *";
    }

    final Query query = Query.fromQL( ql );
    if ( query == null ) {
        throw new RuntimeException("Unable to get data for collection: "+collectionName);
    }
    query.setLimit( count );
    query.setReversed( reversed );

    return searchCollection( collectionName, query );
}
/**
 * Runs {@code query} against the named collection by delegating to
 * {@link #searchCollection(String, Query)}. The {@code level} argument is not read here.
 */
@Override
public Results getCollection( String collectionName, Query query, Level level ) throws Exception {
return searchCollection( collectionName, query );
}
/**
 * Adds an existing entity to a named collection of the head entity.
 *
 * <p>Writes the collection edge head -> item. When the schema declares a linked collection,
 * it also writes the reverse edge item -> head. The corresponding index events are queued
 * unless indexing is skipped for the type involved, and duplicate edges are then cleaned up.
 *
 * @param collectionName collection of the head entity to add to
 * @param itemRef entity to add; must not be null
 * @return the item as an {@link Entity}, or null when the schema's collection type does not
 *         match the item's type or the item cannot be resolved by the entity manager
 * @throws RuntimeException when the item's Core Persistence entity (or its id) cannot be loaded
 */
@Override
public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {

    Preconditions.checkNotNull( itemRef, "itemref is null" );

    final CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
    if ( ( collection != null && collection.getType() != null )
        && !collection.getType().equals( itemRef.getType() ) ) {
        return null;
    }

    final Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
    final org.apache.usergrid.persistence.model.entity.Entity memberEntity =
        ( ( CpEntityManager ) em ).load( entityId );

    // don't fetch entity if we've already got one
    final Entity itemEntity;
    if ( itemRef instanceof Entity ) {
        itemEntity = ( Entity ) itemRef;
    }
    else {
        itemEntity = em.get( itemRef );
    }

    if ( itemEntity == null ) {
        return null;
    }

    // load() may yield no entity; report that explicitly instead of failing with a bare NPE.
    final Id memberEntityId = memberEntity == null ? null : memberEntity.getId();
    if ( memberEntityId == null ) {
        throw new RuntimeException(
            "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
    }

    if ( logger.isDebugEnabled() ) {
        logger.debug( "Loaded member entity {}:{} from app {}\n   data {}",
            itemRef.getType(), itemRef.getUuid(), applicationScope, CpEntityMapUtils.toMap( memberEntity ) );
    }

    // create graph edge connection from head entity to member entity
    final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntityId );

    // The schema may not know this collection (collection == null is tolerated above), in
    // which case there is no linked collection and no reverse edge is written.
    final String linkedCollection = collection != null ? collection.getLinkedCollection() : null;

    final GraphManager gm = managerCache.getGraphManager( applicationScope );

    gm.writeEdge( edge ).doOnNext( writtenEdge -> {
        if ( logger.isDebugEnabled() ) {
            logger.debug( "Wrote edge {}", writtenEdge );
        }
    } ).filter( writtenEdge -> linkedCollection != null ).flatMap( writtenEdge -> {
        final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
        final Edge reverseEdge = createCollectionEdge( memberEntityId, pluralType, cpHeadEntity.getId() );

        // reverse edge: member -> head, indexed against the head entity
        return gm.writeEdge( reverseEdge ).doOnNext( reverseEdgeWritten -> {
            final String headType = cpHeadEntity.getId().getType();
            if ( !skipIndexingForType( headType ) ) {
                final QueueIndexingStrategy strategy = getIndexingStrategyForType( headType );
                indexService.queueNewEdge( applicationScope, cpHeadEntity.getId(), reverseEdge, strategy );
            }
        } );
    } ).doOnCompleted( () -> {
        // forward edge: indexed against the member entity once all writes have completed
        final String memberType = memberEntityId.getType();
        if ( !skipIndexingForType( memberType ) ) {
            final QueueIndexingStrategy strategy = getIndexingStrategyForType( memberType );
            indexService.queueNewEdge( applicationScope, memberEntityId, edge, strategy );
        }
    } ).toBlocking().lastOrDefault( null );

    // remove any duplicate edges (keeps the duplicate edge with same timestamp)
    removeDuplicateEdgesAsync( gm, edge );

    if ( logger.isDebugEnabled() ) {
        logger.debug( "Added entity {}:{} to collection {}",
            itemRef.getUuid().toString(), itemRef.getType(), collectionName );
    }

    return itemEntity;
}
/**
 * Creates a new entity and places it in a named collection of the head entity.
 *
 * <ul>
 *   <li>Head is the application: the entity is created directly. Roles go through
 *       {@code createRole}, and a generic item type is replaced by the singular of the
 *       collection name.</li>
 *   <li>Head is a group and the collection is the roles collection: a group role is created.</li>
 *   <li>Otherwise: the entity is created and then added via
 *       {@link #addToCollection(String, EntityRef)}, which writes the forward edge and, for
 *       linked collections, the reverse edge.</li>
 * </ul>
 *
 * @param collectionName target collection of the head entity
 * @param itemType type of the entity to create
 * @param properties initial properties; a missing inactivity value is treated as 0 for roles
 * @return the created entity, or null when the schema's collection type does not match
 *         {@code itemType}
 */
@Override
public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
    throws Exception {

    if ( headEntity.getUuid().equals( applicationId ) ) {
        if ( itemType.equals( TYPE_ENTITY ) ) {
            itemType = singularize( collectionName );
        }

        if ( itemType.equals( TYPE_ROLE ) ) {
            Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
            if ( inactivity == null ) {
                inactivity = 0L;
            }
            return em.createRole( ( String ) properties.get( PROPERTY_NAME ),
                ( String ) properties.get( PROPERTY_TITLE ), inactivity );
        }
        return em.create( itemType, properties );
    }
    else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collectionName.equals( COLLECTION_ROLES ) ) ) {
        final UUID groupId = headEntity.getUuid();
        final String roleName = ( String ) properties.get( PROPERTY_NAME );

        // Same default as the application-role branch above, so a missing inactivity
        // property never reaches createGroupRole as null.
        Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
        if ( inactivity == null ) {
            inactivity = 0L;
        }
        return em.createGroupRole( groupId, roleName, inactivity );
    }

    final CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
    // Mirrors the guard in addToCollection: a collection without a declared type accepts any item type.
    if ( collection != null && collection.getType() != null && !collection.getType().equals( itemType ) ) {
        return null;
    }

    properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true );

    final Entity itemEntity = em.create( itemType, properties );

    if ( itemEntity != null ) {
        // addToCollection writes (and blocks on) the head -> item edge, plus the reverse edge
        // for linked collections. The extra gm.writeEdge(...) that used to follow built the
        // same forward edge but discarded the returned Observable without subscribing, so it
        // has been removed.
        addToCollection( collectionName, itemEntity );
    }

    return itemEntity;
}
/**
 * Removes an item from a named collection of the head entity.
 *
 * <p>When the head entity is the application, the item itself is deleted. Items in the
 * roles collection are deleted through {@code deleteRole} when they can be loaded. For any
 * other head entity only the collection edges are removed. For a group's roles collection,
 * the role is additionally deleted when the item carries a {@code path} metadata value
 * starting with {@code /roles/}.
 *
 * @param collectionName collection of the head entity
 * @param itemRef item to remove
 */
@Override
public void removeFromCollection( String collectionName, EntityRef itemRef ) throws Exception {

    if ( headEntity.getUuid().equals( applicationId ) ) {

        if ( collectionName.equals( COLLECTION_ROLES ) ) {
            // special handling for roles collection of the application
            final Entity roleEntity = em.get( itemRef );
            if ( roleEntity != null ) {
                final RoleRef roleRef = SimpleRoleRef.forRoleEntity( roleEntity );
                em.deleteRole( roleRef.getApplicationRoleName(), Optional.fromNullable( roleEntity ) );
                return;
            }
        }

        // handles normal app collection deletes
        em.delete( itemRef );
        return;
    }

    // headEntity is not an application (used for management entities and entity collections like user devices)
    if ( logger.isDebugEnabled() ) {
        logger.debug( "Loading entity to remove from collection {}:{} from app {}\n",
            itemRef.getType(), itemRef.getUuid(), applicationScope );
    }

    // this will remove the edges from head->entity(collection)
    removeItemFromCollection( collectionName, itemRef );

    // special handling for roles collection of a group
    if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && collectionName.equals( COLLECTION_ROLES ) ) {

        // The path lives in entity metadata, so it is only available when the caller passed a
        // full Entity. A bare ref, or a missing/non-string path, means there is nothing to
        // match against. Role deletion is skipped rather than failing after the edges are gone.
        final Object path = ( itemRef instanceof Entity ) ? ( ( Entity ) itemRef ).getMetadata( "path" ) : null;

        if ( path instanceof String && ( ( String ) path ).startsWith( "/roles/" ) ) {
            final Entity roleEntity = em.get( new SimpleEntityRef( itemRef.getType(), itemRef.getUuid() ) );
            if ( roleEntity != null ) {
                final RoleRef roleRef = SimpleRoleRef.forRoleEntity( roleEntity );
                em.deleteRole( roleRef.getApplicationRoleName(), Optional.fromNullable( roleEntity ) );
            }
        }
    }
}
/**
 * Detaches an item from a named collection of the head entity without deleting the item.
 *
 * <p>The head -> item edge is marked and, when the schema declares a linked collection, so
 * is the reverse item -> head edge. Each marked edge that was found is handed to the index
 * service as a delete-edge event.
 *
 * @param collectionName collection of the head entity
 * @param itemRef item to detach
 */
@Override
public void removeItemFromCollection( String collectionName, EntityRef itemRef ) throws Exception {

    final Id itemId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
    final GraphManager gm = managerCache.getGraphManager( applicationScope );

    // Mark edge versions and keep the first one for the delete-edge event
    // (per the original note, versions load in descending order).
    final Edge forwardMarked = gm.loadEdgeVersions(
            CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, itemId ) )
        .flatMap( version -> gm.markEdge( version ) )
        .toBlocking()
        .firstOrDefault( null );

    final CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
    final boolean linked = collection != null && collection.getLinkedCollection() != null;

    Edge reverseMarked = null;
    if ( linked ) {
        // Same treatment for the reverse edge of a linked collection.
        final String pluralHeadType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
        reverseMarked = gm.loadEdgeVersions(
                CpNamingUtils.createEdgeFromCollectionName( itemId, pluralHeadType, cpHeadEntity.getId() ) )
            .flatMap( version -> gm.markEdge( version ) )
            .toBlocking()
            .firstOrDefault( null );
    }

    // Remove from the index. Per the original note, the queued event ends up calling
    // gm.deleteEdge, which also removes and de-indexes older versions of the edge(s).
    if ( forwardMarked != null ) {
        indexService.queueDeleteEdge(applicationScope, forwardMarked);
    }
    if ( reverseMarked != null ) {
        indexService.queueDeleteEdge(applicationScope, reverseMarked);
    }
}
/**
 * Copies the members of one relation of the head entity into a relation of another entity.
 *
 * <p>The source is read as a collection when the schema knows it as one, otherwise as a
 * connection. Each ref is added to the destination as a collection member or as a
 * connection, by the same rule.
 *
 * <p>NOTE(review): every loop iteration issues the same query (no start result or cursor is
 * carried forward), so if {@code hasMoreResults()} ever reports true the first page appears
 * to be re-read indefinitely. Confirm against the Results paging API.
 *
 * @param srcRelationName relation of the head entity to read from
 * @param dstEntityRef entity receiving the copies
 * @param dstRelationName relation of the destination entity to write to
 */
@Override
public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
    throws Exception {

    headEntity = em.validate( headEntity );
    dstEntityRef = em.validate( dstEntityRef );

    final boolean sourceIsCollection =
        getDefaultSchema().getCollection( headEntity.getType(), srcRelationName ) != null;
    final boolean targetIsCollection =
        getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName ) != null;

    Results page;
    do {
        page = sourceIsCollection
            ? em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false )
            : em.getTargetEntities( headEntity, srcRelationName, null, Level.REFS );

        if ( page != null && page.size() > 0 ) {
            for ( EntityRef member : page.getRefs() ) {
                if ( targetIsCollection ) {
                    em.addToCollection( dstEntityRef, dstRelationName, member );
                }
                else {
                    em.createConnection( dstEntityRef, dstRelationName, member );
                }
            }
        }
    }
    while ( page != null && page.hasMoreResults() );
}
/**
 * Searches a named collection of the head entity and returns the first page of results.
 *
 * <p>A null query is replaced by an empty query on the collection. The query's entity type
 * is set from the schema, and the query is passed through {@code adjustQuery} before
 * execution. Queries at {@code Level.IDS} yield ids; everything else yields entities.
 *
 * @param collectionName collection of the head entity
 * @param query query to run, or null for "everything"
 * @return the first page, produced by a paging executor
 * @throws RuntimeException when the schema has no collection info for the head type and name
 */
@Override
public Results searchCollection( String collectionName, Query query ) throws Exception {

    if ( query == null ) {
        query = new Query();
        query.setCollection( collectionName );
    }

    headEntity = em.validate( headEntity );

    final CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
    if ( collection == null ) {
        throw new RuntimeException(
            "Cannot find collection-info for '" + collectionName + "' of " + headEntity.getType() + ":" + headEntity
                .getUuid() );
    }

    query.setEntityType( collection.getType() );
    final Query toExecute = adjustQuery( query );

    // Graph searches carry no query string; index searches use the (possibly adjusted) QL.
    final Optional<String> queryString;
    if ( query.isGraphSearch() ) {
        queryString = Optional.<String>absent();
    }
    else {
        queryString = query.getQl();
    }

    final Id ownerId = headEntity.asId();
    final boolean analyzeOnly = query.getAnalyzeOnly();
    final boolean returnQuery = query.getReturnQuery();

    if ( query.getLevel() == Level.IDS ) {

        // Id-only results: each page request builds a fresh search for the given cursor.
        final IdQueryExecutor idPages = new IdQueryExecutor( toExecute.getCursor() ) {
            @Override
            protected Observable<ResultsPage<Id>> buildNewResultsPage(
                final Optional<String> cursor ) {

                final CollectionSearch idSearch =
                    new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
                        queryString, cursor );
                idSearch.setAnalyzeOnly(analyzeOnly);
                idSearch.setReturnQuery(returnQuery);

                return collectionService.searchCollectionIds( idSearch );
            }
        };
        return idPages.next();
    }

    // Entity results: wire the callback so we can get each page.
    final EntityQueryExecutor entityPages = new EntityQueryExecutor( toExecute.getCursor() ) {
        @Override
        protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
            final Optional<String> cursor ) {

            final CollectionSearch entitySearch =
                new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
                    queryString, cursor );
            entitySearch.setAnalyzeOnly(analyzeOnly);
            entitySearch.setReturnQuery(returnQuery);

            // Stale entries are kept only when the collection asks for LATEST index consistency.
            final IndexConsistency consistency = getIndexConsistencyForType(collectionName);
            entitySearch.setKeepStaleEntries(consistency == IndexConsistency.LATEST);

            return collectionService.searchCollection( entitySearch );
        }
    };
    return entityPages.next();
}
/**
 * Repeats {@link #searchCollection(String, Query)} until it returns exactly
 * {@code expectedResults} items or the configured poll timeout has elapsed, sleeping for
 * the configured interval between attempts.
 *
 * <p>The timeout is checked against the time measured before the sleep, so one extra sleep
 * happens before the loop notices the deadline has passed.
 *
 * @param collectionName collection of the head entity
 * @param query query to run on each attempt
 * @param expectedResults result count that ends the polling early
 * @return the results of the last attempt, whether or not the expected count was reached
 */
@Override
public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults )
    throws Exception {

    final long maxLength = entityManagerFig.pollForRecordsTimeout();
    final long sleepTime = entityManagerFig.sleep();
    final long startedAt = System.currentTimeMillis();

    Results results;
    long elapsed = 0;

    do {
        results = searchCollection( collectionName, query );
        elapsed = System.currentTimeMillis() - startedAt;

        if ( results.size() == expectedResults ) {
            break;
        }

        logger.info("Sleeping {} ms during searchCollectionConsistent", sleepTime);
        Thread.sleep( sleepTime );
    }
    while ( elapsed <= maxLength );

    logger.info( "Consistent Search finished in {}, results={}, expected={}...dumping stack",
        elapsed, results.size(), expectedResults );

    return results;
}
/**
 * Creates a connection from the head entity using the connection type and target refs taken
 * from {@code connection}; the ref's source is not read here.
 */
@Override
public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
return createConnection( connection.getConnectionType(), connection.getTargetRefs() );
}
/**
 * Connects the head entity to another entity with the given connection type.
 *
 * <p>Writes the connection edge head -> target (blocking until the write finishes), queues
 * an index event for the target unless indexing is skipped for its type, and then cleans up
 * duplicate edges.
 *
 * @param connectionType name of the connection
 * @param connectedEntityRef target of the connection
 * @return a ref describing the new connection
 * @throws RuntimeException when the target's Core Persistence entity cannot be loaded
 */
@Override
public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {

    headEntity = em.validate( headEntity );
    connectedEntityRef = em.validate( connectedEntityRef );

    final ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );

    if ( logger.isTraceEnabled() ) {
        logger.trace( "createConnection(): Indexing connection type '{}'\n   from source {}:{}]\n   to target {}:{}\n   app {}",
            connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(),
            connectedEntityRef.getUuid(), applicationScope );
    }

    final Id entityId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() );
    final org.apache.usergrid.persistence.model.entity.Entity targetEntity =
        ( ( CpEntityManager ) em ).load( entityId );

    // load() may yield no entity; report it the same way addToCollection does instead of
    // failing with a bare NullPointerException below.
    if ( targetEntity == null || targetEntity.getId() == null ) {
        throw new RuntimeException(
            "Unable to load entity uuid=" + connectedEntityRef.getUuid() + " type=" + connectedEntityRef.getType() );
    }
    final Id targetId = targetEntity.getId();

    // create graph edge connection from head entity to member entity
    final Edge edge = createConnectionEdge( cpHeadEntity.getId(), connectionType, targetId );

    final GraphManager gm = managerCache.getGraphManager( applicationScope );

    // write new edge; blocking surfaces any write failure as an exception here
    gm.writeEdge( edge ).toBlocking().lastOrDefault( null );

    final String entityType = targetId.getType();
    if ( !skipIndexingForType( entityType ) ) {
        final QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType( entityType );
        indexService.queueNewEdge( applicationScope, targetId, edge, queueIndexingStrategy );
    }

    // remove any duplicate edges (keeps the duplicate edge with same timestamp)
    removeDuplicateEdgesAsync( gm, edge );

    return connection;
}
/**
 * Paired connections are not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Multi-ref (paired) connections are not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Builds a ref describing a connection of the given type from the head entity to
 * {@code connectedEntityRef}. This method only constructs the ref object.
 */
@Override
public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
    return new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
}
/**
 * Paired connection refs are not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Multi-ref (paired) connection refs are not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Deletes a connection by marking every version of its edge and queueing a delete-edge
 * index event for each marked edge, unless indexing is skipped for both the source and the
 * target type.
 *
 * @param connectionRef connection to delete; source, target and connection type are read from it
 */
@Override
public void deleteConnection( ConnectionRef connectionRef ) throws Exception {

    final EntityRef source = connectionRef.getSourceRefs();
    final EntityRef target = connectionRef.getTargetRefs();
    final String connectionType = connectionRef.getTargetRefs().getConnectionType();

    if ( logger.isTraceEnabled() ) {
        logger.trace( "Deleting connection '{}' from source {}:{} \n   to target {}:{}",
            connectionType, source.getType(), source.getUuid(),
            target.getType(), target.getUuid() );
    }

    final Id targetRefId = new SimpleId( target.getUuid(), target.getType() );
    final org.apache.usergrid.persistence.model.entity.Entity targetEntity =
        ( ( CpEntityManager ) em ).load( targetRefId );

    final GraphManager gm = managerCache.getGraphManager( applicationScope );

    final Id sourceId = new SimpleId( source.getUuid(), source.getType() );
    final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetEntity.getId() );

    // Mark every version of the edge, queue its removal from the index, and wait for completion.
    gm.loadEdgeVersions( search )
      .flatMap( version -> gm.markEdge( version ) )
      .doOnNext( marked -> {
          final boolean skipBothEnds = skipIndexingForType( marked.getSourceNode().getType() )
              && skipIndexingForType( marked.getTargetNode().getType() );
          if ( !skipBothEnds ) {
              indexService.queueDeleteEdge(applicationScope, marked);
          }
      } )
      .toBlocking()
      .lastOrDefault( null );
}
/**
 * Not supported: an entity cannot be identified by UUID alone here.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Set<String> getConnectionTypes( UUID connectedEntityId ) throws Exception {
throw new UnsupportedOperationException( "Cannot specify entity by UUID alone." );
}
/**
 * Convenience overload: delegates to {@link #getConnectionTypes(boolean)} with {@code false}.
 */
@Override
public Set<String> getConnectionTypes() throws Exception {
return getConnectionTypes( false );
}
/**
 * Collects the relation names obtained from every edge type that leaves the head entity.
 *
 * @param filterConnection not read by this method
 * @return names produced by {@code getNameFromEdgeType} for each outgoing edge type
 */
@Override
public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {

    final GraphManager gm = managerCache.getGraphManager( applicationScope );

    final SimpleSearchEdgeType allOutgoing = new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null );
    final Observable<String> edgeTypes = gm.getEdgeTypesFromSource( allOutgoing );

    return edgeTypes
        .collect( () -> new HashSet<String>(),
            ( names, edgeType ) -> names.add( CpNamingUtils.getNameFromEdgeType( edgeType ) ) )
        .toBlocking()
        .last();
}
/**
 * Returns the entities the head entity is connected to through {@code connectionType}, by
 * building a query and delegating to {@link #searchTargetEntities(Query)}.
 *
 * @param connectionType connection to follow; must not be null
 * @param connectedEntityType entity type set on the query
 * @param level results level set on the query
 */
@Override
public Results getTargetEntities( String connectionType, String connectedEntityType, Level level )
    throws Exception {

    // until this is refactored properly, we will delegate to a search by query
    Preconditions.checkNotNull( connectionType, "connectionType cannot be null" );

    final Query byConnection = new Query();
    byConnection.setConnectionType( connectionType );
    byConnection.setEntityType( connectedEntityType );
    byConnection.setResultsLevel( level );

    return searchTargetEntities( byConnection );
}
/**
 * Convenience overload: delegates to
 * {@link #getSourceEntities(String, String, Level, int)} with a count of {@code -1}.
 */
@Override
public Results getSourceEntities( String connType, String fromEntityType, Level resultsLevel ) throws Exception {
return getSourceEntities( connType, fromEntityType, resultsLevel, -1 );
}
/**
 * Returns the entities that are connected <em>to</em> the head entity through the given
 * connection type.
 *
 * @param connType connection type whose incoming edges are followed
 * @param fromEntityType only sources of this type are returned; null accepts all
 * @param level {@code REFS} returns refs, {@code IDS} returns uuids, anything else loads entities
 * @param count maximum number of edges to consider; a negative value means unlimited
 * @return the sources at the requested level. When entities are loaded, sources the entity
 *         manager can no longer resolve are left out.
 */
@Override
public Results getSourceEntities( String connType, String fromEntityType, Level level, int count )
    throws Exception {

    // looking for edges to the head entity
    final String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connType );

    final Set<EntityRef> sources = getContainers( count, edgeType, fromEntityType ).keySet();

    if ( Level.REFS.equals( level ) ) {
        return Results.fromRefList( new ArrayList<EntityRef>( sources ) );
    }

    if ( Level.IDS.equals( level ) ) {
        // TODO: someday this should return a list of Core Persistence Ids
        final List<UUID> idList = new ArrayList<UUID>( sources.size() );
        for ( EntityRef ref : sources ) {
            idList.add( ref.getUuid() );
        }
        return Results.fromIdList( idList );
    }

    final List<Entity> entities = new ArrayList<Entity>( sources.size() );
    for ( EntityRef ref : sources ) {
        final Entity entity = em.get( ref );

        // em.get is treated as nullable elsewhere in this class. An unresolved source would
        // previously throw here only when trace logging was on, and otherwise put a null in
        // the results.
        if ( entity == null ) {
            if ( logger.isDebugEnabled() ) {
                logger.debug( "Skipping connecting entity {}:{} because it could not be loaded",
                    ref.getType(), ref.getUuid() );
            }
            continue;
        }

        if ( logger.isTraceEnabled() ) {
            logger.trace( "   Found connecting entity: {}", entity.getProperties() );
        }
        entities.add( entity );
    }
    return Results.fromEntities( entities );
}
/**
 * Searches the entities connected from the head entity and returns the first page.
 *
 * <p>The query must name a connection type. It is passed through {@code adjustQuery} before
 * execution. Results levels {@code REFS} and {@code IDS} yield connection refs; any other
 * level yields entities.
 *
 * @param query query to run; must not be null and must carry a connection type
 * @return the first page, produced by a paging executor
 */
@Override
public Results searchTargetEntities( Query query ) throws Exception {

    Preconditions.checkNotNull( query, "query cannot be null" );

    final String connection = query.getConnectionType();
    Preconditions.checkNotNull( connection, "connection must be specified" );

    headEntity = em.validate( headEntity );

    final boolean analyzeOnly = query.getAnalyzeOnly();
    final boolean returnQuery = query.getReturnQuery();

    final Query toExecute = adjustQuery( query );
    final Optional<String> entityType = Optional.fromNullable( query.getEntityType() );

    // Pipeline sketch kept from the original author's notes:
    //   start id -> graph edge load OR index query candidates
    //            -> entity load / id verification
    //            -> (optionally) map to connection refs
    //            -> results page collector
    final Id sourceId = headEntity.asId();

    // Graph searches carry no query string; index searches use the (possibly adjusted) QL.
    final Optional<String> queryString;
    if ( query.isGraphSearch() ) {
        queryString = Optional.<String>absent();
    }
    else {
        queryString = query.getQl();
    }

    final boolean isConnecting = query.isConnecting();

    final boolean refsOnly = query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS;
    if ( refsOnly ) {

        final ConnectionRefQueryExecutor refPages = new ConnectionRefQueryExecutor( toExecute.getCursor() ) {
            @Override
            protected Observable<ResultsPage<ConnectionRef>> buildNewResultsPage( final Optional<String> cursor ) {

                // each new cursor triggers a brand-new search
                final ConnectionSearch refSearch =
                    new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
                        queryString, cursor, isConnecting );
                refSearch.setAnalyzeOnly(analyzeOnly);
                refSearch.setReturnQuery(returnQuery);
                return connectionService.searchConnectionAsRefs( refSearch );
            }
        };
        return refPages.next();
    }

    final EntityQueryExecutor entityPages = new EntityQueryExecutor( toExecute.getCursor() ) {
        @Override
        protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
            final Optional<String> cursor ) {

            // each new cursor triggers a brand-new search
            final ConnectionSearch entitySearch =
                new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
                    queryString, cursor, isConnecting );
            entitySearch.setAnalyzeOnly(analyzeOnly);
            entitySearch.setReturnQuery(returnQuery);
            return connectionService.searchConnection( entitySearch );
        }
    };
    return entityPages.next();
}
/**
 * Turns an identifier-style query into an equivalent QL query.
 *
 * <p>A query that already has a QL string is returned untouched. Otherwise:
 * <ul>
 *   <li>a single name/email identifier becomes {@code select * where email='...'} when the
 *       entity type is the user type and the identifier is an email,
 *       {@code select * where path='...'} when the entity type is the group type, and
 *       {@code select * where name='...'} in every other case (including a null entity
 *       type);</li>
 *   <li>a single UUID identifier becomes {@code select * where uuid=...}.</li>
 * </ul>
 *
 * <p>The query is modified in place through {@code setQl} and the same instance is returned.
 *
 * @param query the query to adjust
 * @return the same {@code query} instance, with its QL set when one could be derived
 */
private Query adjustQuery( Query query ) {

    // a QL string is already present, so there is nothing to derive from identifiers
    if ( query.getQl().isPresent() ) {
        return query;
    }

    // a name alias or email alias was specified
    if ( query.containsSingleNameOrEmailIdentifier() ) {

        final Identifier ident = query.getSingleIdentifier();

        // The entity type may be null (callers wrap it with Optional.fromNullable), so compare
        // with the constant on the left to avoid a NullPointerException.
        final String entityType = query.getEntityType();

        final String field;

        if ( User.ENTITY_TYPE.equals( entityType ) && ident.isEmail() ) {
            // an email was specified. An edge case that only applies to users.
            // This is ugly to put here, but required.
            field = "email";
        }
        else if ( Group.ENTITY_TYPE.equals( entityType ) ) {
            // groups have a special unique identifier
            field = "path";
        }
        else {
            // use the ident with the default alias. could be an email
            field = "name";
        }

        query.setQl( "select * where " + field + "='" + query.getSingleNameOrEmailIdentifier() + "'" );
    }
    else if ( query.containsSingleUuidIdentifier() ) {
        //TODO, this shouldn't even come from ES, it should look up the entity directly
        query.setQl( "select * where uuid=" + query.getSingleUuidIdentifier() );
    }

    //TODO T.N. not sure if we still need this: a draft that added a descending sort on
    // PROPERTY_CREATED when the query is reversed and the QL has no "order by" used to live here.
    // If we do need it then we need to modify our query interface.

    return query;
}
/**
 * Not implemented by this relation manager.
 *
 * @param connectionType unused
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Set<String> getConnectionIndexes( String connectionType ) throws Exception {
    final String message = "Not supported yet.";
    throw new UnsupportedOperationException( message );
}
/**
 * Returns the head entity as a full {@code Entity}.
 *
 * <p>Side effect: when {@code headEntity} is not already an {@code Entity}, it is loaded through
 * the entity manager and the {@code headEntity} field is replaced with whatever the load
 * returned.
 *
 * @return the head entity, or whatever {@code em.get} returned for the head reference
 * @throws Exception if loading the entity fails
 */
private Entity getHeadEntity() throws Exception {

    // already a full entity, nothing to load
    if ( headEntity instanceof Entity ) {
        return ( Entity ) headEntity;
    }

    final Entity loaded = em.get( headEntity );
    headEntity = loaded;
    return loaded;
}
/**
 * Marks every version of the given edge except the first one returned by the graph manager,
 * then deletes the last edge that was marked.
 *
 * <p>NOTE: despite the "Async" suffix this method blocks the calling thread; both the inner
 * delete and the outer chain are consumed with {@code toBlocking()}.
 *
 * <p>NOTE(review): only the last marked edge is handed to {@code gm.deleteEdge}; presumably that
 * call cleans up every marked version of the edge - confirm against the GraphManager contract.
 *
 * @param gm the graph manager used to load, mark and delete edge versions
 * @param edge the edge whose source node, type and target node identify the versions to search
 */
private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){
//now read all older versions of an edge, and remove them. Finally calling delete
// search by source/type/target, bounded by Long.MAX_VALUE, in descending order
// (presumably newest version first - confirm against SimpleSearchByEdge)
final SearchByEdge searchByEdge =
new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.absent() );
//load our versions, only retain the most recent one
// skip(1) drops the first emitted version; every remaining version is marked
gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
if (logger.isDebugEnabled()) {
logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
}
return gm.markEdge(edgeToDelete );
// lastOrDefault(null) emits the last marked edge, or null when nothing was marked
}).lastOrDefault(null).doOnNext(lastEdge -> {
//no op if we hit our default
if (lastEdge == null) {
return;
}
//don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
}).toBlocking().lastOrDefault(null);//this should throw an exception
}
/**
 * Looks up the index consistency for a type by delegating to {@code CpCollectionUtils} with
 * this manager's collection settings factory and application id.
 *
 * @param type the type to look up
 * @return the value returned by {@code CpCollectionUtils.getIndexConsistencyForType}
 */
private IndexConsistency getIndexConsistencyForType( String type ) {
    return CpCollectionUtils.getIndexConsistencyForType(
        collectionSettingsFactory,
        applicationId,
        type );
}
/**
 * Looks up the queue indexing strategy for a type by delegating to {@code CpCollectionUtils}
 * with this manager's collection settings factory and application id.
 *
 * @param type the type to look up
 * @return the value returned by {@code CpCollectionUtils.getIndexingStrategyForType}
 */
private QueueIndexingStrategy getIndexingStrategyForType( String type ) {
    return CpCollectionUtils.getIndexingStrategyForType(
        collectionSettingsFactory,
        applicationId,
        type );
}
/**
 * Asks {@code CpCollectionUtils} whether indexing is skipped for a type, using this manager's
 * collection settings factory and application id.
 *
 * @param type the type to check
 * @return the value returned by {@code CpCollectionUtils.skipIndexingForType}
 */
private boolean skipIndexingForType( String type ) {
    return CpCollectionUtils.skipIndexingForType(
        collectionSettingsFactory,
        applicationId,
        type );
}
/**
 * Get the map manager for uuid mapping.
 *
 * <p>The map scope is the entity-type map scope owned by this manager's application id, and the
 * manager itself is obtained from the manager cache.
 *
 * @return the map manager the manager cache returns for that scope
 */
private MapManager getMapManagerForTypes() {
    final Id applicationOwner = new SimpleId( applicationId, TYPE_APPLICATION );
    return managerCache.getMapManager( CpNamingUtils.getEntityTypeMapScope( applicationOwner ) );
}
}