| /* |
| * Copyright 2014 The Apache Software Foundation. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.usergrid.corepersistence; |
| |
| |
| import java.nio.ByteBuffer; |
| import java.util.AbstractMap; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.util.Assert; |
| |
| import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; |
| import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; |
| import org.apache.usergrid.corepersistence.results.QueryExecutor; |
| import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; |
| import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; |
| import org.apache.usergrid.corepersistence.util.CpNamingUtils; |
| import org.apache.usergrid.persistence.ConnectedEntityRef; |
| import org.apache.usergrid.persistence.ConnectionRef; |
| import org.apache.usergrid.persistence.Entity; |
| import org.apache.usergrid.persistence.EntityManager; |
| import org.apache.usergrid.persistence.EntityRef; |
| import org.apache.usergrid.persistence.IndexBucketLocator; |
| import org.apache.usergrid.persistence.PagingResultsIterator; |
| import org.apache.usergrid.persistence.RelationManager; |
| import org.apache.usergrid.persistence.Results; |
| import org.apache.usergrid.persistence.RoleRef; |
| import org.apache.usergrid.persistence.Schema; |
| import org.apache.usergrid.persistence.SimpleEntityRef; |
| import org.apache.usergrid.persistence.SimpleRoleRef; |
| import org.apache.usergrid.persistence.cassandra.CassandraService; |
| import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; |
| import org.apache.usergrid.persistence.cassandra.IndexUpdate; |
| import org.apache.usergrid.persistence.cassandra.QueryProcessorImpl; |
| import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner; |
| import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner; |
| import org.apache.usergrid.persistence.cassandra.index.IndexScanner; |
| import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner; |
| import org.apache.usergrid.persistence.collection.CollectionScope; |
| import org.apache.usergrid.persistence.core.future.BetterFuture; |
| import org.apache.usergrid.persistence.core.metrics.MetricsFactory; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScope; |
| import org.apache.usergrid.persistence.entities.Group; |
| import org.apache.usergrid.persistence.entities.User; |
| import org.apache.usergrid.persistence.geo.ConnectionGeoSearch; |
| import org.apache.usergrid.persistence.geo.EntityLocationRef; |
| import org.apache.usergrid.persistence.geo.model.Point; |
| import org.apache.usergrid.persistence.graph.Edge; |
| import org.apache.usergrid.persistence.graph.GraphManager; |
| import org.apache.usergrid.persistence.graph.SearchByEdgeType; |
| import org.apache.usergrid.persistence.graph.impl.SimpleEdge; |
| import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; |
| import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; |
| import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; |
| import org.apache.usergrid.persistence.index.EntityIndex; |
| import org.apache.usergrid.persistence.index.EntityIndexBatch; |
| import org.apache.usergrid.persistence.index.IndexScope; |
| import org.apache.usergrid.persistence.index.SearchTypes; |
| import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; |
| import org.apache.usergrid.persistence.index.query.Identifier; |
| import org.apache.usergrid.persistence.index.query.Query; |
| import org.apache.usergrid.persistence.index.query.Query.Level; |
| import org.apache.usergrid.persistence.model.entity.Id; |
| import org.apache.usergrid.persistence.model.entity.SimpleId; |
| import org.apache.usergrid.persistence.model.util.UUIDGenerator; |
| import org.apache.usergrid.persistence.query.ir.AllNode; |
| import org.apache.usergrid.persistence.query.ir.NameIdentifierNode; |
| import org.apache.usergrid.persistence.query.ir.QueryNode; |
| import org.apache.usergrid.persistence.query.ir.QuerySlice; |
| import org.apache.usergrid.persistence.query.ir.SearchVisitor; |
| import org.apache.usergrid.persistence.query.ir.WithinNode; |
| import org.apache.usergrid.persistence.query.ir.result.ConnectionIndexSliceParser; |
| import org.apache.usergrid.persistence.query.ir.result.ConnectionResultsLoaderFactory; |
| import org.apache.usergrid.persistence.query.ir.result.ConnectionTypesIterator; |
| import org.apache.usergrid.persistence.query.ir.result.EmptyIterator; |
| import org.apache.usergrid.persistence.query.ir.result.GeoIterator; |
| import org.apache.usergrid.persistence.query.ir.result.SliceIterator; |
| import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator; |
| import org.apache.usergrid.persistence.schema.CollectionInfo; |
| import org.apache.usergrid.utils.IndexUtils; |
| import org.apache.usergrid.utils.MapUtils; |
| import org.apache.usergrid.utils.UUIDUtils; |
| |
| import com.codahale.metrics.Timer; |
| import com.google.common.base.Preconditions; |
| |
| import me.prettyprint.hector.api.Keyspace; |
| import me.prettyprint.hector.api.beans.DynamicComposite; |
| import me.prettyprint.hector.api.beans.HColumn; |
| import me.prettyprint.hector.api.mutation.Mutator; |
| import rx.Observable; |
| import rx.functions.Action1; |
| import rx.functions.Action2; |
| import rx.functions.Func1; |
| |
| import static java.util.Arrays.asList; |
| |
| import static me.prettyprint.hector.api.factory.HFactory.createMutator; |
| import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType; |
| import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; |
| import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES; |
| import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES; |
| import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES; |
| import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES; |
| import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS; |
| import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED; |
| import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY; |
| import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME; |
| import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE; |
| import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; |
| import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY; |
| import static org.apache.usergrid.persistence.Schema.TYPE_ROLE; |
| import static org.apache.usergrid.persistence.Schema.getDefaultSchema; |
| import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES; |
| import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES; |
| import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX; |
| import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES; |
| import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator; |
| import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator; |
| import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute; |
| import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key; |
| import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT; |
| import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex; |
| import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex; |
| import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex; |
| import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex; |
| import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode; |
| import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue; |
| import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue; |
| import static org.apache.usergrid.persistence.cassandra.Serializers.be; |
| import static org.apache.usergrid.utils.ClassUtils.cast; |
| import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag; |
| import static org.apache.usergrid.utils.InflectionUtils.singularize; |
| import static org.apache.usergrid.utils.MapUtils.addMapSet; |
| import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros; |
| |
| |
| /** |
| * Implement good-old Usergrid RelationManager with the new-fangled Core Persistence API. |
| */ |
| public class CpRelationManager implements RelationManager { |
| |
/** Logger shared by every instance of this class. */
private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );


// ---- values handed to init() ----

/** Entity manager passed to init(); init() also reads the Cassandra service from it. */
private EntityManager em;

/** Entity manager factory passed to init(); init() takes the manager cache from it. */
private CpEntityManagerFactory emf;

/** Id of the application passed to init(). */
private UUID applicationId;

/** Reference to the entity whose relations this manager works on. */
private EntityRef headEntity;

/** Stored by init() without a null check (that assert is disabled there). */
private IndexBucketLocator indexBucketLocator;

/** Factory from which init() obtains the timers below. */
private MetricsFactory metricsFactory;


// ---- values derived inside init() ----

/** Taken from emf.getManagerCache(); supplies graph managers and entity indexes. */
private ManagerCache managerCache;

/** Taken from em.getCass(). */
private CassandraService cass;

/** Built from applicationId via CpNamingUtils.getApplicationScope(). */
private ApplicationScope applicationScope;

/** Collection scope computed from the application and the head entity's type. */
private CollectionScope headEntityScope;

/** Core Persistence entity loaded for headEntity. */
private org.apache.usergrid.persistence.model.entity.Entity cpHeadEntity;


// ---- timers, all created in init() ----

/** Registered as "relation.manager.es.update.collection". */
private Timer updateCollectionTimer;

/** Registered as "relation.manager.es.create.connection.timer". */
private Timer createConnectionTimer;

/** Registered as "relation.manager.cassandra.delete.connection.batch.timer". */
private Timer cassConnectionDelete;

/** Registered as "relation.manager.es.delete.connection.batch.timer". */
private Timer esDeleteConnectionTimer;


/**
 * Creates an empty manager; all state is filled in by init().
 */
public CpRelationManager() {
    // intentionally empty
}
| |
| |
| public CpRelationManager init( |
| EntityManager em, |
| CpEntityManagerFactory emf, |
| UUID applicationId, |
| EntityRef headEntity, |
| IndexBucketLocator indexBucketLocator, |
| MetricsFactory metricsFactory) { |
| |
| Assert.notNull( em, "Entity manager cannot be null" ); |
| Assert.notNull( emf, "Entity manager factory cannot be null" ); |
| Assert.notNull( applicationId, "Application Id cannot be null" ); |
| Assert.notNull( headEntity, "Head entity cannot be null" ); |
| Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" ); |
| |
| // TODO: this assert should not be failing |
| //Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" ); |
| |
| this.em = em; |
| this.emf = emf; |
| this.applicationId = applicationId; |
| this.headEntity = headEntity; |
| this.managerCache = emf.getManagerCache(); |
| this.applicationScope = CpNamingUtils.getApplicationScope( applicationId ); |
| |
| this.cass = em.getCass(); // TODO: eliminate need for this via Core Persistence |
| this.indexBucketLocator = indexBucketLocator; // TODO: this also |
| this.metricsFactory = metricsFactory; |
| this.updateCollectionTimer = metricsFactory |
| .getTimer( CpRelationManager.class, "relation.manager.es.update.collection" ); |
| this.createConnectionTimer = metricsFactory |
| .getTimer( CpRelationManager.class, "relation.manager.es.create.connection.timer" ); |
| this.cassConnectionDelete = metricsFactory |
| .getTimer( CpRelationManager.class, "relation.manager.cassandra.delete.connection.batch.timer" ); |
| this.esDeleteConnectionTimer = metricsFactory.getTimer(CpRelationManager.class, "relation.manager.es.delete.connection.batch.timer" ); |
| // load the Core Persistence version of the head entity as well |
| this.headEntityScope = getCollectionScopeNameFromEntityType( |
| applicationScope.getApplication(), headEntity.getType()); |
| |
| if ( logger.isDebugEnabled() ) { |
| logger.debug( "Loading head entity {}:{} from scope\n app {}\n owner {}\n name {}", |
| new Object[] { |
| headEntity.getType(), |
| headEntity.getUuid(), |
| headEntityScope.getApplication(), |
| headEntityScope.getOwner(), |
| headEntityScope.getName() |
| } ); |
| } |
| |
| Id entityId = new SimpleId( headEntity.getUuid(), headEntity.getType() ); |
| |
| // if(headEntity instanceof Entity){ |
| // cpHeadEntity = entityToCpEntity( (Entity)headEntity, headEntity.getUuid() ); |
| // }else { |
| this.cpHeadEntity = |
| ( ( CpEntityManager ) em ).load( new CpEntityManager.EntityScope( headEntityScope, entityId ) ); |
| // } |
| |
| // commented out because it is possible that CP entity has not been created yet |
| Assert.notNull( cpHeadEntity, "cpHeadEntity cannot be null" ); |
| |
| |
| return this; |
| } |
| |
| |
| @Override |
| public Set<String> getCollectionIndexes( String collectionName ) throws Exception { |
| final Set<String> indexes = new HashSet<String>(); |
| |
| GraphManager gm = managerCache.getGraphManager(applicationScope); |
| |
| String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ); |
| |
| logger.debug("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", |
| new Object[] { |
| edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() |
| }); |
| |
| Observable<String> types= gm.getEdgeTypesFromSource( |
| new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) ); |
| |
| Iterator<String> iter = types.toBlockingObservable().getIterator(); |
| while ( iter.hasNext() ) { |
| indexes.add( iter.next() ); |
| } |
| return indexes; |
| } |
| |
| |
| @Override |
| public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception { |
| |
| // TODO: do we need to restrict this to edges prefixed with owns? |
| //Map<EntityRef, Set<String>> containerEntities = getContainers(-1, "owns", null); |
| Map<EntityRef, Set<String>> containerEntities = getContainers(); |
| |
| Map<String, Map<UUID, Set<String>>> owners = |
| new LinkedHashMap<String, Map<UUID, Set<String>>>(); |
| |
| for ( EntityRef owner : containerEntities.keySet() ) { |
| Set<String> collections = containerEntities.get( owner ); |
| for ( String collection : collections ) { |
| MapUtils.addMapMapSet( owners, owner.getType(), owner.getUuid(), collection ); |
| } |
| } |
| |
| return owners; |
| } |
| |
| |
| private Map<EntityRef, Set<String>> getContainers() { |
| return getContainers( -1, null, null ); |
| } |
| |
| |
| /** |
| * Gets containing collections and/or connections depending on the edge type you pass in |
| * |
| * @param limit Max number to return |
| * @param edgeType Edge type, edge type prefix or null to allow any edge type |
| * @param fromEntityType Only consider edges from entities of this type |
| */ |
| Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) { |
| |
| Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>(); |
| |
| final GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| |
| Observable<Edge> edges = |
| gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) ) |
| .flatMap( new Func1<String, Observable<Edge>>() { |
| @Override |
| public Observable<Edge> call( final String edgeType ) { |
| return gm.loadEdgesToTarget( |
| new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, |
| SearchByEdgeType.Order.DESCENDING, null ) ); |
| |
| } |
| } ); |
| |
| //if our limit is set, take them. Note this logic is still borked, we can't possibly fit everything in memmory |
| if ( limit > -1 ) { |
| edges = edges.take( limit ); |
| } |
| |
| |
| return edges.collect( results, new Action2<Map<EntityRef, Set<String>>, Edge>() { |
| @Override |
| public void call( final Map<EntityRef, Set<String>> entityRefSetMap, final Edge edge ) { |
| if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) { |
| logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() ); |
| return; |
| } |
| |
| final EntityRef eref = |
| new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); |
| |
| String name; |
| if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) { |
| name = CpNamingUtils.getConnectionType( edge.getType() ); |
| } |
| else { |
| name = CpNamingUtils.getCollectionName( edge.getType() ); |
| } |
| addMapSet( entityRefSetMap, eref, name ); |
| } |
| } ).toBlocking().last(); |
| } |
| |
| |
| public void updateContainingCollectionAndCollectionIndexes( |
| final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) { |
| |
| |
| final GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| |
| Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( |
| cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator(); |
| |
| logger.debug("updateContainingCollectionsAndCollections(): " |
| + "Searched for edges to target {}:{}\n in scope {}\n found: {}", |
| new Object[] { |
| cpHeadEntity.getId().getType(), |
| cpHeadEntity.getId().getUuid(), |
| applicationScope.getApplication(), |
| edgeTypesToTarget.hasNext() |
| }); |
| |
| // loop through all types of edge to target |
| |
| |
| final EntityIndex ei = managerCache.getEntityIndex( applicationScope ); |
| |
| final EntityIndexBatch entityIndexBatch = ei.createBatch(); |
| |
| final int count = gm.getEdgeTypesToTarget( |
| new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) ) |
| |
| // for each edge type, emit all the edges of that type |
| .flatMap( new Func1<String, Observable<Edge>>() { |
| @Override |
| public Observable<Edge> call( final String etype ) { |
| return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( |
| cpHeadEntity.getId(), etype, Long.MAX_VALUE, |
| SearchByEdgeType.Order.DESCENDING, null ) ); |
| } |
| } ) |
| |
| //for each edge we receive index and add to the batch |
| .doOnNext( new Action1<Edge>() { |
| @Override |
| public void call( final Edge edge ) { |
| |
| EntityRef sourceEntity = |
| new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); |
| |
| // reindex the entity in the source entity's collection or connection index |
| |
| IndexScope indexScope; |
| if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) { |
| |
| String collName = CpNamingUtils.getCollectionName( edge.getType() ); |
| indexScope = new IndexScopeImpl( |
| new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()), |
| CpNamingUtils.getCollectionScopeNameFromCollectionName( collName )); |
| } |
| else { |
| |
| String connName = CpNamingUtils.getConnectionType( edge.getType() ); |
| indexScope = new IndexScopeImpl( |
| new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ), |
| CpNamingUtils.getConnectionScopeName( connName ) ); |
| } |
| |
| entityIndexBatch.index( indexScope, cpEntity ); |
| |
| // reindex the entity in the source entity's all-types index |
| |
| //TODO REMOVE INDEX CODE |
| // indexScope = new IndexScopeImpl( new SimpleId( |
| // sourceEntity.getUuid(), sourceEntity.getType() ), CpNamingUtils |
| // .ALL_TYPES, entityType ); |
| // |
| // entityIndexBatch.index( indexScope, cpEntity ); |
| } |
| } ).count().toBlocking().lastOrDefault( 0 ); |
| |
| //Adding graphite metrics |
| Timer.Context timeElasticIndexBatch = updateCollectionTimer.time(); |
| entityIndexBatch.execute(); |
| timeElasticIndexBatch.stop(); |
| |
| logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count ); |
| } |
| |
| |
| @Override |
| public boolean isConnectionMember( String connectionType, EntityRef entity ) throws Exception { |
| |
| Id entityId = new SimpleId( entity.getUuid(), entity.getType() ); |
| |
| String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ); |
| |
| logger.debug("isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", |
| new Object[] { |
| edgeType, |
| headEntity.getType(), headEntity.getUuid(), |
| entity.getType(), entity.getUuid() }); |
| |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| Observable<Edge> edges = gm.loadEdgeVersions( new SimpleSearchByEdge( |
| new SimpleId( headEntity.getUuid(), headEntity.getType() ), |
| edgeType, |
| entityId, |
| Long.MAX_VALUE, |
| SearchByEdgeType.Order.DESCENDING, |
| null ) ); |
| |
| return edges.toBlockingObservable().firstOrDefault( null ) != null; |
| } |
| |
| |
| @SuppressWarnings( "unchecked" ) |
| @Override |
| public boolean isCollectionMember( String collName, EntityRef entity ) throws Exception { |
| |
| Id entityId = new SimpleId( entity.getUuid(), entity.getType() ); |
| |
| String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName ); |
| |
| logger.debug("isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", |
| new Object[] { |
| edgeType, |
| headEntity.getType(), headEntity.getUuid(), |
| entity.getType(), entity.getUuid() }); |
| |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| Observable<Edge> edges = gm.loadEdgeVersions( new SimpleSearchByEdge( |
| new SimpleId( headEntity.getUuid(), headEntity.getType() ), |
| edgeType, |
| entityId, |
| Long.MAX_VALUE, |
| SearchByEdgeType.Order.DESCENDING, |
| null ) ); |
| |
| return edges.toBlockingObservable().firstOrDefault( null ) != null; |
| } |
| |
| |
| private boolean moreThanOneInboundConnection( EntityRef target, String connectionType ) { |
| |
| Id targetId = new SimpleId( target.getUuid(), target.getType() ); |
| |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| |
| Observable<Edge> edgesToTarget = gm.loadEdgesToTarget( new SimpleSearchByEdgeType( |
| targetId, |
| CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ), |
| System.currentTimeMillis(), |
| SearchByEdgeType.Order.DESCENDING, |
| null ) ); // last |
| |
| Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator(); |
| int count = 0; |
| while ( iterator.hasNext() ) { |
| iterator.next(); |
| if ( count++ > 1 ) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| |
| private boolean moreThanOneOutboundConnection( EntityRef source, String connectionType ) { |
| |
| Id sourceId = new SimpleId( source.getUuid(), source.getType() ); |
| |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| |
| Observable<Edge> edgesFromSource = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( |
| sourceId, |
| CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ), |
| System.currentTimeMillis(), |
| SearchByEdgeType.Order.DESCENDING, |
| null ) ); // last |
| |
| int count = edgesFromSource.take( 2 ).count().toBlocking().last(); |
| |
| return count > 1; |
| } |
| |
| |
| @Override |
| public Set<String> getCollections() throws Exception { |
| |
| final Set<String> indexes = new HashSet<String>(); |
| |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| |
| Observable<String> str = gm.getEdgeTypesFromSource( |
| new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) ); |
| |
| Iterator<String> iter = str.toBlockingObservable().getIterator(); |
| while ( iter.hasNext() ) { |
| String edgeType = iter.next(); |
| indexes.add( CpNamingUtils.getCollectionName( edgeType ) ); |
| } |
| |
| return indexes; |
| } |
| |
| |
| @Override |
| public Results getCollection( String collectionName, |
| UUID startResult, |
| int count, |
| Level resultsLevel, |
| boolean reversed ) throws Exception { |
| |
| Query query = Query.fromQL( "select *" ); |
| query.setLimit( count ); |
| query.setReversed( reversed ); |
| |
| if ( startResult != null ) { |
| query.addGreaterThanEqualFilter( "created", startResult.timestamp() ); |
| } |
| |
| return searchCollection( collectionName, query ); |
| } |
| |
| |
| @Override |
| public Results getCollection( String collName, Query query, Level level ) throws Exception { |
| |
| return searchCollection( collName, query ); |
| } |
| |
| |
| // add to a named collection of the head entity |
| @Override |
| public Entity addToCollection( String collName, EntityRef itemRef ) throws Exception { |
| |
| CollectionInfo collection = |
| getDefaultSchema().getCollection( headEntity.getType(), collName ); |
| if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) { |
| return null; |
| } |
| |
| return addToCollection( collName, itemRef, |
| ( collection != null && collection.getLinkedCollection() != null ) ); |
| } |
| |
| |
| public Entity addToCollection( String collName, EntityRef itemRef, boolean connectBack ) |
| throws Exception { |
| |
| CollectionScope memberScope = getCollectionScopeNameFromEntityType( |
| applicationScope.getApplication(), itemRef.getType()); |
| |
| Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() ); |
| org.apache.usergrid.persistence.model.entity.Entity memberEntity = |
| ((CpEntityManager)em).load( new CpEntityManager.EntityScope( memberScope, entityId)); |
| |
| return addToCollection(collName, itemRef, memberEntity, connectBack); |
| } |
| |
| |
| public Entity addToCollection(final String collName, final EntityRef itemRef, |
| final org.apache.usergrid.persistence.model.entity.Entity memberEntity, final boolean connectBack ) |
| throws Exception { |
| |
| // don't fetch entity if we've already got one |
| final Entity itemEntity; |
| if ( itemRef instanceof Entity ) { |
| itemEntity = ( Entity ) itemRef; |
| } |
| else { |
| itemEntity = em.get( itemRef ); |
| } |
| |
| if ( itemEntity == null ) { |
| return null; |
| } |
| |
| CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName ); |
| if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) { |
| return null; |
| } |
| |
| // load the new member entity to be added to the collection from its default scope |
| CollectionScope memberScope = getCollectionScopeNameFromEntityType( |
| applicationScope.getApplication(), itemRef.getType()); |
| |
| if ( memberEntity == null ) { |
| throw new RuntimeException( |
| "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() ); |
| } |
| |
| if ( logger.isDebugEnabled() ) { |
| logger.debug( "Loaded member entity {}:{} from scope\n app {}\n " |
| + "owner {}\n name {} data {}", |
| new Object[] { |
| itemRef.getType(), |
| itemRef.getUuid(), |
| memberScope.getApplication(), |
| memberScope.getOwner(), |
| memberScope.getName(), |
| CpEntityMapUtils.toMap( memberEntity ) |
| } ); |
| } |
| |
| String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName ); |
| |
| UUID timeStampUuid = memberEntity.getId().getUuid() != null |
| && UUIDUtils.isTimeBased( memberEntity.getId().getUuid() ) |
| ? memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID(); |
| |
| long uuidHash = UUIDUtils.getUUIDLong( timeStampUuid ); |
| |
| // create graph edge connection from head entity to member entity |
| Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash ); |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| gm.writeEdge( edge ).toBlockingObservable().last(); |
| |
| |
| if(logger.isDebugEnabled()) { |
| logger.debug( "Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] { |
| edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(), memberEntity.getId().getType(), |
| memberEntity.getId().getUuid(), applicationScope.getApplication().getType(), |
| applicationScope.getApplication().getUuid() |
| } ); |
| } |
| |
| ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName ); |
| |
| if(logger.isDebugEnabled()) { |
| logger.debug( "Added entity {}:{} to collection {}", new Object[] { |
| itemRef.getUuid().toString(), itemRef.getType(), collName |
| } ); |
| } |
| // logger.debug("With head entity scope is {}:{}:{}", new Object[] { |
| // headEntityScope.getApplication().toString(), |
| // headEntityScope.getOwner().toString(), |
| // headEntityScope.getName()}); |
| |
| if ( connectBack && collection != null && collection.getLinkedCollection() != null ) { |
| getRelationManager( itemEntity ).addToCollection( |
| collection.getLinkedCollection(), headEntity, cpHeadEntity, false ); |
| getRelationManager( itemEntity ).addToCollection( |
| collection.getLinkedCollection(), headEntity, false ); |
| } |
| |
| return itemEntity; |
| } |
| |
| |
| @Override |
| public Entity addToCollections( List<EntityRef> owners, String collName ) throws Exception { |
| |
| // TODO: this addToCollections() implementation seems wrong. |
| for ( EntityRef eref : owners ) { |
| addToCollection( collName, eref ); |
| } |
| |
| return null; |
| } |
| |
| |
| @Override |
| public Entity createItemInCollection( |
| String collName, String itemType, Map<String, Object> properties) throws Exception { |
| |
| if ( headEntity.getUuid().equals( applicationId ) ) { |
| if ( itemType.equals( TYPE_ENTITY ) ) { |
| itemType = singularize( collName ); |
| } |
| |
| if ( itemType.equals( TYPE_ROLE ) ) { |
| Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY ); |
| if ( inactivity == null ) { |
| inactivity = 0L; |
| } |
| return em.createRole( ( String ) properties.get( PROPERTY_NAME ), |
| ( String ) properties.get( PROPERTY_TITLE ), inactivity ); |
| } |
| return em.create( itemType, properties ); |
| } |
| |
| else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) |
| && ( collName.equals( COLLECTION_ROLES ) ) ) { |
| UUID groupId = headEntity.getUuid(); |
| String roleName = ( String ) properties.get( PROPERTY_NAME ); |
| return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) ); |
| } |
| |
| CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName ); |
| if ( ( collection != null ) && !collection.getType().equals( itemType ) ) { |
| return null; |
| } |
| |
| properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true ); |
| |
| Entity itemEntity = em.create( itemType, properties ); |
| |
| if ( itemEntity != null ) { |
| |
| addToCollection( collName, itemEntity ); |
| |
| if ( collection != null && collection.getLinkedCollection() != null ) { |
| getRelationManager( getHeadEntity() ) |
| .addToCollection( collection.getLinkedCollection(), itemEntity ); |
| } |
| } |
| |
| return itemEntity; |
| } |
| |
| |
| @Override |
| public void removeFromCollection( String collName, EntityRef itemRef ) throws Exception { |
| |
| // special handling for roles collection of the application |
| if ( headEntity.getUuid().equals( applicationId ) ) { |
| if ( collName.equals( COLLECTION_ROLES ) ) { |
| Entity itemEntity = em.get( itemRef ); |
| if ( itemEntity != null ) { |
| RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity ); |
| em.deleteRole( roleRef.getApplicationRoleName() ); |
| return; |
| } |
| em.delete( itemEntity ); |
| return; |
| } |
| em.delete( itemRef ); |
| return; |
| } |
| |
| // load the entity to be removed to the collection |
| CollectionScope memberScope = getCollectionScopeNameFromEntityType( |
| applicationScope.getApplication(), itemRef.getType()); |
| |
| if ( logger.isDebugEnabled() ) { |
| logger.debug( "Loading entity to remove from collection " |
| + "{}:{} from scope\n app {}\n owner {}\n name {}", |
| new Object[] { |
| itemRef.getType(), |
| itemRef.getUuid(), |
| memberScope.getApplication(), |
| memberScope.getOwner(), |
| memberScope.getName() |
| }); |
| } |
| |
| Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() ); |
| org.apache.usergrid.persistence.model.entity.Entity memberEntity = |
| ((CpEntityManager)em).load( new CpEntityManager.EntityScope( memberScope, entityId)); |
| |
| final EntityIndex ei = managerCache.getEntityIndex( applicationScope ); |
| final EntityIndexBatch batch = ei.createBatch(); |
| |
| // remove item from collection index |
| IndexScope indexScope = new IndexScopeImpl( |
| cpHeadEntity.getId(), |
| CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); |
| |
| batch.deindex( indexScope, memberEntity ); |
| |
| // remove collection from item index |
| IndexScope itemScope = new IndexScopeImpl( |
| memberEntity.getId(), |
| CpNamingUtils.getCollectionScopeNameFromCollectionName( |
| Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) ); |
| |
| |
| batch.deindex( itemScope, cpHeadEntity ); |
| |
| BetterFuture future = batch.execute(); |
| |
| // remove edge from collection to item |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| Edge collectionToItemEdge = new SimpleEdge( |
| cpHeadEntity.getId(), |
| CpNamingUtils.getEdgeTypeFromCollectionName( collName ), |
| memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) ); |
| gm.deleteEdge( collectionToItemEdge ).toBlockingObservable().last(); |
| |
| // remove edge from item to collection |
| Edge itemToCollectionEdge = new SimpleEdge( |
| memberEntity.getId(), |
| CpNamingUtils.getEdgeTypeFromCollectionName( |
| Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ), |
| cpHeadEntity.getId(), |
| UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) ); |
| |
| gm.deleteEdge( itemToCollectionEdge ).toBlockingObservable().last(); |
| |
| // special handling for roles collection of a group |
| if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) { |
| |
| if ( collName.equals( COLLECTION_ROLES ) ) { |
| String path = ( String ) ( ( Entity ) itemRef ).getMetadata( "path" ); |
| |
| if ( path.startsWith( "/roles/" ) ) { |
| |
| Entity itemEntity = em.get( new SimpleEntityRef( memberEntity.getId().getType(), |
| memberEntity.getId().getUuid() ) ); |
| |
| RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity ); |
| em.deleteRole( roleRef.getApplicationRoleName() ); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void copyRelationships(String srcRelationName, EntityRef dstEntityRef, |
| String dstRelationName) throws Exception { |
| |
| headEntity = em.validate( headEntity ); |
| dstEntityRef = em.validate( dstEntityRef ); |
| |
| CollectionInfo srcCollection = |
| getDefaultSchema().getCollection( headEntity.getType(), srcRelationName ); |
| |
| CollectionInfo dstCollection = |
| getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName ); |
| |
| Results results = null; |
| do { |
| if ( srcCollection != null ) { |
| results = em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false ); |
| } |
| else { |
| results = em.getConnectedEntities( headEntity, srcRelationName, null, Level.REFS ); |
| } |
| |
| if ( ( results != null ) && ( results.size() > 0 ) ) { |
| List<EntityRef> refs = results.getRefs(); |
| for ( EntityRef ref : refs ) { |
| if ( dstCollection != null ) { |
| em.addToCollection( dstEntityRef, dstRelationName, ref ); |
| } |
| else { |
| em.createConnection( dstEntityRef, dstRelationName, ref ); |
| } |
| } |
| } |
| } |
| while ( ( results != null ) && ( results.hasMoreResults() ) ); |
| } |
| |
| |
| @Override |
| public Results searchCollection( String collName, Query query ) throws Exception { |
| |
| if ( query == null ) { |
| query = new Query(); |
| query.setCollection( collName ); |
| } |
| |
| headEntity = em.validate( headEntity ); |
| |
| CollectionInfo collection = |
| getDefaultSchema().getCollection( headEntity.getType(), collName ); |
| |
| if ( collection == null ) { |
| throw new RuntimeException( "Cannot find collection-info for '" + collName |
| + "' of " + headEntity.getType() + ":" + headEntity .getUuid() ); |
| } |
| |
| final IndexScope indexScope = new IndexScopeImpl( |
| cpHeadEntity.getId(), |
| CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); |
| |
| final EntityIndex ei = managerCache.getEntityIndex( applicationScope ); |
| |
| final SearchTypes types = SearchTypes.fromTypes( collection.getType() ); |
| |
| logger.debug( "Searching scope {}:{}", |
| |
| indexScope.getOwner().toString(), indexScope.getName() ); |
| |
| query.setEntityType( collection.getType() ); |
| query = adjustQuery( query ); |
| |
| |
| final CollectionResultsLoaderFactoryImpl resultsLoaderFactory = new CollectionResultsLoaderFactoryImpl( managerCache ); |
| |
| |
| //execute the query and return our next result |
| final QueryExecutor executor = new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, applicationScope, indexScope, types, query ); |
| |
| return executor.next(); |
| } |
| |
| |
| @Override |
| public ConnectionRef createConnection( ConnectionRef connection ) throws Exception { |
| |
| return createConnection( connection.getConnectionType(), connection.getConnectedEntity() ); |
| } |
| |
| |
| @Override |
| public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception { |
| |
| headEntity = em.validate( headEntity ); |
| connectedEntityRef = em.validate( connectedEntityRef ); |
| |
| ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef ); |
| |
| CollectionScope targetScope = getCollectionScopeNameFromEntityType( |
| applicationScope.getApplication(), connectedEntityRef.getType()); |
| |
| if ( logger.isDebugEnabled() ) { |
| logger.debug("createConnection(): " |
| + "Indexing connection type '{}'\n from source {}:{}]\n" |
| + " to target {}:{}\n from scope\n app {}\n owner {}\n name {}", |
| new Object[] { |
| connectionType, |
| headEntity.getType(), |
| headEntity.getUuid(), |
| connectedEntityRef.getType(), |
| connectedEntityRef.getUuid(), |
| targetScope.getApplication(), |
| targetScope.getOwner(), |
| targetScope.getName() |
| }); |
| } |
| |
| Id entityId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType()); |
| org.apache.usergrid.persistence.model.entity.Entity targetEntity = |
| ((CpEntityManager)em).load( new CpEntityManager.EntityScope( targetScope, entityId)); |
| |
| String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ); |
| |
| // create graph edge connection from head entity to member entity |
| Edge edge = new SimpleEdge( |
| cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() ); |
| |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| gm.writeEdge( edge ).toBlockingObservable().last(); |
| |
| EntityIndex ei = managerCache.getEntityIndex( applicationScope ); |
| EntityIndexBatch batch = ei.createBatch(); |
| |
| // Index the new connection in app|source|type context |
| IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), |
| CpNamingUtils.getConnectionScopeName( connectionType ) ); |
| |
| batch.index( indexScope, targetEntity ); |
| |
| // Index the new connection in app|scope|all-types context |
| //TODO REMOVE INDEX CODE |
| // IndexScope allTypesIndexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.ALL_TYPES, entityType ); |
| // batch.index( allTypesIndexScope, targetEntity ); |
| |
| |
| BetterFuture future = batch.execute(); |
| |
| Keyspace ko = cass.getApplicationKeyspace( applicationId ); |
| Mutator<ByteBuffer> m = createMutator( ko, be ); |
| batchUpdateEntityConnection( m, false, connection, UUIDGenerator.newTimeUUID() ); |
| //Added Graphite Metrics |
| Timer.Context timeElasticIndexBatch = createConnectionTimer.time(); |
| batchExecute( m, CassandraService.RETRY_COUNT ); |
| timeElasticIndexBatch.stop(); |
| |
| |
| return connection; |
| } |
| |
| |
| @SuppressWarnings( "unchecked" ) |
| public Mutator<ByteBuffer> batchUpdateEntityConnection( |
| Mutator<ByteBuffer> batch, |
| boolean disconnect, |
| ConnectionRefImpl conn, |
| UUID timestampUuid ) throws Exception { |
| |
| long timestamp = getTimestampInMicros( timestampUuid ); |
| |
| Entity connectedEntity = em.get(new SimpleEntityRef( |
| conn.getConnectedEntityType(), conn.getConnectedEntityId() ) ); |
| |
| if ( connectedEntity == null ) { |
| return batch; |
| } |
| |
| // Create connection for requested params |
| |
| if ( disconnect ) { |
| |
| addDeleteToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES, |
| key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES, |
| conn.getConnectionType() ), |
| asList(conn.getConnectedEntityId(), conn.getConnectedEntityType() ), timestamp ); |
| |
| addDeleteToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES, |
| key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES, |
| conn.getConnectionType() ), |
| asList(conn.getConnectingEntityId(), conn.getConnectingEntityType() ), timestamp ); |
| |
| // delete the connection path if there will be no connections left |
| |
| // check out outbound edges of the given type. If we have more than the 1 specified, |
| // we shouldn't delete the connection types from our outbound index |
| if ( !moreThanOneOutboundConnection(conn.getConnectingEntity(), conn.getConnectionType() ) ) { |
| |
| addDeleteToMutator(batch, ENTITY_DICTIONARIES, |
| key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ), |
| conn.getConnectionType(), timestamp ); |
| } |
| |
| //check out inbound edges of the given type. If we have more than the 1 specified, |
| // we shouldn't delete the connection types from our outbound index |
| if ( !moreThanOneInboundConnection(conn.getConnectingEntity(), conn.getConnectionType() ) ) { |
| |
| addDeleteToMutator(batch, ENTITY_DICTIONARIES, |
| key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ), |
| conn.getConnectionType(), timestamp ); |
| } |
| } |
| else { |
| |
| addInsertToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES, |
| key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES, |
| conn.getConnectionType() ), |
| asList(conn.getConnectedEntityId(), conn.getConnectedEntityType() ), timestamp, |
| timestamp ); |
| |
| addInsertToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES, |
| key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES, |
| conn.getConnectionType() ), |
| asList(conn.getConnectingEntityId(), conn.getConnectingEntityType() ), timestamp, |
| timestamp ); |
| |
| // Add connection type to connections set |
| addInsertToMutator(batch, ENTITY_DICTIONARIES, |
| key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ), |
| conn.getConnectionType(), null, timestamp ); |
| |
| // Add connection type to connections set |
| addInsertToMutator(batch, ENTITY_DICTIONARIES, |
| key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ), |
| conn.getConnectionType(), null, timestamp ); |
| } |
| |
| // Add indexes for the connected entity's list properties |
| |
| // Get the names of the list properties in the connected entity |
| Set<String> dictionaryNames = em.getDictionaryNames( connectedEntity ); |
| |
| // For each list property, get the values in the list and |
| // update the index with those values |
| |
| Schema schema = getDefaultSchema(); |
| |
| for ( String dictionaryName : dictionaryNames ) { |
| |
| boolean has_dictionary = schema.hasDictionary( |
| connectedEntity.getType(), dictionaryName ); |
| |
| boolean dictionary_indexed = schema.isDictionaryIndexedInConnections( |
| connectedEntity.getType(), dictionaryName ); |
| |
| if ( dictionary_indexed || !has_dictionary ) { |
| Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName ); |
| for ( Object elementValue : elementValues ) { |
| IndexUpdate indexUpdate = batchStartIndexUpdate( |
| batch, connectedEntity, dictionaryName, elementValue, |
| timestampUuid, has_dictionary, true, disconnect, false ); |
| batchUpdateConnectionIndex(indexUpdate, conn ); |
| } |
| } |
| } |
| |
| return batch; |
| } |
| |
| |
| @Override |
| public ConnectionRef createConnection( |
| String pairedConnectionType, |
| EntityRef pairedEntity, |
| String connectionType, |
| EntityRef connectedEntityRef ) throws Exception { |
| |
| throw new UnsupportedOperationException( "Paired connections not supported" ); |
| } |
| |
| |
| @Override |
| public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception { |
| |
| throw new UnsupportedOperationException( "Paired connections not supported" ); |
| } |
| |
| |
| @Override |
| public ConnectionRef connectionRef( |
| String connectionType, EntityRef connectedEntityRef ) throws Exception { |
| |
| ConnectionRef connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef ); |
| |
| return connection; |
| } |
| |
| |
| @Override |
| public ConnectionRef connectionRef( |
| String pairedConnectionType, |
| EntityRef pairedEntity, |
| String connectionType, |
| EntityRef connectedEntityRef ) throws Exception { |
| |
| throw new UnsupportedOperationException( "Paired connections not supported" ); |
| } |
| |
| |
| @Override |
| public ConnectionRef connectionRef( ConnectedEntityRef... connections ) { |
| |
| throw new UnsupportedOperationException( "Paired connections not supported" ); |
| } |
| |
| |
| @Override |
| public void deleteConnection( ConnectionRef connectionRef ) throws Exception { |
| |
| // First, clean up the dictionary records of the connection |
| Keyspace ko = cass.getApplicationKeyspace( applicationId ); |
| Mutator<ByteBuffer> m = createMutator( ko, be ); |
| batchUpdateEntityConnection( |
| m, true, ( ConnectionRefImpl ) connectionRef, UUIDGenerator.newTimeUUID() ); |
| |
| //Added Graphite Metrics |
| Timer.Context timeDeleteConnections = cassConnectionDelete.time(); |
| batchExecute( m, CassandraService.RETRY_COUNT ); |
| timeDeleteConnections.stop(); |
| |
| EntityRef connectingEntityRef = connectionRef.getConnectingEntity(); // source |
| EntityRef connectedEntityRef = connectionRef.getConnectedEntity(); // target |
| |
| String connectionType = connectionRef.getConnectedEntity().getConnectionType(); |
| |
| CollectionScope targetScope = getCollectionScopeNameFromEntityType( applicationScope.getApplication(), |
| connectedEntityRef.getType() ); |
| |
| if ( logger.isDebugEnabled() ) { |
| logger.debug( "Deleting connection '{}' from source {}:{} \n to target {}:{}", |
| new Object[] { |
| connectionType, |
| connectingEntityRef.getType(), |
| connectingEntityRef.getUuid(), |
| connectedEntityRef.getType(), |
| connectedEntityRef.getUuid() |
| }); |
| } |
| |
| Id entityId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ); |
| org.apache.usergrid.persistence.model.entity.Entity targetEntity = |
| ((CpEntityManager)em).load( new CpEntityManager.EntityScope( targetScope, entityId)); |
| |
| // Delete graph edge connection from head entity to member entity |
| Edge edge = new SimpleEdge( |
| new SimpleId( connectingEntityRef.getUuid(), |
| connectingEntityRef.getType() ), |
| connectionType, |
| targetEntity.getId(), |
| System.currentTimeMillis() ); |
| |
| GraphManager gm = managerCache.getGraphManager( applicationScope ); |
| gm.deleteEdge( edge ).toBlockingObservable().last(); |
| |
| final EntityIndex ei = managerCache.getEntityIndex( applicationScope ); |
| final EntityIndexBatch batch = ei.createBatch(); |
| |
| // Deindex the connection in app|source|type context |
| IndexScope indexScope = new IndexScopeImpl( |
| new SimpleId( connectingEntityRef.getUuid(), |
| connectingEntityRef.getType() ), |
| CpNamingUtils.getConnectionScopeName( connectionType ) ); |
| batch.deindex( indexScope, targetEntity ); |
| |
| // Deindex the connection in app|source|type context |
| //TODO REMOVE INDEX CODE |
| // IndexScope allTypesIndexScope = new IndexScopeImpl( |
| // new SimpleId( connectingEntityRef.getUuid(), |
| // connectingEntityRef.getType() ), |
| // CpNamingUtils.ALL_TYPES, entityType ); |
| // |
| // batch.deindex( allTypesIndexScope, targetEntity ); |
| |
| //Added Graphite Metrics |
| Timer.Context timeDeleteConnection = esDeleteConnectionTimer.time(); |
| batch.execute(); |
| timeDeleteConnection.stop(); |
| |
| } |
| |
| |
| @Override |
| public Set<String> getConnectionTypes( UUID connectedEntityId ) throws Exception { |
| throw new UnsupportedOperationException( "Cannot specify entity by UUID alone." ); |
| } |
| |
| |
| @Override |
| public Set<String> getConnectionTypes() throws Exception { |
| return getConnectionTypes( false ); |
| } |
| |
| |
| @Override |
| public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception { |
| Set<String> connections = cast( |
| em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) ); |
| |
| if ( connections == null ) { |
| return null; |
| } |
| if ( filterConnection && ( connections.size() > 0 ) ) { |
| connections.remove( "connection" ); |
| } |
| return connections; |
| } |
| |
| |
| @Override |
| public Results getConnectedEntities( |
| String connectionType, String connectedEntityType, Level level ) throws Exception { |
| |
| //until this is refactored properly, we will delegate to a search by query |
| Results raw = null; |
| |
| Preconditions.checkNotNull( connectionType, "connectionType cannot be null" ); |
| |
| Query query = new Query(); |
| query.setConnectionType( connectionType ); |
| query.setEntityType( connectedEntityType ); |
| query.setResultsLevel( level ); |
| |
| return searchConnectedEntities( query ); |
| |
| |
| } |
| |
| |
| @Override |
| public Results getConnectingEntities( |
| String connType, String fromEntityType, Level resultsLevel ) throws Exception { |
| |
| return getConnectingEntities( connType, fromEntityType, resultsLevel, -1 ); |
| } |
| |
| |
| @Override |
| public Results getConnectingEntities( |
| String connType, String fromEntityType, Level level, int count ) throws Exception { |
| |
| // looking for edges to the head entity |
| String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connType ); |
| |
| Map<EntityRef, Set<String>> containers = getContainers( count, edgeType, fromEntityType ); |
| |
| if ( Level.REFS.equals( level ) ) { |
| List<EntityRef> refList = new ArrayList<EntityRef>( containers.keySet() ); |
| return Results.fromRefList( refList ); |
| } |
| |
| if ( Level.IDS.equals( level ) ) { |
| // TODO: someday this should return a list of Core Persistence Ids |
| List<UUID> idList = new ArrayList<UUID>(); |
| for ( EntityRef ref : containers.keySet() ) { |
| idList.add( ref.getUuid() ); |
| } |
| return Results.fromIdList( idList ); |
| } |
| |
| List<Entity> entities = new ArrayList<Entity>(); |
| for ( EntityRef ref : containers.keySet() ) { |
| Entity entity = em.get( ref ); |
| logger.debug( " Found connecting entity: " + entity.getProperties() ); |
| entities.add( entity ); |
| } |
| return Results.fromEntities( entities ); |
| } |
| |
| |
| @Override |
| public Results searchConnectedEntities( Query query ) throws Exception { |
| |
| Preconditions.checkNotNull(query, "query cannot be null"); |
| |
| final String connection = query.getConnectionType(); |
| |
| Preconditions.checkNotNull( connection, "connection must be specified" ); |
| |
| // if ( query == null ) { |
| // query = new Query(); |
| // } |
| |
| headEntity = em.validate( headEntity ); |
| |
| final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), |
| CpNamingUtils.getConnectionScopeName( connection ) ); |
| |
| final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() ); |
| |
| EntityIndex ei = managerCache.getEntityIndex( applicationScope ); |
| |
| logger.debug( "Searching connections from the scope {}:{} with types {}", new Object[] { |
| indexScope.getOwner().toString(), indexScope.getName(), searchTypes |
| } ); |
| |
| query = adjustQuery( query ); |
| |
| final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory = new ConnectionResultsLoaderFactoryImpl( managerCache, |
| headEntity, connection ); |
| |
| final QueryExecutor executor = new ElasticSearchQueryExecutor(resultsLoaderFactory, ei, applicationScope, indexScope, searchTypes, query); |
| |
| return executor.next(); |
| // CandidateResults crs = ei.search( indexScope, searchTypes, query ); |
| |
| // return buildConnectionResults( indexScope, query, crs, connection ); |
| } |
| |
| |
| private Query adjustQuery( Query query ) { |
| |
| // handle the select by identifier case |
| if ( query.getRootOperand() == null ) { |
| |
| // a name alias or email alias was specified |
| if ( query.containsSingleNameOrEmailIdentifier() ) { |
| |
| Identifier ident = query.getSingleIdentifier(); |
| |
| // an email was specified. An edge case that only applies to users. |
| // This is fulgy to put here, but required. |
| if ( query.getEntityType().equals( User.ENTITY_TYPE ) && ident.isEmail() ) { |
| |
| Query newQuery = Query.fromQL( "select * where email='" |
| + query.getSingleNameOrEmailIdentifier() + "'" ); |
| query.setRootOperand( newQuery.getRootOperand() ); |
| } |
| |
| // use the ident with the default alias. could be an email |
| else { |
| |
| Query newQuery = Query.fromQL( "select * where name='" |
| + query.getSingleNameOrEmailIdentifier() + "'" ); |
| query.setRootOperand( newQuery.getRootOperand() ); |
| } |
| } |
| else if ( query.containsSingleUuidIdentifier() ) { |
| |
| Query newQuery = Query.fromQL( |
| "select * where uuid='" + query.getSingleUuidIdentifier() + "'" ); |
| query.setRootOperand( newQuery.getRootOperand() ); |
| } |
| } |
| |
| if ( query.isReversed() ) { |
| |
| Query.SortPredicate desc = |
| new Query.SortPredicate( PROPERTY_CREATED, Query.SortDirection.DESCENDING ); |
| |
| try { |
| query.addSort( desc ); |
| } |
| catch ( Exception e ) { |
| logger.warn( "Attempted to reverse sort order already set", PROPERTY_CREATED ); |
| } |
| } |
| |
| if ( query.getSortPredicates().isEmpty() ) { |
| |
| Query.SortPredicate asc = |
| new Query.SortPredicate( PROPERTY_CREATED, Query.SortDirection.ASCENDING); |
| |
| query.addSort( asc ); |
| } |
| |
| return query; |
| } |
| |
| |
| @Override |
| public Set<String> getConnectionIndexes( String connectionType ) throws Exception { |
| throw new UnsupportedOperationException( "Not supported yet." ); |
| } |
| |
| |
| private CpRelationManager getRelationManager( EntityRef headEntity ) { |
| CpRelationManager rmi = new CpRelationManager(); |
| rmi.init( em, emf, applicationId, headEntity, null, metricsFactory); |
| return rmi; |
| } |
| |
| |
| /** side effect: converts headEntity into an Entity if it is an EntityRef! */ |
| private Entity getHeadEntity() throws Exception { |
| Entity entity = null; |
| if ( headEntity instanceof Entity ) { |
| entity = ( Entity ) headEntity; |
| } |
| else { |
| entity = em.get( headEntity ); |
| headEntity = entity; |
| } |
| return entity; |
| } |
| |
| // |
| // private Results buildConnectionResults( final IndexScope indexScope, |
| // final Query query, final CandidateResults crs, final String connectionType ) { |
| // |
| // if ( query.getLevel().equals( Level.ALL_PROPERTIES ) ) { |
| // return buildResults( indexScope, query, crs, connectionType ); |
| // } |
| // |
| // final EntityRef sourceRef = new SimpleEntityRef( headEntity.getType(), headEntity.getUuid() ); |
| // |
| // List<ConnectionRef> refs = new ArrayList<ConnectionRef>( crs.size() ); |
| // |
| // for ( CandidateResult cr : crs ) { |
| // |
| // SimpleEntityRef targetRef = |
| // new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid() ); |
| // |
| // final ConnectionRef ref = |
| // new ConnectionRefImpl( sourceRef, connectionType, targetRef ); |
| // |
| // refs.add( ref ); |
| // } |
| // |
| // return Results.fromConnections( refs ); |
| // } |
| |
| |
| |
| |
| @Override |
| public void batchUpdateSetIndexes( Mutator<ByteBuffer> batch, String setName, Object elementValue, |
| boolean removeFromSet, UUID timestampUuid ) throws Exception { |
| |
| Entity entity = getHeadEntity(); |
| |
| elementValue = getDefaultSchema() |
| .validateEntitySetValue( entity.getType(), setName, elementValue ); |
| |
| IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, setName, elementValue, |
| timestampUuid, true, true, removeFromSet, false ); |
| |
| // Update collections |
| |
| Map<String, Set<CollectionInfo>> containers = |
| getDefaultSchema().getContainersIndexingDictionary( entity.getType(), setName ); |
| |
| if ( containers != null ) { |
| Map<EntityRef, Set<String>> containerEntities = getContainers(); |
| for ( EntityRef containerEntity : containerEntities.keySet() ) { |
| if ( containerEntity.getType().equals( TYPE_APPLICATION ) |
| && Schema.isAssociatedEntityType( entity.getType() ) ) { |
| logger.debug( "Extended properties for {} not indexed by application", entity.getType() ); |
| continue; |
| } |
| Set<String> collectionNames = containerEntities.get( containerEntity ); |
| Set<CollectionInfo> collections = containers.get( containerEntity.getType() ); |
| |
| if ( collections != null ) { |
| |
| for ( CollectionInfo collection : collections ) { |
| if ( collectionNames.contains( collection.getName() ) ) { |
| batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() ); |
| } |
| } |
| } |
| } |
| } |
| |
| batchUpdateBackwardConnectionsDictionaryIndexes( indexUpdate ); |
| } |
| |
| |
| /** |
| * Batch update collection index. |
| * |
| * @param indexUpdate The update to apply |
| * @param owner The entity that is the owner context of this entity update. Can either be an |
| * application, or another * entity |
| * @param collectionName the collection name |
| * |
| * @return The indexUpdate with batch mutations |
| * @throws Exception the exception |
| */ |
| public IndexUpdate batchUpdateCollectionIndex( |
| IndexUpdate indexUpdate, EntityRef owner, String collectionName ) |
| throws Exception { |
| |
| logger.debug( "batchUpdateCollectionIndex" ); |
| |
| Entity indexedEntity = indexUpdate.getEntity(); |
| |
| String bucketId = indexBucketLocator |
| .getBucket( applicationId, IndexBucketLocator.IndexType.COLLECTION, indexedEntity.getUuid(), |
| indexedEntity.getType(), indexUpdate.getEntryName() ); |
| |
| // the root name without the bucket |
| // entity_id,collection_name,prop_name, |
| Object index_name = null; |
| // entity_id,collection_name,prop_name, bucketId |
| Object index_key = null; |
| |
| // entity_id,collection_name,collected_entity_id,prop_name |
| |
| for ( IndexUpdate.IndexEntry entry : indexUpdate.getPrevEntries() ) { |
| |
| if ( entry.getValue() != null ) { |
| |
| index_name = key( owner.getUuid(), collectionName, entry.getPath() ); |
| |
| index_key = key( index_name, bucketId ); |
| |
| addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, |
| entry.getIndexComposite(), indexUpdate.getTimestamp() ); |
| |
| if ( "location.coordinates".equals( entry.getPath() ) ) { |
| EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), |
| entry.getTimestampUuid(), entry.getValue().toString() ); |
| batchRemoveLocationFromCollectionIndex( indexUpdate.getBatch(), |
| indexBucketLocator, applicationId, index_name, loc ); |
| } |
| } |
| else { |
| logger.error( "Unexpected condition - deserialized property value is null" ); |
| } |
| } |
| |
| if ( ( indexUpdate.getNewEntries().size() > 0 ) |
| && ( !indexUpdate.isMultiValue() |
| || ( indexUpdate.isMultiValue() && !indexUpdate.isRemoveListEntry() ) ) ) { |
| |
| for ( IndexUpdate.IndexEntry indexEntry : indexUpdate.getNewEntries() ) { |
| |
| // byte valueCode = indexEntry.getValueCode(); |
| |
| index_name = key( owner.getUuid(), collectionName, indexEntry.getPath() ); |
| |
| index_key = key( index_name, bucketId ); |
| |
| // int i = 0; |
| |
| addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, |
| indexEntry.getIndexComposite(), null, indexUpdate.getTimestamp() ); |
| |
| if ( "location.coordinates".equals( indexEntry.getPath() ) ) { |
| EntityLocationRef loc = new EntityLocationRef( |
| indexUpdate.getEntity(), |
| indexEntry.getTimestampUuid(), |
| indexEntry.getValue().toString() ); |
| batchStoreLocationInCollectionIndex( |
| indexUpdate.getBatch(), |
| indexBucketLocator, |
| applicationId, |
| index_name, |
| indexedEntity.getUuid(), |
| loc ); |
| } |
| |
| // i++; |
| } |
| } |
| |
| for ( String index : indexUpdate.getIndexesSet() ) { |
| addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES, |
| key( owner.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), index, null, |
| indexUpdate.getTimestamp() ); |
| } |
| |
| return indexUpdate; |
| } |
| |
| |
| public IndexUpdate batchStartIndexUpdate( |
| Mutator<ByteBuffer> batch, Entity entity, String entryName, |
| Object entryValue, UUID timestampUuid, boolean schemaHasProperty, |
| boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed ) |
| throws Exception { |
| return batchStartIndexUpdate( batch, entity, entryName, entryValue, timestampUuid, |
| schemaHasProperty, isMultiValue, removeListEntry, fulltextIndexed, false ); |
| } |
| |
| |
| public IndexUpdate batchStartIndexUpdate( |
| Mutator<ByteBuffer> batch, Entity entity, String entryName, |
| Object entryValue, UUID timestampUuid, boolean schemaHasProperty, |
| boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed, |
| boolean skipRead ) throws Exception { |
| |
| long timestamp = getTimestampInMicros( timestampUuid ); |
| |
| IndexUpdate indexUpdate = new IndexUpdate( batch, entity, entryName, entryValue, |
| schemaHasProperty, isMultiValue, removeListEntry, timestampUuid ); |
| |
| // entryName = entryName.toLowerCase(); |
| |
| // entity_id,connection_type,connected_entity_id,prop_name |
| |
| if ( !skipRead ) { |
| |
| List<HColumn<ByteBuffer, ByteBuffer>> entries = null; |
| |
| if ( isMultiValue && validIndexableValue( entryValue ) ) { |
| entries = cass.getColumns( |
| cass.getApplicationKeyspace( applicationId ), |
| ENTITY_INDEX_ENTRIES, |
| entity.getUuid(), |
| new DynamicComposite( |
| entryName, |
| indexValueCode( entryValue ), |
| toIndexableValue( entryValue ) ), |
| setGreaterThanEqualityFlag( |
| new DynamicComposite( |
| entryName, indexValueCode( entryValue ), |
| toIndexableValue( entryValue ) ) ), |
| INDEX_ENTRY_LIST_COUNT, |
| false ); |
| } |
| else { |
| entries = cass.getColumns( |
| cass.getApplicationKeyspace( applicationId ), |
| ENTITY_INDEX_ENTRIES, |
| entity.getUuid(), |
| new DynamicComposite( entryName ), |
| setGreaterThanEqualityFlag( new DynamicComposite( entryName ) ), |
| INDEX_ENTRY_LIST_COUNT, |
| false ); |
| } |
| |
| if ( logger.isDebugEnabled() ) { |
| logger.debug( "Found {} previous index entries for {} of entity {}", new Object[] { |
| entries.size(), entryName, entity.getUuid() |
| } ); |
| } |
| |
| // Delete all matching entries from entry list |
| for ( HColumn<ByteBuffer, ByteBuffer> entry : entries ) { |
| UUID prev_timestamp = null; |
| Object prev_value = null; |
| String prev_obj_path = null; |
| |
| // new format: |
| // composite(entryName, |
| // value_code,prev_value,prev_timestamp,prev_obj_path) = null |
| DynamicComposite composite = |
| DynamicComposite.fromByteBuffer( entry.getName().duplicate() ); |
| prev_value = composite.get( 2 ); |
| prev_timestamp = ( UUID ) composite.get( 3 ); |
| if ( composite.size() > 4 ) { |
| prev_obj_path = ( String ) composite.get( 4 ); |
| } |
| |
| if ( prev_value != null ) { |
| |
| String entryPath = entryName; |
| if ( ( prev_obj_path != null ) && ( prev_obj_path.length() > 0 ) ) { |
| entryPath = entryName + "." + prev_obj_path; |
| } |
| |
| indexUpdate.addPrevEntry( |
| entryPath, prev_value, prev_timestamp, entry.getName().duplicate() ); |
| |
| // composite(property_value,connected_entity_id,entry_timestamp) |
| // addDeleteToMutator(batch, ENTITY_INDEX_ENTRIES, |
| // entity.getUuid(), entry.getName(), timestamp); |
| |
| } |
| else { |
| logger.error( "Unexpected condition - deserialized property value is null" ); |
| } |
| } |
| } |
| |
| if ( !isMultiValue || ( isMultiValue && !removeListEntry ) ) { |
| |
| List<Map.Entry<String, Object>> list = |
| IndexUtils.getKeyValueList( entryName, entryValue, fulltextIndexed ); |
| |
| if ( entryName.equalsIgnoreCase( "location" ) && ( entryValue instanceof Map ) ) { |
| @SuppressWarnings( "rawtypes" ) double latitude = |
| MapUtils.getDoubleValue( ( Map ) entryValue, "latitude" ); |
| @SuppressWarnings( "rawtypes" ) double longitude = |
| MapUtils.getDoubleValue( ( Map ) entryValue, "longitude" ); |
| list.add( new AbstractMap.SimpleEntry<String, Object>( "location.coordinates", |
| latitude + "," + longitude ) ); |
| } |
| |
| for ( Map.Entry<String, Object> indexEntry : list ) { |
| |
| if ( validIndexableValue( indexEntry.getValue() ) ) { |
| indexUpdate.addNewEntry( |
| indexEntry.getKey(), toIndexableValue( indexEntry.getValue() ) ); |
| } |
| } |
| |
| if ( isMultiValue ) { |
| addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(), |
| asList( entryName, |
| indexValueCode( entryValue ), |
| toIndexableValue( entryValue ), |
| indexUpdate.getTimestampUuid() ), |
| null, timestamp ); |
| } |
| else { |
| // int i = 0; |
| |
| for ( Map.Entry<String, Object> indexEntry : list ) { |
| |
| String name = indexEntry.getKey(); |
| if ( name.startsWith( entryName + "." ) ) { |
| name = name.substring( entryName.length() + 1 ); |
| } |
| else if ( name.startsWith( entryName ) ) { |
| name = name.substring( entryName.length() ); |
| } |
| |
| byte code = indexValueCode( indexEntry.getValue() ); |
| Object val = toIndexableValue( indexEntry.getValue() ); |
| addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(), |
| asList( entryName, code, val, indexUpdate.getTimestampUuid(), name ), |
| null, timestamp ); |
| |
| indexUpdate.addIndex( indexEntry.getKey() ); |
| } |
| } |
| |
| indexUpdate.addIndex( entryName ); |
| } |
| |
| return indexUpdate; |
| } |
| |
| |
| /** |
| * Batch update backward connections set indexes. |
| * |
| * @param indexUpdate The index to update in the dictionary |
| * |
| * @return The index update |
| * |
| * @throws Exception the exception |
| */ |
| public IndexUpdate batchUpdateBackwardConnectionsDictionaryIndexes( |
| IndexUpdate indexUpdate ) throws Exception { |
| |
| logger.debug( "batchUpdateBackwardConnectionsListIndexes" ); |
| |
| boolean entityHasDictionary = getDefaultSchema() |
| .isDictionaryIndexedInConnections( |
| indexUpdate.getEntity().getType(), indexUpdate.getEntryName() ); |
| |
| if ( !entityHasDictionary ) { |
| return indexUpdate; |
| } |
| |
| |
| return doBackwardConnectionsUpdate( indexUpdate ); |
| } |
| |
| |
| /** |
| * Search each reverse connection type in the graph for connections. |
| * If one is found, update the index appropriately |
| * |
| * @param indexUpdate The index update to use |
| * |
| * @return The updated index update |
| */ |
| private IndexUpdate doBackwardConnectionsUpdate( IndexUpdate indexUpdate ) throws Exception { |
| final Entity targetEntity = indexUpdate.getEntity(); |
| |
| logger.debug( "doBackwardConnectionsUpdate" ); |
| |
| final ConnectionTypesIterator connectionTypes = |
| new ConnectionTypesIterator( cass, applicationId, targetEntity.getUuid(), false, 100 ); |
| |
| for ( String connectionType : connectionTypes ) { |
| |
| PagingResultsIterator itr = |
| getReversedConnectionsIterator( targetEntity, connectionType ); |
| |
| for ( Object connection : itr ) { |
| |
| final ConnectedEntityRef sourceEntity = ( ConnectedEntityRef ) connection; |
| |
| //we need to create a connection ref from the source entity (found via reverse edge) |
| // to the entity we're about to update. This is the index that needs updated |
| final ConnectionRefImpl connectionRef = |
| new ConnectionRefImpl( sourceEntity, connectionType, indexUpdate.getEntity() ); |
| |
| batchUpdateConnectionIndex( indexUpdate, connectionRef ); |
| } |
| } |
| |
| return indexUpdate; |
| } |
| |
| |
| /** |
| * Batch update connection index. |
| * |
| * @param indexUpdate The update operation to perform |
| * @param connection The connection to update |
| * |
| * @return The index with the batch mutation udpated |
| * |
| * @throws Exception the exception |
| */ |
| public IndexUpdate batchUpdateConnectionIndex( |
| IndexUpdate indexUpdate, ConnectionRefImpl connection ) throws Exception { |
| |
| logger.debug( "batchUpdateConnectionIndex" ); |
| |
| // UUID connection_id = connection.getUuid(); |
| |
| UUID[] index_keys = connection.getIndexIds(); |
| |
| // Delete all matching entries from entry list |
| for ( IndexUpdate.IndexEntry entry : indexUpdate.getPrevEntries() ) { |
| |
| if ( entry.getValue() != null ) { |
| |
| batchDeleteConnectionIndexEntries( indexUpdate, entry, connection, index_keys ); |
| |
| if ( "location.coordinates".equals( entry.getPath() ) ) { |
| EntityLocationRef loc = |
| new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(), |
| entry.getValue().toString() ); |
| batchDeleteLocationInConnectionsIndex( |
| indexUpdate.getBatch(), indexBucketLocator, applicationId, |
| index_keys, entry.getPath(), loc ); |
| } |
| } |
| else { |
| logger.error( "Unexpected condition - deserialized property value is null" ); |
| } |
| } |
| |
| if ( ( indexUpdate.getNewEntries().size() > 0 ) |
| && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue() |
| && !indexUpdate.isRemoveListEntry() ) ) ) { |
| |
| for ( IndexUpdate.IndexEntry indexEntry : indexUpdate.getNewEntries() ) { |
| |
| batchAddConnectionIndexEntries( indexUpdate, indexEntry, connection, index_keys ); |
| |
| if ( "location.coordinates".equals( indexEntry.getPath() ) ) { |
| EntityLocationRef loc = |
| new EntityLocationRef( |
| indexUpdate.getEntity(), |
| indexEntry.getTimestampUuid(), |
| indexEntry.getValue().toString() ); |
| batchStoreLocationInConnectionsIndex( |
| indexUpdate.getBatch(), indexBucketLocator, applicationId, |
| index_keys, indexEntry.getPath(), loc ); |
| } |
| } |
| |
| /* |
| * addInsertToMutator(batch, EntityCF.SETS, key(connection_id, |
| * Schema.INDEXES_SET), indexEntry.getKey(), null, false, timestamp); } |
| * |
| * addInsertToMutator(batch, EntityCF.SETS, key(connection_id, |
| * Schema.INDEXES_SET), entryName, null, false, timestamp); |
| */ |
| } |
| |
| for ( String index : indexUpdate.getIndexesSet() ) { |
| addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES, |
| key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES), index, null, |
| indexUpdate.getTimestamp() ); |
| } |
| |
| return indexUpdate; |
| } |
| |
| |
| /** |
| * Get a paging results iterator. Should return an iterator for all results |
| * |
| * @param targetEntity The target entity search connections from |
| * |
| * @return connectionType The name of the edges to search |
| */ |
| private PagingResultsIterator getReversedConnectionsIterator( |
| EntityRef targetEntity, String connectionType ) throws Exception { |
| |
| return new PagingResultsIterator( |
| getConnectingEntities( targetEntity, connectionType, null, Level.REFS ) ); |
| } |
| |
| |
| /** |
| * Get all edges that are to the targetEntity |
| * |
| * @param targetEntity The target entity to search edges in |
| * @param connectionType The type of connection. If not specified, all connections are returned |
| * @param connectedEntityType The connected entity type, if not specified all types are returned |
| * @param resultsLevel The results level to return |
| */ |
| private Results getConnectingEntities( |
| EntityRef targetEntity, String connectionType, String connectedEntityType, |
| Level resultsLevel ) throws Exception { |
| |
| return getConnectingEntities( |
| targetEntity, connectionType, connectedEntityType, resultsLevel, 0); |
| } |
| |
| |
| /** |
| * Get all edges that are to the targetEntity |
| * |
| * @param targetEntity The target entity to search edges in |
| * @param connectionType The type of connection. If not specified, all connections are returned |
| * @param connectedEntityType The connected entity type, if not specified all types are returned |
| * @param count result limit |
| */ |
| private Results getConnectingEntities( EntityRef targetEntity, String connectionType, |
| String connectedEntityType, Level level, int count) throws Exception { |
| |
| Query query = new Query(); |
| query.setResultsLevel( level ); |
| query.setLimit( count ); |
| |
| final ConnectionRefImpl connectionRef = new ConnectionRefImpl( |
| new SimpleEntityRef( connectedEntityType, null ), connectionType, targetEntity ); |
| final ConnectionResultsLoaderFactory factory = |
| new ConnectionResultsLoaderFactory( connectionRef ); |
| |
| QueryProcessorImpl qp = new QueryProcessorImpl( query, null, em, factory ); |
| SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, false ); |
| |
| return qp.getResults( visitor ); |
| } |
| |
| |
/**
 * Queue deletions of a previous index entry from the four connection indexes
 * (all, by entity type, by connection type, by connection and entity type).
 *
 * @param indexUpdate The update supplying the batch and timestamp
 * @param entry The previous index entry to remove
 * @param connection The connection whose indexes are being updated
 * @param index_keys The connection's index ids, addressed with the {@code ConnectionRefImpl} constants
 *
 * @return The batch of the index update
 *
 * @throws Exception the exception
 */
public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries(
        IndexUpdate indexUpdate,
        IndexUpdate.IndexEntry entry,
        ConnectionRefImpl connection,
        UUID[] index_keys ) throws Exception {

    logger.debug( "batchDeleteConnectionIndexEntries" );

    final String path = entry.getPath();
    final IndexBucketLocator.IndexType scope = IndexBucketLocator.IndexType.CONNECTION;

    // entity_id,prop_name
    final UUID allId = index_keys[ConnectionRefImpl.ALL];
    final Object allKey = key( allId, INDEX_CONNECTIONS, path,
            indexBucketLocator.getBucket( applicationId, scope, allId, path ) );

    // entity_id,entity_type,prop_name
    final UUID byEntityTypeId = index_keys[ConnectionRefImpl.BY_ENTITY_TYPE];
    final Object byEntityTypeKey = key( byEntityTypeId, INDEX_CONNECTIONS, path,
            indexBucketLocator.getBucket( applicationId, scope, byEntityTypeId, path ) );

    // entity_id,connection_type,prop_name
    final UUID byConnectionTypeId = index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE];
    final Object byConnectionTypeKey = key( byConnectionTypeId, INDEX_CONNECTIONS, path,
            indexBucketLocator.getBucket( applicationId, scope, byConnectionTypeId, path ) );

    // entity_id,connection_type,entity_type,prop_name
    final UUID byBothId = index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE];
    final Object byBothKey = key( byBothId, INDEX_CONNECTIONS, path,
            indexBucketLocator.getBucket( applicationId, scope, byBothId, path ) );

    final Mutator<ByteBuffer> batch = indexUpdate.getBatch();
    final UUID connectedId = connection.getConnectedEntityId();
    final String connectionType = connection.getConnectionType();
    final String connectedType = connection.getConnectedEntityType();

    // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
    addDeleteToMutator( batch, ENTITY_INDEX, allKey,
            entry.getIndexComposite( connectedId, connectionType, connectedType ),
            indexUpdate.getTimestamp() );

    // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
    addDeleteToMutator( batch, ENTITY_INDEX, byEntityTypeKey,
            entry.getIndexComposite( connectedId, connectionType ),
            indexUpdate.getTimestamp() );

    // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
    addDeleteToMutator( batch, ENTITY_INDEX, byConnectionTypeKey,
            entry.getIndexComposite( connectedId, connectedType ),
            indexUpdate.getTimestamp() );

    // composite(property_value,connected_entity_id,entry_timestamp)
    addDeleteToMutator( batch, ENTITY_INDEX, byBothKey,
            entry.getIndexComposite( connectedId ),
            indexUpdate.getTimestamp() );

    return batch;
}
| |
| |
| public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexUpdate.IndexEntry entry, |
| ConnectionRefImpl conn, UUID[] index_keys ) { |
| |
| logger.debug( "batchAddConnectionIndexEntries" ); |
| |
| // entity_id,prop_name |
| Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], |
| INDEX_CONNECTIONS, entry.getPath(), |
| indexBucketLocator.getBucket( applicationId, |
| IndexBucketLocator.IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL], |
| entry.getPath() ) ); |
| |
| // entity_id,entity_type,prop_name |
| Object entity_type_prop_index_key = |
| key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), |
| indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION, |
| index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) ); |
| |
| // entity_id,connection_type,prop_name |
| Object connection_type_prop_index_key = |
| key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(), |
| indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION, |
| index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) ); |
| |
| // entity_id,connection_type,entity_type,prop_name |
| Object connection_type_and_entity_type_prop_index_key = |
| key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], |
| INDEX_CONNECTIONS, entry.getPath(), |
| indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION, |
| index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) ); |
| |
| // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp) |
| addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key, |
| entry.getIndexComposite( conn.getConnectedEntityId(), conn.getConnectionType(), |
| conn.getConnectedEntityType() ), conn.getUuid(), indexUpdate.getTimestamp() ); |
| |
| // composite(property_value,connected_entity_id,connection_type,entry_timestamp) |
| addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key, |
| entry.getIndexComposite( conn.getConnectedEntityId(), conn.getConnectionType() ), |
| conn.getUuid(), indexUpdate.getTimestamp() ); |
| |
| // composite(property_value,connected_entity_id,entity_type,entry_timestamp) |
| addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key, |
| entry.getIndexComposite( conn.getConnectedEntityId(), conn.getConnectedEntityType() ), |
| conn.getUuid(), indexUpdate.getTimestamp() ); |
| |
| // composite(property_value,connected_entity_id,entry_timestamp) |
| addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, |
| connection_type_and_entity_type_prop_index_key, |
| entry.getIndexComposite( conn.getConnectedEntityId() ), conn.getUuid(), |
| indexUpdate.getTimestamp() ); |
| |
| return indexUpdate.getBatch(); |
| } |
| |
| |
| /** |
| * Simple search visitor that performs all the joining |
| * |
| * @author tnine |
| */ |
| private class SearchConnectionVisitor extends SearchVisitor { |
| |
| private final ConnectionRefImpl connection; |
| |
| /** True if we should search from source->target edges. |
| * False if we should search from target<-source edges */ |
| private final boolean outgoing; |
| |
| |
| /** |
| * @param queryProcessor They query processor to use |
| * @param connection The connection refernce |
| * @param outgoing The direction to search. True if we should search from source->target |
| * edges. False if we * should search from target<-source edges |
| */ |
| public SearchConnectionVisitor( QueryProcessorImpl queryProcessor, ConnectionRefImpl connection, |
| boolean outgoing ) { |
| super( queryProcessor ); |
| this.connection = connection; |
| this.outgoing = outgoing; |
| } |
| |
| |
| /* (non-Javadoc) |
| * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence |
| * .query.ir |
| * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice) |
| */ |
| @Override |
| protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception { |
| |
| UUID id = ConnectionRefImpl.getIndexId( |
| ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE, |
| headEntity, |
| connection.getConnectionType(), |
| connection.getConnectedEntityType(), |
| new ConnectedEntityRef[0] ); |
| |
| Object key = key( id, INDEX_CONNECTIONS ); |
| |
| // update the cursor and order before we perform the slice |
| // operation |
| queryProcessor.applyCursorAndSort( slice ); |
| |
| IndexScanner columns = null; |
| |
| if ( slice.isComplete() ) { |
| columns = new NoOpIndexScanner(); |
| } |
| else { |
| columns = searchIndex( key, slice, queryProcessor.getPageSizeHint( node ) ); |
| } |
| |
| return columns; |
| } |
| |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid. |
| * persistence.query.ir.WithinNode) |
| */ |
| @Override |
| public void visit( WithinNode node ) throws Exception { |
| |
| QuerySlice slice = node.getSlice(); |
| |
| queryProcessor.applyCursorAndSort( slice ); |
| |
| GeoIterator itr = new GeoIterator( |
| new ConnectionGeoSearch( em, indexBucketLocator, cass, connection.getIndexId() ), |
| query.getLimit(), |
| slice, |
| node.getPropertyName(), |
| new Point( node.getLattitude(), node.getLongitude() ), |
| node.getDistance() ); |
| |
| results.push( itr ); |
| } |
| |
| |
| @Override |
| public void visit( AllNode node ) throws Exception { |
| QuerySlice slice = node.getSlice(); |
| |
| queryProcessor.applyCursorAndSort( slice ); |
| |
| int size = queryProcessor.getPageSizeHint( node ); |
| |
| ByteBuffer start = null; |
| |
| if ( slice.hasCursor() ) { |
| start = slice.getCursor(); |
| } |
| |
| |
| boolean skipFirst = !node.isForceKeepFirst() && slice.hasCursor(); |
| |
| UUID entityIdToUse; |
| |
| //change our type depending on which direction we're loading |
| String dictionaryType; |
| |
| //the target type |
| String targetType; |
| |
| //this is on the "source" side of the edge |
| if ( outgoing ) { |
| entityIdToUse = connection.getConnectingEntityId(); |
| dictionaryType = DICTIONARY_CONNECTED_ENTITIES; |
| targetType = connection.getConnectedEntityType(); |
| } |
| |
| //we're on the target side of the edge |
| else { |
| entityIdToUse = connection.getConnectedEntityId(); |
| dictionaryType = DICTIONARY_CONNECTING_ENTITIES; |
| targetType = connection.getConnectingEntityType(); |
| } |
| |
| final String connectionType = connection.getConnectionType(); |
| |
| final ConnectionIndexSliceParser connectionParser = new ConnectionIndexSliceParser( targetType ); |
| |
| final Iterator<String> connectionTypes; |
| |
| //use the provided connection type |
| if ( connectionType != null ) { |
| connectionTypes = Collections.singleton( connectionType ).iterator(); |
| } |
| |
| //we need to iterate all connection types |
| else { |
| connectionTypes = new ConnectionTypesIterator( |
| cass, applicationId, entityIdToUse, outgoing, size ); |
| } |
| |
| IndexScanner connectionScanner = new ConnectedIndexScanner( |
| cass, |
| dictionaryType, |
| applicationId, |
| entityIdToUse, |
| connectionTypes, |
| start, |
| slice.isReversed(), |
| size, |
| skipFirst ); |
| |
| this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) ); |
| } |
| |
| |
| @Override |
| public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception { |
| |
| //TODO T.N. USERGRID-1919 actually validate this is connected |
| EntityRef ref = em.getAlias( connection.getConnectedEntityType(), nameIdentifierNode.getName() ); |
| |
| if ( ref == null ) { |
| this.results.push( new EmptyIterator() ); |
| return; |
| } |
| |
| this.results.push( new StaticIdIterator( ref.getUuid() ) ); |
| } |
| } |
| |
| |
| private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception { |
| |
| DynamicComposite[] range = slice.getRange(); |
| |
| Object keyPrefix = key( indexKey, slice.getPropertyName() ); |
| |
| IndexScanner scanner = new IndexBucketScanner( |
| cass, |
| indexBucketLocator, |
| ENTITY_INDEX, |
| applicationId, |
| IndexBucketLocator.IndexType.CONNECTION, |
| keyPrefix, |
| range[0], |
| range[1], |
| slice.isReversed(), |
| pageSize, |
| slice.hasCursor(), |
| slice.getPropertyName() ); |
| |
| return scanner; |
| } |
| } |