blob: 3df09e6c2930e996517f2d184b700dbedcd087e5 [file] [log] [blame]
/*
* Copyright 2014 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.corepersistence;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.ConnectedEntityRef;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Query.Level;
import org.apache.usergrid.persistence.RelationManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.RoleRef;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.SearchByEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.MapUtils;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import rx.Observable;
import rx.functions.Func1;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionEdge;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
import static org.apache.usergrid.utils.MapUtils.addMapSet;
/**
* Implement good-old Usergrid RelationManager with the new-fangled Core Persistence API.
*/
public class CpRelationManager implements RelationManager {
/** Class-wide SLF4J logger. */
private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
/** Configuration; read by searchCollectionConsistent() for its polling timeout and sleep interval. */
private final EntityManagerFig entityManagerFig;
/** Source of the GraphManager and EntityIndex instances used for the application scope. */
private ManagerCache managerCache;
/** Entity manager used for entity-level operations; cast to CpEntityManager when loading CP entities. */
private EntityManager em;
/** Id of the owning application; also compared against the head entity's UUID to detect the application itself. */
private UUID applicationId;
/** The entity whose relations are managed; may be reassigned to the result of em.validate() / em.get(). */
private EntityRef headEntity;
/** Core Persistence form of the head entity, loaded once in the constructor. */
private org.apache.usergrid.persistence.model.entity.Entity cpHeadEntity;
/** Application scope derived from applicationId in the constructor. */
private final ApplicationScope applicationScope;
/** Receives edge create/delete events (queueNewEdge / queueDeleteEdge). */
private final AsyncEventService indexService;
/** Factory for the read pipelines used by the search methods. */
private final PipelineBuilderFactory pipelineBuilderFactory;
public CpRelationManager( final ManagerCache managerCache,
final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService,
final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId,
final EntityRef headEntity ) {
Assert.notNull( em, "Entity manager cannot be null" );
Assert.notNull( applicationId, "Application Id cannot be null" );
Assert.notNull( headEntity, "Head entity cannot be null" );
Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
Assert.notNull( indexService, "indexService cannot be null" );
this.entityManagerFig = entityManagerFig;
// TODO: this assert should not be failing
//Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" );
this.em = em;
this.applicationId = applicationId;
this.headEntity = headEntity;
this.managerCache = managerCache;
this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
this.pipelineBuilderFactory = pipelineBuilderFactory;
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading head entity {}:{} from app {}", new Object[] {
headEntity.getType(), headEntity.getUuid(), applicationScope
} );
}
Id entityId = new SimpleId( headEntity.getUuid(), headEntity.getType() );
this.cpHeadEntity = ( ( CpEntityManager ) em ).load( entityId );
// commented out because it is possible that CP entity has not been created yet
Assert.notNull( cpHeadEntity, String
.format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) );
this.indexService = indexService;
}
/**
 * Collects the distinct edge types that leave the head entity and match the edge-type prefix
 * derived from the given collection name.
 *
 * @param collectionName collection whose edge-type prefix is searched
 * @return the matching edge types (empty if none)
 */
@Override
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
    final GraphManager graph = managerCache.getGraphManager( applicationScope );
    final String prefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );

    final Object[] logArgs = { prefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() };
    logger.debug( "getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", logArgs );

    final Observable<String> edgeTypes =
        graph.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), prefix, null ) );

    // collect() always emits exactly one (possibly empty) set, so last() cannot fail on an empty stream
    final Observable<Set<String>> collected = edgeTypes.collect( HashSet::new, Set::add );
    return collected.toBlocking().last();
}
/**
 * Groups every container of the head entity by owner type and owner UUID.
 *
 * @return owner type -> owner UUID -> names of the relations that contain the head entity
 */
@Override
public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception {
    // TODO: do we need to restrict this to edges prefixed with owns?
    //Map<EntityRef, Set<String>> containerEntities = getContainers(-1, "owns", null);
    final Map<EntityRef, Set<String>> containers = getContainers();

    final Map<String, Map<UUID, Set<String>>> ownersByType = new LinkedHashMap<String, Map<UUID, Set<String>>>();
    for ( Map.Entry<EntityRef, Set<String>> container : containers.entrySet() ) {
        final EntityRef owner = container.getKey();
        for ( String relationName : container.getValue() ) {
            MapUtils.addMapMapSet( ownersByType, owner.getType(), owner.getUuid(), relationName );
        }
    }
    return ownersByType;
}
/** Returns all containers of the head entity: no limit, any edge type, any source entity type. */
private Map<EntityRef, Set<String>> getContainers() {
    final int noLimit = -1;
    final String anyEdgeType = null;
    final String anySourceType = null;
    return getContainers( noLimit, anyEdgeType, anySourceType );
}
/**
 * Finds the entities that have an edge pointing at the head entity, together with the relation
 * names (derived from the edge types) through which they point at it.
 *
 * @param limit maximum number of edges to inspect; a negative value means no limit
 * @param edgeType edge type, edge type prefix, or null to allow any edge type
 * @param fromEntityType when non-null, only edges whose source has this type are kept
 * @return source entity -> relation names, in the order the edges were received
 */
Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {
    final GraphManager gm = managerCache.getGraphManager( applicationScope );

    // one stream of edges per matching edge type, flattened into a single stream
    Observable<Edge> incoming =
        gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
          .flatMap( type -> gm.loadEdgesToTarget(
              new SimpleSearchByEdgeType( cpHeadEntity.getId(), type, Long.MAX_VALUE,
                  SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) );

    // The limit is applied to edges before the source-type filter below. Without a limit the whole
    // result is held in memory, which is a known weakness of this method.
    if ( limit >= 0 ) {
        incoming = incoming.take( limit );
    }

    return incoming.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( containers, edge ) -> {
        final boolean sourceTypeAccepted =
            fromEntityType == null || fromEntityType.equals( edge.getSourceNode().getType() );

        if ( sourceTypeAccepted ) {
            final EntityRef source =
                new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
            final String relationName = getNameFromEdgeType( edge.getType() );
            addMapSet( containers, source, relationName );
        }
        else {
            logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
        }
    } ).toBlocking().last();
}
/**
 * Tells whether at least one edge version of the given connection type exists from the head
 * entity to the given entity.
 *
 * @param connectionType connection type to look for
 * @param entity candidate target of the connection
 * @return true if an edge version was found
 */
@Override
public boolean isConnectionMember( String connectionType, EntityRef entity ) throws Exception {
    final Id targetId = new SimpleId( entity.getUuid(), entity.getType() );

    final Object[] logArgs =
        { connectionType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() };
    logger.debug( "isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", logArgs );

    final GraphManager graph = managerCache.getGraphManager( applicationScope );
    final Id sourceId = new SimpleId( headEntity.getUuid(), headEntity.getType() );

    // a single emitted version is enough to prove membership
    final Edge anyVersion = graph
        .loadEdgeVersions( CpNamingUtils.createEdgeFromConnectionType( sourceId, connectionType, targetId ) )
        .toBlocking()
        .firstOrDefault( null );
    return anyVersion != null;
}
/**
 * Tells whether at least one edge version exists from the head entity to the given entity for
 * the named collection.
 *
 * @param collectionName collection to look in
 * @param entity candidate member
 * @return true if an edge version was found
 */
@SuppressWarnings( "unchecked" )
@Override
public boolean isCollectionMember( String collectionName, EntityRef entity ) throws Exception {
    final Id memberId = new SimpleId( entity.getUuid(), entity.getType() );

    final Object[] logArgs =
        { collectionName, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() };
    logger.debug( "isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", logArgs );

    final GraphManager graph = managerCache.getGraphManager( applicationScope );
    final Id ownerId = new SimpleId( headEntity.getUuid(), headEntity.getType() );

    // a single emitted version is enough to prove membership
    final Edge anyVersion = graph
        .loadEdgeVersions( CpNamingUtils.createEdgeFromCollectionName( ownerId, collectionName, memberId ) )
        .toBlocking()
        .firstOrDefault( null );
    return anyVersion != null;
}
/**
 * Lists the distinct relation names derived from every edge type that leaves the head entity.
 *
 * @return the names obtained by passing each outgoing edge type through getNameFromEdgeType
 */
@Override
public Set<String> getCollections() throws Exception {
    final Set<String> names = new HashSet<String>();
    final GraphManager graph = managerCache.getGraphManager( applicationScope );

    final Observable<String> edgeTypes =
        graph.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );

    for ( Iterator<String> it = edgeTypes.toBlocking().getIterator(); it.hasNext(); ) {
        names.add( getNameFromEdgeType( it.next() ) );
    }
    return names;
}
/**
 * Returns one page of the named collection, optionally starting after the creation time encoded
 * in a time-based UUID.
 *
 * @param collectionName collection of the head entity to read
 * @param startResult when non-null, only entities created after this UUID's timestamp are
 *        returned; must be a version-1 (time-based) UUID, otherwise {@link UUID#timestamp()}
 *        throws UnsupportedOperationException
 * @param count maximum number of results
 * @param resultsLevel not used by this implementation
 * @param reversed whether the query is marked as reversed
 * @return the matching results
 */
@Override
public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
                              boolean reversed ) throws Exception {
    final String ql;
    if ( startResult != null ) {
        // UUID.timestamp() counts 100-nanosecond ticks since 1582-10-15 (the UUID epoch), while the
        // "created" property is expected to hold milliseconds since 1970-01-01. Comparing the raw
        // tick count against "created" can never match, so convert to epoch milliseconds first.
        final long uuidEpochOffsetTicks = 0x01B21DD213814000L; // ticks between 1582-10-15 and 1970-01-01
        final long ticksPerMillisecond = 10000L;
        final long createdAfterMillis =
            ( startResult.timestamp() - uuidEpochOffsetTicks ) / ticksPerMillisecond;
        ql = "select * where created > " + createdAfterMillis;
    }
    else {
        ql = "select *";
    }

    Query query = Query.fromQL( ql );
    query.setLimit( count );
    query.setReversed( reversed );
    return searchCollection( collectionName, query );
}
/**
 * Runs the given query against the named collection of the head entity.
 * Delegates to searchCollection(); the level argument is not used here.
 */
@Override
public Results getCollection( String collectionName, Query query, Level level ) throws Exception {
return searchCollection( collectionName, query );
}
/**
 * Adds an existing entity to a named collection of the head entity.
 *
 * Writes a graph edge from the head entity to the member and queues it for indexing. When the
 * schema declares a linked collection, a reverse edge from the member back to the head entity is
 * written and queued as well.
 *
 * @param collectionName collection of the head entity to add to
 * @param itemRef the entity to add; must not be null
 * @return the added entity, or null when the schema declares a different item type for the
 *         collection or the entity cannot be found through the entity manager
 * @throws RuntimeException if the Core Persistence entity for itemRef cannot be loaded
 */
@Override
public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {
    Preconditions.checkNotNull(itemRef,"itemref is null");

    // collection may be null: not every collection name is declared in the schema
    CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
    if ( ( collection != null && collection.getType()!=null ) && !collection.getType().equals( itemRef.getType() ) ) {
        return null;
    }

    Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
    org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load(entityId);

    // don't fetch entity if we've already got one
    final Entity itemEntity;
    if ( itemRef instanceof Entity ) {
        itemEntity = ( Entity ) itemRef;
    }
    else {
        itemEntity = em.get( itemRef );
    }
    if ( itemEntity == null ) {
        return null;
    }
    if ( memberEntity == null ) {
        throw new RuntimeException(
            "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
    }

    if ( logger.isDebugEnabled() ) {
        logger.debug( "Loaded member entity {}:{} from app {}\n " + " data {}", new Object[] {
            itemRef.getType(), itemRef.getUuid(), applicationScope, CpEntityMapUtils.toMap( memberEntity )
        } );
    }

    // create graph edge connection from head entity to member entity
    final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() );

    // The type check above explicitly tolerates a missing CollectionInfo, so this must as well;
    // an undeclared collection simply has no linked (reverse) collection.
    final String linkedCollection = collection != null ? collection.getLinkedCollection() : null;

    GraphManager gm = managerCache.getGraphManager(applicationScope);
    gm.writeEdge( edge )
        .doOnNext( writtenEdge -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Wrote edge {}", writtenEdge);
            }
        })
        // only continue to the reverse edge when the schema links the two collections
        .filter(writtenEdge -> linkedCollection != null )
        .flatMap(writtenEdge -> {
            final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
            final Edge reverseEdge = createCollectionEdge( memberEntity.getId(), pluralType, cpHeadEntity.getId() );
            //reverse
            return gm.writeEdge(reverseEdge).doOnNext(reverseEdgeWritten -> {
                indexService.queueNewEdge(applicationScope, cpHeadEntity, reverseEdge);
            });
        })
        .doOnCompleted(() -> {
            indexService.queueNewEdge(applicationScope, memberEntity, edge);
            // logged once here; the identical message that used to follow the blocking call was a duplicate
            if (logger.isDebugEnabled()) {
                logger.debug("Added entity {}:{} to collection {}", new Object[]{
                    itemRef.getUuid().toString(), itemRef.getType(), collectionName
                });
            }
        })
        .toBlocking().lastOrDefault( null );

    return itemEntity;
}
/**
 * Creates a new entity and places it in the named collection of the head entity.
 *
 * Special cases: when the head entity is the application itself the entity is created directly
 * (roles through createRole); when the head entity is a group and the collection is the roles
 * collection a group role is created instead.
 *
 * @param collectionName collection to create the item in
 * @param itemType type of the new entity; the generic entity type is replaced by the singular
 *        form of the collection name when the head entity is the application
 * @param properties properties of the new entity
 * @return the created entity, or null when the schema declares a different item type for the
 *         collection
 */
@Override
public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
    throws Exception {

    if ( headEntity.getUuid().equals( applicationId ) ) {
        if ( itemType.equals( TYPE_ENTITY ) ) {
            itemType = singularize( collectionName );
        }
        if ( itemType.equals( TYPE_ROLE ) ) {
            Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
            if ( inactivity == null ) {
                inactivity = 0L;
            }
            return em.createRole( ( String ) properties.get( PROPERTY_NAME ),
                ( String ) properties.get( PROPERTY_TITLE ), inactivity );
        }
        return em.create( itemType, properties );
    }
    else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collectionName.equals( COLLECTION_ROLES ) ) ) {
        UUID groupId = headEntity.getUuid();
        String roleName = ( String ) properties.get( PROPERTY_NAME );
        // default a missing inactivity to 0, exactly as the application-role branch above does,
        // instead of handing a null through to createGroupRole
        Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
        if ( inactivity == null ) {
            inactivity = 0L;
        }
        return em.createGroupRole( groupId, roleName, inactivity );
    }

    CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
    // same tolerance as addToCollection(): a collection without a declared item type accepts any type
    if ( ( collection != null && collection.getType() != null ) && !collection.getType().equals( itemType ) ) {
        return null;
    }

    properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true );
    Entity itemEntity = em.create( itemType, properties );

    if ( itemEntity != null ) {
        addToCollection( collectionName, itemEntity );

        if ( collection != null && collection.getLinkedCollection() != null ) {
            Id itemEntityId = new SimpleId( itemEntity.getUuid(), itemEntity.getType() );
            final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, itemEntityId );
            GraphManager gm = managerCache.getGraphManager( applicationScope );
            // NOTE(review): the Observable returned here is never subscribed to, and
            // addToCollection() above already wrote an edge with the same endpoints and name.
            // If writeEdge() is lazy this call has no effect - confirm and remove, or subscribe.
            gm.writeEdge( edge );
        }
    }
    return itemEntity;
}
/**
 * Removes an entity from a named collection of the head entity.
 *
 * When the head entity is the application itself the item is deleted outright (roles through
 * deleteRole). Otherwise the collection edge is marked and deleted, the item is removed from the
 * collection's index, and - for the roles collection of a group - the matching role is deleted.
 *
 * @param collectionName collection to remove from
 * @param itemRef the entity to remove
 */
@Override
public void removeFromCollection( String collectionName, EntityRef itemRef ) throws Exception {

    // special handling for roles collection of the application
    if ( headEntity.getUuid().equals( applicationId ) ) {
        if ( collectionName.equals( COLLECTION_ROLES ) ) {
            Entity itemEntity = em.get( itemRef );
            if ( itemEntity != null ) {
                RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
                em.deleteRole( roleRef.getApplicationRoleName() );
                return;
            }
            // The role entity could not be loaded. The previous code called em.delete() with the
            // null entity here; fall through and delete by reference like any other item.
        }
        em.delete( itemRef );
        return;
    }

    // load the entity to be removed to the collection
    if ( logger.isDebugEnabled() ) {
        logger.debug( "Loading entity to remove from collection " + "{}:{} from app {}\n", new Object[] {
            itemRef.getType(), itemRef.getUuid(), applicationScope
        } );
    }

    final Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
    final org.apache.usergrid.persistence.model.entity.Entity memberEntity =
        ( ( CpEntityManager ) em ).load( entityId );

    // remove edge from collection to item; the id alone identifies the edge, so this also works
    // when the member entity itself can no longer be loaded
    final GraphManager gm = managerCache.getGraphManager( applicationScope );
    gm.loadEdgeVersions(
        CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, entityId ) )
      .flatMap( edge -> gm.markEdge( edge ) )
      .flatMap( edge -> gm.deleteEdge( edge ) )
      .toBlocking().lastOrDefault( null );

    // remove item from collection index; deindexing needs the loaded entity
    if ( memberEntity != null ) {
        final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
        final EntityIndexBatch batch = ei.createBatch();
        SearchEdge indexScope = createCollectionSearchEdge( cpHeadEntity.getId(), collectionName );
        batch.deindex( indexScope, memberEntity );
        batch.execute();
    }
    else {
        logger.warn( "Entity {}:{} could not be loaded; skipping index removal", itemRef.getType(),
            itemRef.getUuid() );
    }

    // special handling for roles collection of a group
    if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && collectionName.equals( COLLECTION_ROLES ) ) {
        // the "path" metadata is only available when the caller handed in a full Entity
        final Object path = ( itemRef instanceof Entity ) ? ( ( Entity ) itemRef ).getMetadata( "path" ) : null;
        if ( path instanceof String && ( ( String ) path ).startsWith( "/roles/" ) ) {
            Entity itemEntity = em.get( new SimpleEntityRef( entityId.getType(), entityId.getUuid() ) );
            if ( itemEntity != null ) {
                RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
                em.deleteRole( roleRef.getApplicationRoleName() );
            }
        }
    }
}
/**
 * Copies every member of one relation of the head entity into a relation of another entity.
 *
 * A relation that the schema declares as a collection is read/written as a collection; anything
 * else is treated as a connection type.
 *
 * @param srcRelationName relation of the head entity to read from
 * @param dstEntityRef entity that receives the members
 * @param dstRelationName relation of dstEntityRef to write to
 */
@Override
public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
    throws Exception {

    headEntity = em.validate( headEntity );
    dstEntityRef = em.validate( dstEntityRef );

    final boolean readFromCollection =
        getDefaultSchema().getCollection( headEntity.getType(), srcRelationName ) != null;
    final boolean writeToCollection =
        getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName ) != null;

    // NOTE(review): every pass asks for the first page again (start result is always null), so if
    // hasMoreResults() keeps returning true this loop would not terminate - confirm how paging
    // is meant to advance here.
    Results page;
    do {
        page = readFromCollection
               ? em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false )
               : em.getTargetEntities( headEntity, srcRelationName, null, Level.REFS );

        final boolean hasRefs = page != null && page.size() > 0;
        if ( hasRefs ) {
            for ( EntityRef member : page.getRefs() ) {
                if ( writeToCollection ) {
                    em.addToCollection( dstEntityRef, dstRelationName, member );
                }
                else {
                    em.createConnection( dstEntityRef, dstRelationName, member );
                }
            }
        }
    }
    while ( page != null && page.hasMoreResults() );
}
/**
 * Searches a named collection of the head entity and returns the first page of entities.
 *
 * Queries without a query-language string are served by walking the graph; all others go through
 * a collection search restricted to the collection's declared entity type.
 *
 * @param collectionName collection to search; must be known to the schema
 * @param query the query, or null for a default query on the collection
 * @return the first page of results
 * @throws RuntimeException if the schema has no collection info for the name
 */
@Override
public Results searchCollection( String collectionName, Query query ) throws Exception {
    if ( query == null ) {
        query = new Query();
        query.setCollection( collectionName );
    }

    headEntity = em.validate( headEntity );

    final CollectionInfo info = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
    if ( info == null ) {
        final String owner = headEntity.getType() + ":" + headEntity.getUuid();
        throw new RuntimeException( "Cannot find collection-info for '" + collectionName + "' of " + owner );
    }

    final String memberType = info.getType();
    query.setEntityType( memberType );
    query = adjustQuery( query );

    final IdBuilder fromHead = pipelineBuilderFactory.create( applicationScope )
                                                     .withCursor( query.getCursor() )
                                                     .withLimit( query.getLimit() )
                                                     .fromId( cpHeadEntity.getId() );

    final EntityBuilder entities = query.isGraphSearch()
        ? fromHead.traverseCollection( collectionName ).loadEntities()
        : fromHead.searchCollection( collectionName, query.getQl().get(), memberType ).loadEntities();

    return new EntityQueryExecutor( entities.build() ).next();
}
/**
 * Repeats a collection search until it returns exactly the expected number of results or the
 * configured polling timeout has been exceeded, sleeping between attempts.
 *
 * @param collectionName collection to search
 * @param query query to run on every attempt
 * @param expectedResults result count that ends the polling early
 * @return the results of the last attempt, whether or not the expected count was reached
 */
@Override
public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults )
    throws Exception {

    final long maxLength = entityManagerFig.pollForRecordsTimeout();
    final long sleepTime = entityManagerFig.sleep();
    final long startedAt = System.currentTimeMillis();

    Results results;
    long length;
    while ( true ) {
        results = searchCollection( collectionName, query );
        length = System.currentTimeMillis() - startedAt;

        if ( results.size() == expectedResults ) {
            break;
        }

        // the pause happens before the timeout is evaluated, and the elapsed time used for that
        // evaluation was taken before the pause
        Thread.sleep( sleepTime );
        if ( length > maxLength ) {
            break;
        }
    }

    if ( logger.isInfoEnabled() ) {
        final String summary =
            String.format( "Consistent Search finished in %s, results=%s, expected=%s...dumping stack", length,
                results.size(), expectedResults );
        logger.info( summary );
    }
    return results;
}
/**
 * Creates the connection described by the given reference by delegating to
 * createConnection(String, EntityRef) with the reference's connection type and target.
 */
@Override
public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
return createConnection( connection.getConnectionType(), connection.getTargetRefs() );
}
/**
 * Connects the head entity to another entity: writes a graph edge of the given connection type
 * and queues the new edge for indexing.
 *
 * @param connectionType type of the connection
 * @param connectedEntityRef target of the connection
 * @return a reference describing the created connection
 */
@Override
public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {
    headEntity = em.validate( headEntity );
    connectedEntityRef = em.validate( connectedEntityRef );

    final ConnectionRefImpl created = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );

    if ( logger.isDebugEnabled() ) {
        final Object[] logArgs = {
            connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(),
            connectedEntityRef.getUuid(), applicationScope
        };
        logger.debug( "createConnection(): " + "Indexing connection type '{}'\n from source {}:{}]\n"
            + " to target {}:{}\n app {}", logArgs );
    }

    final Id targetId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() );
    final org.apache.usergrid.persistence.model.entity.Entity target =
        ( ( CpEntityManager ) em ).load( targetId );

    // graph edge from the head entity to the connected entity
    final Edge connectionEdge = createConnectionEdge( cpHeadEntity.getId(), connectionType, target.getId() );

    final GraphManager graph = managerCache.getGraphManager( applicationScope );
    graph.writeEdge( connectionEdge ).toBlocking().last();

    indexService.queueNewEdge( applicationScope, target, connectionEdge );
    return created;
}
/**
 * Paired connections are not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Builds a reference for a connection from the head entity to the given entity. Only the
 * reference object is created here; no graph or index call is made.
 *
 * @param connectionType type of the connection
 * @param connectedEntityRef target of the connection
 * @return the connection reference
 */
@Override
public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
    return new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
}
/**
 * Paired connections are not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
throw new UnsupportedOperationException( "Paired connections not supported" );
}
/**
 * Deletes a connection: marks every version of the edge between the reference's source and
 * target and queues each marked edge for delete processing.
 *
 * @param connectionRef describes the source, the target and the connection type
 */
@Override
public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
    final EntityRef connectingEntityRef = connectionRef.getSourceRefs();  // source
    final EntityRef connectedEntityRef = connectionRef.getTargetRefs();  // target
    final String connectionType = connectionRef.getTargetRefs().getConnectionType();

    if ( logger.isDebugEnabled() ) {
        logger.debug( "Deleting connection '{}' from source {}:{} \n to target {}:{}", new Object[] {
            connectionType, connectingEntityRef.getType(), connectingEntityRef.getUuid(),
            connectedEntityRef.getType(), connectedEntityRef.getUuid()
        } );
    }

    // The edge search only needs the two ids. The target entity used to be loaded just to read
    // back the id it was loaded with, which failed with a NullPointerException once the target
    // had already been deleted; the unused SimpleEdge built here was dropped as well.
    final Id sourceId = new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() );
    final Id targetId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() );

    final GraphManager gm = managerCache.getGraphManager( applicationScope );
    final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetId );

    //delete all the edges and queue their processing
    gm.loadEdgeVersions( search )
      .flatMap( returnedEdge -> gm.markEdge( returnedEdge ) )
      .doOnNext( returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) )
      .toBlocking()
      .lastOrDefault( null );
}
/**
 * Not supported: an entity cannot be identified by UUID alone.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Set<String> getConnectionTypes( UUID connectedEntityId ) throws Exception {
throw new UnsupportedOperationException( "Cannot specify entity by UUID alone." );
}
/** Returns the connection types of the head entity without filtering; see getConnectionTypes(boolean). */
@Override
public Set<String> getConnectionTypes() throws Exception {
return getConnectionTypes( false );
}
/**
 * Reads the head entity's connected-types dictionary.
 *
 * @param filterConnection when true, the entry "connection" is removed from the returned set
 * @return the connection types, or null when the dictionary lookup returned null
 */
@Override
public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {
    final Set<String> types = cast( em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );

    final boolean shouldFilter = types != null && filterConnection && !types.isEmpty();
    if ( shouldFilter ) {
        types.remove( "connection" );
    }
    return types;
}
/**
 * Returns the entities the head entity is connected to through the given connection type.
 * Until this is refactored properly, it delegates to a search by query.
 *
 * @param connectionType connection type to follow; must not be null
 * @param connectedEntityType entity type set on the query (may be null)
 * @param level results level set on the query
 * @return the results of searchTargetEntities() for the assembled query
 */
@Override
public Results getTargetEntities( String connectionType, String connectedEntityType, Level level )
    throws Exception {

    Preconditions.checkNotNull( connectionType, "connectionType cannot be null" );

    // the unused "raw" local that used to be declared here has been removed
    Query query = new Query();
    query.setConnectionType( connectionType );
    query.setEntityType( connectedEntityType );
    query.setResultsLevel( level );

    return searchTargetEntities( query );
}
/** Same as the four-argument getSourceEntities() with a count of -1, which getContainers() treats as "no limit". */
@Override
public Results getSourceEntities( String connType, String fromEntityType, Level resultsLevel ) throws Exception {
return getSourceEntities( connType, fromEntityType, resultsLevel, -1 );
}
/**
 * Returns the entities that are connected to the head entity (edges pointing at the head entity)
 * through the given connection type.
 *
 * @param connType connection type to look for
 * @param fromEntityType when non-null, only sources of this entity type are returned
 * @param level REFS returns references, IDS returns UUIDs, anything else loads full entities
 * @param count maximum number of edges to inspect; a negative value means no limit
 * @return the connecting entities at the requested level; sources that can no longer be loaded
 *         are left out of full-entity results
 */
@Override
public Results getSourceEntities( String connType, String fromEntityType, Level level, int count )
    throws Exception {

    // looking for edges to the head entity
    final String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connType );
    final Set<EntityRef> sources = getContainers( count, edgeType, fromEntityType ).keySet();

    if ( Level.REFS.equals( level ) ) {
        return Results.fromRefList( new ArrayList<EntityRef>( sources ) );
    }

    if ( Level.IDS.equals( level ) ) {
        // TODO: someday this should return a list of Core Persistence Ids
        final List<UUID> idList = new ArrayList<UUID>( sources.size() );
        for ( EntityRef ref : sources ) {
            idList.add( ref.getUuid() );
        }
        return Results.fromIdList( idList );
    }

    final List<Entity> entities = new ArrayList<Entity>( sources.size() );
    for ( EntityRef ref : sources ) {
        final Entity entity = em.get( ref );
        if ( entity == null ) {
            // The edge exists but its source can no longer be loaded. The unconditional debug
            // string concatenation used to fail with a NullPointerException here.
            logger.debug( "Skipping connecting entity {}:{} because it could not be loaded", ref.getType(),
                ref.getUuid() );
            continue;
        }
        if ( logger.isDebugEnabled() ) {
            logger.debug( " Found connecting entity: {}", entity.getProperties() );
        }
        entities.add( entity );
    }
    return Results.fromEntities( entities );
}
/**
 * Searches the entities connected from the head entity through the query's connection type.
 *
 * Queries at level REFS or IDS produce connection references; every other level loads the full
 * entities. In both cases a query without a query-language string walks the graph, while a query
 * with one goes through a connection search.
 *
 * @param query the query; must not be null and must name a connection type
 * @return the first page of results
 */
@Override
public Results searchTargetEntities( Query query ) throws Exception {
    Preconditions.checkNotNull( query, "query cannot be null" );

    final String connection = query.getConnectionType();
    Preconditions.checkNotNull( connection, "connection must be specified" );

    headEntity = em.validate( headEntity );
    query = adjustQuery( query );

    // absent means "targets of any entity type"
    final Optional<String> entityType = Optional.fromNullable( query.getEntityType() );

    final IdBuilder fromHead = pipelineBuilderFactory.create( applicationScope )
                                                     .withCursor( query.getCursor() )
                                                     .withLimit( query.getLimit() )
                                                     .fromId( cpHeadEntity.getId() );

    final boolean wantsFullEntities =
        query.getResultsLevel() != Level.REFS && query.getResultsLevel() != Level.IDS;

    if ( wantsFullEntities ) {
        // start id -> graph edges or search candidates -> verified entity load -> results page
        final Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> entityPages;
        if ( query.isGraphSearch() ) {
            entityPages = fromHead.traverseConnection( connection, entityType ).loadEntities().build();
        }
        else {
            entityPages =
                fromHead.searchConnection( connection, query.getQl().get(), entityType ).loadEntities().build();
        }
        return new EntityQueryExecutor( entityPages ).next();
    }

    // start id -> graph edges or search candidates -> ids -> connection refs
    final IdBuilder targetIds;
    if ( query.isGraphSearch() ) {
        targetIds = fromHead.traverseConnection( connection, entityType );
    }
    else {
        targetIds = fromHead.searchConnection( connection, query.getQl().get(), entityType ).loadIds();
    }

    final Observable<ResultsPage<ConnectionRef>> refPages =
        targetIds.loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
    return new ConnectionRefQueryExecutor( refPages ).next();
}
/**
 * Turns an identifier-only query (a single name, email or UUID) into an equivalent
 * query-language string. Queries that already carry a query-language string are returned
 * unchanged, as are queries with no single identifier.
 *
 * @param query the query to adjust; modified in place
 * @return the same query instance
 */
private Query adjustQuery( Query query ) {

    // an explicit query string always wins
    if ( query.getQl().isPresent() ) {
        return query;
    }

    // a name alias or email alias was specified
    if ( query.containsSingleNameOrEmailIdentifier() ) {

        Identifier ident = query.getSingleIdentifier();

        // An email was specified: an edge case that only applies to users. The comparison is
        // written constant-first because the entity type may be null (searchTargetEntities()
        // treats it as optional), which used to fail with a NullPointerException here.
        if ( User.ENTITY_TYPE.equals( query.getEntityType() ) && ident.isEmail() ) {
            final String newQuery = "select * where email='" + query.getSingleNameOrEmailIdentifier() + "'";
            query.setQl( newQuery );
        }
        // use the ident with the default alias. could be an email
        else {
            final String newQuery = "select * where name='" + query.getSingleNameOrEmailIdentifier() + "'";
            query.setQl( newQuery );
        }
    }
    else if ( query.containsSingleUuidIdentifier() ) {
        //TODO, this shouldn't even come from ES, it should look up the entity directly
        final String newQuery = "select * where uuid=" + query.getSingleUuidIdentifier() + "";
        query.setQl( newQuery );
    }

    // TODO T.N. not sure if we still need to add a default descending sort on "created" for
    // reversed queries without an explicit "order by". If we do, the query interface needs to
    // change first. (The disabled implementation that used to sit here was removed.)
    return query;
}
/**
 * Not implemented.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Set<String> getConnectionIndexes( String connectionType ) throws Exception {
throw new UnsupportedOperationException( "Not supported yet." );
}
/**
 * Returns the head entity as a full Entity.
 *
 * Side effect: when headEntity is only a reference it is replaced by the entity loaded through
 * the entity manager (which may be null if nothing was found).
 */
private Entity getHeadEntity() throws Exception {
    if ( !( headEntity instanceof Entity ) ) {
        headEntity = em.get( headEntity );
    }
    return ( Entity ) headEntity;
}
}