blob: be841761fb85eb4879a2a05caaad35190ebb9194 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.usergrid.persistence.cassandra;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.apache.usergrid.persistence.CollectionRef;
import org.apache.usergrid.persistence.ConnectedEntityRef;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.IndexBucketLocator;
import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
import org.apache.usergrid.persistence.PagingResultsIterator;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.RelationManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.RoleRef;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.SimpleCollectionRef;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.cassandra.IndexUpdate.IndexEntry;
import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
import org.apache.usergrid.persistence.geo.EntityLocationRef;
import org.apache.usergrid.persistence.geo.model.Point;
import org.apache.usergrid.persistence.hector.CountingMutator;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.IndexUtils;
import org.apache.usergrid.utils.MapUtils;
import com.yammer.metrics.annotation.Metered;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.mutation.Mutator;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import static java.util.Arrays.asList;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;
import org.apache.usergrid.persistence.EntityManager;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
import static org.apache.usergrid.persistence.Schema.PROPERTY_COLLECTION_NAME;
import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM;
import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM_TYPE;
import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
import static org.apache.usergrid.persistence.Schema.TYPE_MEMBER;
import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
import static org.apache.usergrid.persistence.Schema.defaultCollectionName;
import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
import static org.apache.usergrid.persistence.cassandra.ConnectionRefImpl.CONNECTION_ENTITY_CONNECTION_TYPE;
import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
import static org.apache.usergrid.utils.ConversionUtils.string;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
import static org.apache.usergrid.utils.MapUtils.addMapSet;
import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
import static org.apache.usergrid.utils.UUIDUtils.newTimeUUID;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.index.query.Query.Level;
public class RelationManagerImpl implements RelationManager {
private static final Logger logger = LoggerFactory.getLogger( RelationManagerImpl.class );
private EntityManager em;
private CassandraService cass;
private UUID applicationId;
private EntityRef headEntity;
private IndexBucketLocator indexBucketLocator;
public RelationManagerImpl() {
public RelationManagerImpl init( EntityManager em, CassandraService cass, UUID applicationId,
EntityRef headEntity, IndexBucketLocator indexBucketLocator ) {
Assert.notNull( em, "Entity manager cannot be null" );
Assert.notNull( cass, "Cassandra service cannot be null" );
Assert.notNull( applicationId, "Application Id cannot be null" );
Assert.notNull( headEntity, "Head entity cannot be null" );
Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
Assert.notNull( indexBucketLocator, "Index bucket locator cannot be null" );
this.em = em;
this.applicationId = applicationId;
this.cass = cass;
this.headEntity = headEntity;
this.indexBucketLocator = indexBucketLocator;
return this;
private RelationManagerImpl getRelationManager( EntityRef headEntity ) {
RelationManagerImpl rmi = new RelationManagerImpl();
rmi.init( em, cass, applicationId, headEntity, indexBucketLocator );
return rmi;
/** side effect: converts headEntity into an Entity if it is an EntityRef! */
private Entity getHeadEntity() throws Exception {
Entity entity = null;
if ( headEntity instanceof Entity ) {
entity = ( Entity ) headEntity;
else {
entity = em.get( headEntity );
headEntity = entity;
return entity;
* Batch update collection index.
* @param indexUpdate The update to apply
* @param owner The entity that is the owner context of this entity update. Can either be an application, or
* another entity
* @param collectionName the collection name
* @return The indexUpdate with batch mutations
* @throws Exception the exception
@Metered(group = "core", name = "RelationManager_batchUpdateCollectionIndex")
public IndexUpdate batchUpdateCollectionIndex( IndexUpdate indexUpdate, EntityRef owner, String collectionName )
throws Exception {
logger.debug( "batchUpdateCollectionIndex" );
Entity indexedEntity = indexUpdate.getEntity();
String bucketId = indexBucketLocator
.getBucket( applicationId, IndexType.COLLECTION, indexedEntity.getUuid(), indexedEntity.getType(),
indexUpdate.getEntryName() );
// the root name without the bucket
// entity_id,collection_name,prop_name,
Object index_name = null;
// entity_id,collection_name,prop_name, bucketId
Object index_key = null;
// entity_id,collection_name,collected_entity_id,prop_name
for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
if ( entry.getValue() != null ) {
index_name = key( owner.getUuid(), collectionName, entry.getPath() );
index_key = key( index_name, bucketId );
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, entry.getIndexComposite(),
indexUpdate.getTimestamp() );
if ( "location.coordinates".equals( entry.getPath() ) ) {
EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
entry.getValue().toString() );
batchRemoveLocationFromCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
index_name, loc );
else {
logger.error( "Unexpected condition - deserialized property value is null" );
if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
&& !indexUpdate.isRemoveListEntry() ) ) ) {
for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
// byte valueCode = indexEntry.getValueCode();
index_name = key( owner.getUuid(), collectionName, indexEntry.getPath() );
index_key = key( index_name, bucketId );
// int i = 0;
addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, indexEntry.getIndexComposite(),
null, indexUpdate.getTimestamp() );
if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
EntityLocationRef loc =
new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
indexEntry.getValue().toString() );
batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
index_name, indexedEntity.getUuid(), loc );
// i++;
for ( String index : indexUpdate.getIndexesSet() ) {
addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
key( owner.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), index, null,
indexUpdate.getTimestamp() );
return indexUpdate;
@Metered(group = "core", name = "RelationManager_getCollectionIndexes")
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
// TODO TN, read all buckets here
List<HColumn<String, String>> results =
cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
key( headEntity.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ),, );
Set<String> indexes = new TreeSet<String>();
if ( results != null ) {
for ( HColumn<String, String> column : results ) {
String propertyName = column.getName();
if ( !propertyName.endsWith( ".keywords" ) ) {
indexes.add( column.getName() );
return indexes;
public Map<EntityRef, Set<String>> getContainingCollections() throws Exception {
Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
// TODO TN get all buckets here
List<HColumn<DynamicComposite, ByteBuffer>> containers = cass.getAllColumns( ko, ENTITY_COMPOSITE_DICTIONARIES,
key( headEntity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), Serializers.dce, );
if ( containers != null ) {
for ( HColumn<DynamicComposite, ByteBuffer> container : containers ) {
DynamicComposite composite = container.getName();
if ( composite != null ) {
String ownerType = ( String ) composite.get( 0 );
String collectionName = ( String ) composite.get( 1 );
UUID ownerId = ( UUID ) composite.get( 2 );
addMapSet( results, new SimpleEntityRef( ownerType, ownerId ), collectionName );
if ( logger.isDebugEnabled() ) {
logger.debug( " {} ( {} ) is in collection {} ( {} ).", new Object[] {
headEntity.getType(), headEntity.getUuid(), ownerType, collectionName, ownerId
} );
EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
if ( !results.containsKey( applicationRef ) ) {
addMapSet( results, applicationRef, defaultCollectionName( headEntity.getType() ) );
return results;
public void batchCreateCollectionMembership( Mutator<ByteBuffer> batch, EntityRef ownerRef, String collectionName,
EntityRef itemRef, EntityRef membershipRef, UUID timestampUuid )
throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
if ( membershipRef == null ) {
membershipRef = new SimpleCollectionRef( ownerRef, collectionName, itemRef );
Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
properties.put( PROPERTY_TYPE, membershipRef.getType() );
properties.put( PROPERTY_COLLECTION_NAME, collectionName );
properties.put( PROPERTY_ITEM, itemRef.getUuid() );
properties.put( PROPERTY_ITEM_TYPE, itemRef.getType() );
em.batchCreate( batch, membershipRef.getType(), null, properties, membershipRef.getUuid(), timestampUuid );
key( membershipRef.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
asList( ownerRef.getType(), collectionName, ownerRef.getUuid() ), membershipRef.getUuid(), timestamp );
* Batch add to collection.
* @param batch the batch
* @param collectionName the collection name
* @param entity The entity to add to the batch
* @param timestampUuid The timestamp of this update in a time uuid
* @return batch
* @throws Exception the exception
public Mutator<ByteBuffer> batchAddToCollection( Mutator<ByteBuffer> batch, String collectionName, Entity entity,
UUID timestampUuid ) throws Exception {
List<UUID> ids = new ArrayList<UUID>( 1 );
ids.add( headEntity.getUuid() );
return batchAddToCollections( batch, headEntity.getType(), ids, collectionName, entity, timestampUuid );
@Metered(group = "core", name = "RelationManager_batchAddToCollections")
public Mutator<ByteBuffer> batchAddToCollections( Mutator<ByteBuffer> batch, String ownerType, List<UUID> ownerIds,
String collectionName, Entity entity, UUID timestampUuid )
throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
logger.error( "Cant add an extended type to any collection", new Throwable() );
return batch;
Map<UUID, CollectionRef> membershipRefs = new LinkedHashMap<UUID, CollectionRef>();
for ( UUID ownerId : ownerIds ) {
CollectionRef membershipRef =
new SimpleCollectionRef( new SimpleEntityRef( ownerType, ownerId ), collectionName, entity );
membershipRefs.put( ownerId, membershipRef );
// get the bucket this entityId needs to be inserted into
String bucketId = indexBucketLocator
.getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName );
Object collections_key = key( ownerId, Schema.DICTIONARY_COLLECTIONS, collectionName, bucketId );
// Insert in main collection
addInsertToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), membershipRef.getUuid(),
timestamp );
key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
asList( ownerType, collectionName, ownerId ), membershipRef.getUuid(), timestamp );
Schema schema = getDefaultSchema();
// Add property indexes
for ( String propertyName : entity.getProperties().keySet() ) {
boolean indexed_property = schema.isPropertyIndexed( entity.getType(), propertyName );
if ( indexed_property ) {
boolean collection_indexes_property =
schema.isPropertyIndexedInCollection( ownerType, collectionName, propertyName );
boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
if ( collection_indexes_property || !item_schema_has_property ) {
Object propertyValue = entity.getProperty( propertyName );
IndexUpdate indexUpdate =
batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
item_schema_has_property, false, false, fulltext_indexed, true );
for ( UUID ownerId : ownerIds ) {
EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
// Add set property indexes
Set<String> dictionaryNames = em.getDictionaryNames( entity );
for ( String dictionaryName : dictionaryNames ) {
boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
boolean dictionary_indexed =
schema.isDictionaryIndexedInCollection( ownerType, collectionName, dictionaryName );
if ( dictionary_indexed || !has_dictionary ) {
Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
for ( Object elementValue : elementValues ) {
IndexUpdate indexUpdate =
batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
has_dictionary, true, false, false, true );
for ( UUID ownerId : ownerIds ) {
EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
for ( UUID ownerId : ownerIds ) {
EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
batchCreateCollectionMembership( batch, owner, collectionName, entity, membershipRefs.get( ownerId ),
timestampUuid );
return batch;
* Batch remove from collection.
* <p/>
* * Batch add to collection.
* @param batch the batch
* @param collectionName the collection name
* @param entity The entity to add to the batch
* @param timestampUuid The timestamp of this update in a time uuid
* @return The mutation with the delete operations added
* @throws Exception the exception
public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
Entity entity, UUID timestampUuid ) throws Exception {
return this.batchRemoveFromCollection( batch, collectionName, entity, false, timestampUuid );
@Metered(group = "core", name = "RelationManager_batchRemoveFromCollection")
public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
Entity entity, boolean force, UUID timestampUuid )
throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
if ( !force && headEntity.getUuid().equals( applicationId ) ) {
// Can't remove entities from root collections
return batch;
Object collections_key = key( headEntity.getUuid(), Schema.DICTIONARY_COLLECTIONS, collectionName,
indexBucketLocator.getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName ) );
// Remove property indexes
Schema schema = getDefaultSchema();
for ( String propertyName : entity.getProperties().keySet() ) {
boolean collection_indexes_property =
schema.isPropertyIndexedInCollection( headEntity.getType(), collectionName, propertyName );
boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
if ( collection_indexes_property || !item_schema_has_property ) {
IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, null, timestampUuid,
item_schema_has_property, false, false, fulltext_indexed );
batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
// Remove set indexes
Set<String> dictionaryNames = em.getDictionaryNames( entity );
for ( String dictionaryName : dictionaryNames ) {
boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
boolean dictionary_indexed =
schema.isDictionaryIndexedInCollection( headEntity.getType(), collectionName, dictionaryName );
if ( dictionary_indexed || !has_dictionary ) {
Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
for ( Object elementValue : elementValues ) {
IndexUpdate indexUpdate =
batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
has_dictionary, true, true, false );
batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
// Delete actual property
addDeleteToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), timestamp );
key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
asList( headEntity.getType(), collectionName, headEntity.getUuid() ), timestamp );
if ( !headEntity.getType().equalsIgnoreCase( TYPE_APPLICATION ) && !Schema
.isAssociatedEntityType( entity.getType() ) ) {
CollectionRef cref = new SimpleCollectionRef( headEntity, collectionName, entity );
em.delete( new SimpleEntityRef( cref.getType(), cref.getUuid() ) );
return batch;
@Metered(group = "core", name = "RelationManager_batchDeleteConnectionIndexEntries")
public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
ConnectionRefImpl connection, UUID[] index_keys )
throws Exception {
// entity_id,prop_name
Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
entry.getPath() ) );
// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );
// composite(property_value,connected_entity_id,connection_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
indexUpdate.getTimestamp() );
// composite(property_value,connected_entity_id,entity_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
indexUpdate.getTimestamp() );
// composite(property_value,connected_entity_id,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
return indexUpdate.getBatch();
@Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries")
public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
ConnectionRefImpl connection, UUID[] index_keys ) {
// entity_id,prop_name
Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
entry.getPath() ) );
// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
connection.getConnectedEntityType() ), connection.getUuid(), indexUpdate.getTimestamp() );
// composite(property_value,connected_entity_id,connection_type,entry_timestamp)
addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
connection.getUuid(), indexUpdate.getTimestamp() );
// composite(property_value,connected_entity_id,entity_type,entry_timestamp)
addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
connection.getUuid(), indexUpdate.getTimestamp() );
// composite(property_value,connected_entity_id,entry_timestamp)
addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId() ), connection.getUuid(),
indexUpdate.getTimestamp() );
return indexUpdate.getBatch();
* Batch update connection index.
* @param indexUpdate The update operation to perform
* @param connection The connection to update
* @return The index with the batch mutation udpated
* @throws Exception the exception
@Metered(group = "core", name = "RelationManager_batchUpdateConnectionIndex")
public IndexUpdate batchUpdateConnectionIndex( IndexUpdate indexUpdate, ConnectionRefImpl connection )
throws Exception {
// UUID connection_id = connection.getUuid();
UUID[] index_keys = connection.getIndexIds();
// Delete all matching entries from entry list
for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
if ( entry.getValue() != null ) {
batchDeleteConnectionIndexEntries( indexUpdate, entry, connection, index_keys );
if ( "location.coordinates".equals( entry.getPath() ) ) {
EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
entry.getValue().toString() );
batchDeleteLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
index_keys, entry.getPath(), loc );
else {
logger.error( "Unexpected condition - deserialized property value is null" );
if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
&& !indexUpdate.isRemoveListEntry() ) ) ) {
for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
batchAddConnectionIndexEntries( indexUpdate, indexEntry, connection, index_keys );
if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
EntityLocationRef loc =
new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
indexEntry.getValue().toString() );
batchStoreLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
index_keys, indexEntry.getPath(), loc );
* addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
* Schema.INDEXES_SET), indexEntry.getKey(), null, false, timestamp); }
* addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
* Schema.INDEXES_SET), entryName, null, false, timestamp);
for ( String index : indexUpdate.getIndexesSet() ) {
addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), index, null,
indexUpdate.getTimestamp() );
return indexUpdate;
public Set<String> getConnectionIndexes( ConnectionRefImpl connection ) throws Exception {
List<HColumn<String, String>> results =
cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ),, );
Set<String> indexes = new TreeSet<String>();
if ( results != null ) {
for ( HColumn<String, String> column : results ) {
String propertyName = column.getName();
if ( !propertyName.endsWith( ".keywords" ) ) {
indexes.add( column.getName() );
return indexes;
* Batch update backward connections property indexes.
* @param indexUpdate The update to run for incoming connections
* @return The index update to run
* @throws Exception the exception
@Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsPropertyIndexes")
public IndexUpdate batchUpdateBackwardConnectionsPropertyIndexes( IndexUpdate indexUpdate ) throws Exception {
logger.debug( "batchUpdateBackwordConnectionsPropertyIndexes" );
boolean entitySchemaHasProperty = indexUpdate.isSchemaHasProperty();
if ( entitySchemaHasProperty ) {
if ( !getDefaultSchema()
.isPropertyIndexed( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() ) ) {
return indexUpdate;
return doBackwardConnectionsUpdate( indexUpdate );
* Search each reverse connection type in the graph for connections. If one is found, update the index
* appropriately
* @param indexUpdate The index update to use
* @return The updated index update
private IndexUpdate doBackwardConnectionsUpdate( IndexUpdate indexUpdate ) throws Exception {
final Entity targetEntity = indexUpdate.getEntity();
final ConnectionTypesIterator connectionTypes =
new ConnectionTypesIterator( cass, applicationId, targetEntity.getUuid(), false, 100 );
for ( String connectionType : connectionTypes ) {
PagingResultsIterator itr = getReversedConnectionsIterator( targetEntity, connectionType );
for ( Object connection : itr ) {
final ConnectedEntityRef sourceEntity = ( ConnectedEntityRef ) connection;
//we need to create a connection ref from the source entity (found via reverse edge) to the entity
// we're about to update. This is the index that needs updated
final ConnectionRefImpl connectionRef =
new ConnectionRefImpl( sourceEntity, connectionType, indexUpdate.getEntity() );
batchUpdateConnectionIndex( indexUpdate, connectionRef );
return indexUpdate;
* Get a paging results iterator. Should return an iterator for all results
* @param targetEntity The target entity search connections from
* @return connectionType The name of the edges to search
private PagingResultsIterator getReversedConnectionsIterator( EntityRef targetEntity, String connectionType )
throws Exception {
return new PagingResultsIterator( getConnectingEntities( targetEntity, connectionType, null, Level.REFS ) );
* Batch update backward connections set indexes.
* @param indexUpdate The index to update in the dictionary
* @return The index update
* @throws Exception the exception
@Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsDictionaryIndexes")
public IndexUpdate batchUpdateBackwardConnectionsDictionaryIndexes( IndexUpdate indexUpdate ) throws Exception {
logger.debug( "batchUpdateBackwardConnectionsListIndexes" );
boolean entityHasDictionary = getDefaultSchema()
.isDictionaryIndexedInConnections( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() );
if ( !entityHasDictionary ) {
return indexUpdate;
return doBackwardConnectionsUpdate( indexUpdate );
@Metered(group = "core", name = "RelationManager_batchUpdateEntityConnection")
public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch,
boolean disconnect, ConnectionRefImpl connection, UUID timestampUuid ) throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
Entity connectedEntity = em.get( new SimpleEntityRef(
connection.getConnectedEntityType(), connection.getConnectedEntityId()) );
if ( connectedEntity == null ) {
return batch;
// Create connection for requested params
if ( disconnect ) {
key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
connection.getConnectionType() ),
asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp );
key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
connection.getConnectionType() ),
asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp );
// delete the connection path if there will be no connections left
boolean delete = true;
//check out outbound edges of the given type. If we have more than the 1 specified,
// we shouldn't delete the connection types from our outbound index
PagingResultsIterator itr = new PagingResultsIterator(
getConnectedEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
Level.REFS ) );
ConnectedEntityRef c;
while ( itr.hasNext() ) {
c = ( ConnectedEntityRef );
if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
delete = false;
// c = (ConnectionRef);
// if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
// .getConnectionType()) &&!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity()
// .getUuid())) {
// delete = false;
// break;
// }
// for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectingEntityId())) {
// if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
// .getConnectionType())) {
// if (!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity().getUuid())) {
// delete = false;
// break;
// }
// }
// }
if ( delete ) {
addDeleteToMutator( batch, ENTITY_DICTIONARIES,
key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
connection.getConnectionType(), timestamp );
// delete the connection path if there will be no connections left
delete = true;
//check out inbound edges of the given type. If we have more than the 1 specified,
// we shouldn't delete the connection types from our outbound index
itr = new PagingResultsIterator(
getConnectingEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
Level.REFS ) );
while ( itr.hasNext() ) {
c = ( ConnectedEntityRef );
if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
delete = false;
// if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
// .getConnectionType()) && !c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity
// ().getUuid())) {
// delete = false;
// break;
// }
// for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectedEntityId())) {
// if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
// .getConnectionType())) {
// if (!c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity().getUuid())) {
// delete = false;
// break;
// }
// }
// }
if ( delete ) {
addDeleteToMutator( batch, ENTITY_DICTIONARIES,
key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
connection.getConnectionType(), timestamp );
else {
key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
connection.getConnectionType() ),
asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp,
timestamp );
key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
connection.getConnectionType() ),
asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp,
timestamp );
// Add connection type to connections set
addInsertToMutator( batch, ENTITY_DICTIONARIES,
key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
connection.getConnectionType(), null, timestamp );
// Add connection type to connections set
addInsertToMutator( batch, ENTITY_DICTIONARIES,
key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
connection.getConnectionType(), null, timestamp );
// Add property indexes
// Iterate though all the properties of the connected entity
Schema schema = getDefaultSchema();
for ( String propertyName : connectedEntity.getProperties().keySet() ) {
Object propertyValue = connectedEntity.getProperties().get( propertyName );
boolean indexed = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
boolean connection_indexes_property = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
boolean item_schema_has_property = schema.hasProperty( connectedEntity.getType(), propertyName );
boolean fulltext_indexed = schema.isPropertyFulltextIndexed( connectedEntity.getType(), propertyName );
// For each property, if the schema says it's indexed, update its
// index
if ( indexed && ( connection_indexes_property || !item_schema_has_property ) ) {
IndexUpdate indexUpdate =
batchStartIndexUpdate( batch, connectedEntity, propertyName, disconnect ? null : propertyValue,
timestampUuid, item_schema_has_property, false, false, fulltext_indexed );
batchUpdateConnectionIndex( indexUpdate, connection );
// Add indexes for the connected entity's list properties
// Get the names of the list properties in the connected entity
Set<String> dictionaryNames = em.getDictionaryNames( connectedEntity );
// For each list property, get the values in the list and
// update the index with those values
for ( String dictionaryName : dictionaryNames ) {
boolean has_dictionary = schema.hasDictionary( connectedEntity.getType(), dictionaryName );
boolean dictionary_indexed =
schema.isDictionaryIndexedInConnections( connectedEntity.getType(), dictionaryName );
if ( dictionary_indexed || !has_dictionary ) {
Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName );
for ( Object elementValue : elementValues ) {
IndexUpdate indexUpdate =
batchStartIndexUpdate( batch, connectedEntity, dictionaryName, elementValue, timestampUuid,
has_dictionary, true, disconnect, false );
batchUpdateConnectionIndex( indexUpdate, connection );
return batch;
public void updateEntityConnection( boolean disconnect, ConnectionRefImpl connection ) throws Exception {
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), );
// Make or break the connection
batchUpdateEntityConnection( batch, disconnect, connection, timestampUuid );
// Make or break a connection from the connecting entity
// to the connection itself
ConnectionRefImpl loopback = connection.getConnectionToConnectionEntity();
if ( !disconnect ) {
em.insertEntity( new SimpleEntityRef(
CONNECTION_ENTITY_CONNECTION_TYPE, loopback.getConnectedEntityId() ) );
batchUpdateEntityConnection( batch, disconnect, loopback, timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
@Metered(group = "core", name = "RelationManager_batchDisconnect")
public void batchDisconnect( Mutator<ByteBuffer> batch, UUID timestampUuid ) throws Exception {
PagingResultsIterator itr =
new PagingResultsIterator( getConnectingEntities( headEntity, null, null, Level.REFS ) );
ConnectionRefImpl connection = null;
while ( itr.hasNext() ) {
Object itrObj =;
if ( itrObj instanceof ConnectionRefImpl ) {
connection = (ConnectionRefImpl) itrObj;
else if ( itrObj instanceof SimpleEntityRef ) {
connection = new ConnectionRefImpl( (SimpleEntityRef) itrObj );
else if ( itrObj instanceof EntityRef ) {
connection = new ConnectionRefImpl( new SimpleEntityRef((EntityRef);
else if ( itrObj instanceof UUID ) {
connection = new ConnectionRefImpl( new SimpleEntityRef((UUID);
batchUpdateEntityConnection( batch, true, connection, timestampUuid );
public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed )
throws Exception {
return batchStartIndexUpdate( batch, entity, entryName, entryValue, timestampUuid, schemaHasProperty,
isMultiValue, removeListEntry, fulltextIndexed, false );
@Metered(group = "core", name = "RelationManager_batchStartIndexUpdate")
public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed,
boolean skipRead ) throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
IndexUpdate indexUpdate =
new IndexUpdate( batch, entity, entryName, entryValue, schemaHasProperty, isMultiValue, removeListEntry,
timestampUuid );
// entryName = entryName.toLowerCase();
// entity_id,connection_type,connected_entity_id,prop_name
if ( !skipRead ) {
List<HColumn<ByteBuffer, ByteBuffer>> entries = null;
if ( isMultiValue && validIndexableValue( entryValue ) ) {
entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
new DynamicComposite( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ) ),
setGreaterThanEqualityFlag( new DynamicComposite( entryName, indexValueCode( entryValue ),
toIndexableValue( entryValue ) ) ), INDEX_ENTRY_LIST_COUNT, false );
else {
entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
entity.getUuid(), new DynamicComposite( entryName ),
setGreaterThanEqualityFlag( new DynamicComposite( entryName ) ), INDEX_ENTRY_LIST_COUNT,
false );
if ( logger.isDebugEnabled() ) {
logger.debug( "Found {} previous index entries for {} of entity {}", new Object[] {
entries.size(), entryName, entity.getUuid()
} );
// Delete all matching entries from entry list
for ( HColumn<ByteBuffer, ByteBuffer> entry : entries ) {
UUID prev_timestamp = null;
Object prev_value = null;
String prev_obj_path = null;
// new format:
// composite(entryName,
// value_code,prev_value,prev_timestamp,prev_obj_path) = null
DynamicComposite composite = DynamicComposite.fromByteBuffer( entry.getName().duplicate() );
prev_value = composite.get( 2 );
prev_timestamp = ( UUID ) composite.get( 3 );
if ( composite.size() > 4 ) {
prev_obj_path = ( String ) composite.get( 4 );
if ( prev_value != null ) {
String entryPath = entryName;
if ( ( prev_obj_path != null ) && ( prev_obj_path.length() > 0 ) ) {
entryPath = entryName + "." + prev_obj_path;
indexUpdate.addPrevEntry( entryPath, prev_value, prev_timestamp, entry.getName().duplicate() );
// composite(property_value,connected_entity_id,entry_timestamp)
// addDeleteToMutator(batch, ENTITY_INDEX_ENTRIES,
// entity.getUuid(), entry.getName(), timestamp);
else {
logger.error( "Unexpected condition - deserialized property value is null" );
if ( !isMultiValue || ( isMultiValue && !removeListEntry ) ) {
List<Map.Entry<String, Object>> list = IndexUtils.getKeyValueList( entryName, entryValue, fulltextIndexed );
if ( entryName.equalsIgnoreCase( "location" ) && ( entryValue instanceof Map ) ) {
@SuppressWarnings("rawtypes") double latitude =
MapUtils.getDoubleValue( ( Map ) entryValue, "latitude" );
@SuppressWarnings("rawtypes") double longitude =
MapUtils.getDoubleValue( ( Map ) entryValue, "longitude" );
list.add( new AbstractMap.SimpleEntry<String, Object>( "location.coordinates",
latitude + "," + longitude ) );
for ( Map.Entry<String, Object> indexEntry : list ) {
if ( validIndexableValue( indexEntry.getValue() ) ) {
indexUpdate.addNewEntry( indexEntry.getKey(), toIndexableValue( indexEntry.getValue() ) );
if ( isMultiValue ) {
addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
asList( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ),
indexUpdate.getTimestampUuid() ), null, timestamp );
else {
// int i = 0;
for ( Map.Entry<String, Object> indexEntry : list ) {
String name = indexEntry.getKey();
if ( name.startsWith( entryName + "." ) ) {
name = name.substring( entryName.length() + 1 );
else if ( name.startsWith( entryName ) ) {
name = name.substring( entryName.length() );
byte code = indexValueCode( indexEntry.getValue() );
Object val = toIndexableValue( indexEntry.getValue() );
addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
asList( entryName, code, val, indexUpdate.getTimestampUuid(), name ), null, timestamp );
indexUpdate.addIndex( indexEntry.getKey() );
indexUpdate.addIndex( entryName );
return indexUpdate;
@Metered(group = "core", name = "RelationManager_batchUpdatePropertyIndexes")
public void batchUpdatePropertyIndexes( Mutator<ByteBuffer> batch, String propertyName, Object propertyValue,
boolean entitySchemaHasProperty, boolean noRead, UUID timestampUuid )
throws Exception {
Entity entity = getHeadEntity();
UUID associatedId = null;
String associatedType = null;
if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
Object item = entity.getProperty( PROPERTY_ITEM );
if ( ( item instanceof UUID ) && ( entity.getProperty( PROPERTY_COLLECTION_NAME ) instanceof String ) ) {
associatedId = ( UUID ) item;
associatedType = string( entity.getProperty( PROPERTY_ITEM_TYPE ) );
String entryName = TYPE_MEMBER + "." + propertyName;
if ( logger.isDebugEnabled() ) {
logger.debug( "Extended property {} ( {} ).{} indexed as {} ({})." + entryName, new Object[] {
entity.getType(), entity.getUuid(), propertyName, associatedType, associatedId
} );
propertyName = entryName;
IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
entitySchemaHasProperty, false, false,
getDefaultSchema().isPropertyFulltextIndexed( entity.getType(), propertyName ), noRead );
// Update collections
String effectiveType = entity.getType();
if ( associatedType != null ) {
indexUpdate.setAssociatedId( associatedId );
effectiveType = associatedType;
Map<String, Set<CollectionInfo>> containers = getDefaultSchema().getContainers( effectiveType );
if ( containers != null ) {
Map<EntityRef, Set<String>> containerEntities = null;
if ( noRead ) {
containerEntities = new LinkedHashMap<EntityRef, Set<String>>();
EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
addMapSet( containerEntities, applicationRef, defaultCollectionName( entity.getType() ) );
else {
containerEntities = getContainingCollections();
for ( EntityRef containerEntity : containerEntities.keySet() ) {
if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
.isAssociatedEntityType( entity.getType() ) ) {
logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
Set<String> collectionNames = containerEntities.get( containerEntity );
Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
if ( collections != null ) {
for ( CollectionInfo collection : collections ) {
if ( collectionNames.contains( collection.getName() ) ) {
batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
if ( !noRead ) {
batchUpdateBackwardConnectionsPropertyIndexes( indexUpdate );
* We've updated the properties, add the deletes to the ledger
for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
addDeleteToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(), entry.getLedgerColumn(),
indexUpdate.getTimestamp() );
public void batchUpdateSetIndexes( Mutator<ByteBuffer> batch, String setName, Object elementValue,
boolean removeFromSet, UUID timestampUuid ) throws Exception {
Entity entity = getHeadEntity();
elementValue = getDefaultSchema().validateEntitySetValue( entity.getType(), setName, elementValue );
IndexUpdate indexUpdate =
batchStartIndexUpdate( batch, entity, setName, elementValue, timestampUuid, true, true, removeFromSet,
false );
// Update collections
Map<String, Set<CollectionInfo>> containers =
getDefaultSchema().getContainersIndexingDictionary( entity.getType(), setName );
if ( containers != null ) {
Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
for ( EntityRef containerEntity : containerEntities.keySet() ) {
if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
.isAssociatedEntityType( entity.getType() ) ) {
logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
Set<String> collectionNames = containerEntities.get( containerEntity );
Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
if ( collections != null ) {
for ( CollectionInfo collection : collections ) {
if ( collectionNames.contains( collection.getName() ) ) {
batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
batchUpdateBackwardConnectionsDictionaryIndexes( indexUpdate );
private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception {
DynamicComposite[] range = slice.getRange();
Object keyPrefix = key( indexKey, slice.getPropertyName() );
IndexScanner scanner =
new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.CONNECTION,
keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), slice.getPropertyName() );
return scanner;
* Search the collection index using all the buckets for the given collection
* @param indexKey The index key to read
* @param slice Slice set in the query
* @param collectionName The name of the collection to search
* @param pageSize The page size to load when iterating
private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
throws Exception {
DynamicComposite[] range = slice.getRange();
Object keyPrefix = key( indexKey, slice.getPropertyName() );
IndexScanner scanner =
new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.COLLECTION,
keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), collectionName );
return scanner;
@Metered(group = "core", name = "RelationManager_isOwner")
public boolean isCollectionMember( String collectionName, EntityRef entity ) throws Exception {
Keyspace ko = cass.getApplicationKeyspace( applicationId );
ByteBuffer col = DynamicComposite
.toByteBuffer( asList( this.headEntity.getType(), collectionName, headEntity.getUuid() ) );
HColumn<ByteBuffer, ByteBuffer> result = cass.getColumn( ko, ENTITY_COMPOSITE_DICTIONARIES,
key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), col,, );
return result != null;
/** @param connectionName The name of hte connection */
public boolean isConnectionMember( String connectionName, EntityRef entity ) throws Exception {
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Object key = key( this.headEntity.getUuid(), DICTIONARY_CONNECTED_ENTITIES, connectionName );
DynamicComposite start = new DynamicComposite( entity.getUuid() );
List<HColumn<ByteBuffer, ByteBuffer>> cols =
cass.getColumns( ko, ENTITY_COMPOSITE_DICTIONARIES, key, start, null, 1, false );
if ( cols == null || cols.size() == 0 ) {
return false;
UUID returnedUUID = ( UUID ) DynamicComposite.fromByteBuffer( cols.get( 0 ).getName() ).get( 0 );
return entity.getUuid().equals( returnedUUID );
// key(connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
// connection.getConnectionType()),
// asList(connection.getConnectingEntityId(), connection.getConnectingEntityType()), timestamp);
// ConnectionRefImpl ref = new ConnectionRefImpl(this.headEntity, connectionName, entity);
// HColumn<String, UUID> col = cass.getColumn(ko, ENTITY_CONNECTIONS, ref.getUuid(),
// ConnectionRefImpl.CONNECTED_ENTITY_ID, se, ue);
// getConnectedEntities(this.headEntity, connectionName, )
// return col != null && entity.getUuid().equals(col.getValue());
@Metered(group = "core", name = "RelationManager_getOwners")
public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception {
Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
Map<String, Map<UUID, Set<String>>> owners = new LinkedHashMap<String, Map<UUID, Set<String>>>();
for ( EntityRef owner : containerEntities.keySet() ) {
Set<String> collections = containerEntities.get( owner );
for ( String collection : collections ) {
MapUtils.addMapMapSet( owners, owner.getType(), owner.getUuid(), collection );
return owners;
@Metered(group = "core", name = "RelationManager_getCollections")
public Set<String> getCollections() throws Exception {
Map<String, CollectionInfo> collections = getDefaultSchema().getCollections( headEntity.getType() );
if ( collections == null ) {
return null;
return collections.keySet();
@Metered(group = "core", name = "RelationManager_getCollection_start_result")
public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
boolean reversed ) throws Exception {
// changed intentionally to delegate to search so that behavior is
// consistent across all index access.
// TODO T.N fix cursor parsing here so startResult can be used in this
// context. Needs a bit of refactor
// for accommodating cursor I/O USERGRID-1750. A bit hacky, but until a
// furthur refactor this works.
Query query = new Query().withResultsLevel( resultsLevel ).withReversed( reversed ).withLimit( count )
.withStartResult( startResult );
return searchCollection( collectionName, query );
@Metered(group = "core", name = "RelationManager_getCollecitonForQuery")
public Results getCollection( String collectionName, Query query, Level resultsLevel ) throws Exception {
// changed intentionally to delegate to search so that behavior is
// consistent across all index access.
return searchCollection( collectionName, query );
@Metered(group = "core", name = "RelationManager_addToCollection")
public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {
Entity itemEntity = em.get( itemRef );
if ( itemEntity == null ) {
return null;
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) {
return null;
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator(cass.getApplicationKeyspace( applicationId ), );
batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
if ( collection.getLinkedCollection() != null ) {
getRelationManager( itemEntity )
.batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(), timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
return itemEntity;
@Metered(group = "core", name = "RelationManager_addToCollections")
public Entity addToCollections( List<EntityRef> owners, String collectionName ) throws Exception {
Entity itemEntity = getHeadEntity();
Map<String, List<UUID>> collectionsByType = new LinkedHashMap<String, List<UUID>>();
for ( EntityRef owner : owners ) {
MapUtils.addMapList( collectionsByType, owner.getType(), owner.getUuid() );
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), );
Schema schema = getDefaultSchema();
for ( Entry<String, List<UUID>> entry : collectionsByType.entrySet() ) {
CollectionInfo collection = schema.getCollection( entry.getKey(), collectionName );
if ( ( collection != null ) && !collection.getType().equals( headEntity.getType() ) ) {
batchAddToCollections( batch, entry.getKey(), entry.getValue(), collectionName, itemEntity, timestampUuid );
if ( collection.getLinkedCollection() != null ) {
"Bulk add to collections used on a linked collection, linked connection will not be updated" );
batchExecute( batch, CassandraService.RETRY_COUNT );
return null;
@Metered(group = "core", name = "RelationManager_createItemInCollection")
public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
throws Exception {
if ( headEntity.getUuid().equals( applicationId ) ) {
if ( itemType.equals( TYPE_ENTITY ) ) {
itemType = singularize( collectionName );
if ( itemType.equals( TYPE_ROLE ) ) {
Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
if ( inactivity == null ) {
inactivity = 0L;
return em.createRole( ( String ) properties.get( PROPERTY_NAME ),
( String ) properties.get( PROPERTY_TITLE ), inactivity );
return em.create( itemType, properties );
else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collectionName.equals( COLLECTION_ROLES ) ) ) {
UUID groupId = headEntity.getUuid();
String roleName = ( String ) properties.get( PROPERTY_NAME );
return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) );
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
if ( ( collection != null ) && !collection.getType().equals( itemType ) ) {
return null;
properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true );
Entity itemEntity = em.create( itemType, properties );
if ( itemEntity != null ) {
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), );
batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
if ( collection.getLinkedCollection() != null ) {
getRelationManager( itemEntity )
.batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
return itemEntity;
@Metered(group = "core", name = "RelationManager_removeFromCollection")
public void removeFromCollection( String collectionName, EntityRef itemRef ) throws Exception {
if ( headEntity.getUuid().equals( applicationId ) ) {
if ( collectionName.equals( COLLECTION_ROLES ) ) {
Entity itemEntity = em.get( itemRef );
if ( itemEntity != null ) {
RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
em.deleteRole( roleRef.getApplicationRoleName() );
em.delete( itemEntity );
em.delete( itemRef );
Entity itemEntity = em.get( itemRef );
if ( itemEntity == null ) {
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), );
batchRemoveFromCollection( batch, collectionName, itemEntity, timestampUuid );
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
if ( ( collection != null ) && ( collection.getLinkedCollection() != null ) ) {
getRelationManager( itemEntity )
.batchRemoveFromCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
if ( collectionName.equals( COLLECTION_ROLES ) ) {
String path = ( String ) ( ( Entity ) itemRef ).getMetadata( "path" );
if ( path.startsWith( "/roles/" ) ) {
RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
em.deleteRole( roleRef.getApplicationRoleName() );
@Metered(group = "core", name = "RelationManager_batchRemoveFromContainers")
public void batchRemoveFromContainers( Mutator<ByteBuffer> m, UUID timestampUuid ) throws Exception {
Entity entity = getHeadEntity();
// find all the containing collections
Map<EntityRef, Set<String>> containers = getContainingCollections();
if ( containers != null ) {
for ( Entry<EntityRef, Set<String>> container : containers.entrySet() ) {
for ( String collectionName : container.getValue() ) {
getRelationManager( container.getKey() )
.batchRemoveFromCollection( m, collectionName, entity, true, timestampUuid );
@Metered(group = "core", name = "RelationManager_copyRelationships")
public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
throws Exception {
headEntity = em.validate( headEntity );
dstEntityRef = em.validate( dstEntityRef );
CollectionInfo srcCollection = getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
CollectionInfo dstCollection = getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
Results results = null;
do {
if ( srcCollection != null ) {
results = em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false );
else {
results = em.getConnectedEntities( headEntity, srcRelationName, null, Level.REFS );
if ( ( results != null ) && ( results.size() > 0 ) ) {
List<EntityRef> refs = results.getRefs();
for ( EntityRef ref : refs ) {
if ( dstCollection != null ) {
em.addToCollection( dstEntityRef, dstRelationName, ref );
else {
em.createConnection( dstEntityRef, dstRelationName, ref );
while ( ( results != null ) && ( results.hasMoreResults() ) );
@Metered(group = "core", name = "RelationManager_searchCollection")
public Results searchCollection( String collectionName, Query query ) throws Exception {
if ( query == null ) {
query = new Query();
headEntity = em.validate( headEntity );
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
query.setEntityType( collection.getType() );
final CollectionResultsLoaderFactory factory = new CollectionResultsLoaderFactory();
// we have something to search with, visit our tree and evaluate the
// results
QueryProcessorImpl qp = new QueryProcessorImpl( query, collection, em, factory );
SearchCollectionVisitor visitor = new SearchCollectionVisitor( qp );
return qp.getResults( visitor );
@Metered(group = "core", name = "RelationManager_createConnection_connection_ref")
public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
ConnectionRefImpl connectionImpl = new ConnectionRefImpl( connection );
updateEntityConnection( false, connectionImpl );
return connection;
@Metered(group = "core", name = "RelationManager_createConnection_connectionType")
public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {
headEntity = em.validate( headEntity );
connectedEntityRef = em.validate( connectedEntityRef );
ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
updateEntityConnection( false, connection );
return connection;
@Metered(group = "core", name = "RelationManager_createConnection_paired_connection_type")
public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
ConnectionRefImpl connection =
new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
updateEntityConnection( false, connection );
return connection;
@Metered(group = "core", name = "RelationManager_createConnection_connected_entity_ref")
public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connections );
updateEntityConnection( false, connection );
return connection;
@Metered(group = "core", name = "RelationManager_connectionRef_type_entity")
public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
ConnectionRef connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
return connection;
@Metered(group = "core", name = "RelationManager_connectionRef_entity_to_entity")
public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
ConnectionRef connection =
new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
return connection;
@Metered(group = "core", name = "RelationManager_connectionRef_connections")
public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
ConnectionRef connection = new ConnectionRefImpl( headEntity, connections );
return connection;
@Metered(group = "core", name = "RelationManager_deleteConnection")
public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
updateEntityConnection( true, new ConnectionRefImpl( connectionRef ) );
@Metered(group = "core", name = "RelationManager_getConnectionTypes_entity_id")
public Set<String> getConnectionTypes( UUID connectedEntityId ) throws Exception {
// Add connection type to connections set
// addInsertToMutator(batch, ENTITY_DICTIONARIES,
// key(connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES),
// connection.getConnectionType(), null,
// timestamp);
// // Add connection type to connections set
// addInsertToMutator(batch, ENTITY_DICTIONARIES,
// key(connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES),
// connection.getConnectionType(), null,
// timestamp);
// Object key = key(connectedEntityId, DICTIONARY_CONNECTED_TYPES);
Set<String> connections = cast( em.getDictionaryAsSet( new SimpleEntityRef( connectedEntityId ),
return connections;
// Set<String> connection_types = new TreeSet<String>(CASE_INSENSITIVE_ORDER);
// //TODO T.N. get this from the dictionary
// List<ConnectionRefImpl> connections = getConnections(new ConnectionRefImpl(headEntity,
// new ConnectedEntityRefImpl(
// NULL_ID), new ConnectedEntityRefImpl(connectedEntityId)), false);
// for (ConnectionRefImpl connection : connections) {
// if ((connection.getConnectionType() != null) && (connection.getFirstPairedConnectedEntityId() ==
// null)) {
// connection_types.add(connection.getConnectionType());
// }
// }
// return connection_types;
// <<<<<<< HEAD
public Set<String> getConnectionTypes() throws Exception {
return getConnectionTypes( false );
@Metered(group = "core", name = "RelationManager_getConnectionTypes")
public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {
Set<String> connections = cast( em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );
if ( connections == null ) {
return null;
if ( filterConnection && ( connections.size() > 0 ) ) {
connections.remove( "connection" );
return connections;
@Metered(group = "core", name = "RelationManager_getConnectedEntities")
public Results getConnectedEntities( String connectionType, String connectedEntityType, Level resultsLevel )
throws Exception {
return getConnectedEntities( headEntity, connectionType, connectedEntityType, resultsLevel );
* Get all edges that are from the sourceEntity
* @param sourceEntity The source entity to search edges in
* @param connectionType The type of connection. If not specified, all connections are returned
* @param connectedEntityType The connected entity type, if not specified all types are returned
* @param resultsLevel The results level to return
private Results getConnectedEntities( EntityRef sourceEntity, String connectionType, String connectedEntityType,
Level resultsLevel ) throws Exception {
Query query = new Query();
query.setResultsLevel( resultsLevel );
ConnectionRefImpl connectionRef =
new ConnectionRefImpl( sourceEntity, connectionType, new SimpleEntityRef( connectedEntityType, null ) );
// EntityRef connectedEntity) {
// ConnectionRefImpl connectionRef = new ConnectionRefImpl(new ConnectedEntityRefImpl(connectionType,
// connectedEntityType, null, true), sourceEntity );
final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
QueryProcessorImpl qp = new QueryProcessorImpl( query, null, em, factory );
SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
return qp.getResults( visitor );
@Metered(group = "core", name = "RelationManager_getConnectingEntities")
public Results getConnectingEntities( String connectionType, String connectedEntityType,
Level resultsLevel ) throws Exception {
return getConnectingEntities(connectionType, connectedEntityType, resultsLevel, 0 );
@Metered(group = "core", name = "RelationManager_getConnectingEntities")
public Results getConnectingEntities(String connectionType,
String entityType, Level level, int count) throws Exception {
return getConnectingEntities(headEntity, connectionType, entityType, level, count );
* Get all edges that are to the targetEntity
* @param targetEntity The target entity to search edges in
* @param connectionType The type of connection. If not specified, all connections are returned
* @param connectedEntityType The connected entity type, if not specified all types are returned
* @param count result limit
private Results getConnectingEntities(EntityRef targetEntity,
String connectionType, String connectedEntityType, Level level, int count) throws Exception {
Query query = new Query();
query.setResultsLevel( level );
final ConnectionRefImpl connectionRef =
new ConnectionRefImpl( new SimpleEntityRef( connectedEntityType, null ), connectionType, targetEntity );
final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
QueryProcessorImpl qp = new QueryProcessorImpl( query, null, em, factory );
SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, false );
return qp.getResults( visitor );
* Get all edges that are to the targetEntity
* @param targetEntity The target entity to search edges in
* @param connectionType The type of connection. If not specified, all connections are returned
* @param connectedEntityType The connected entity type, if not specified all types are returned
* @param resultsLevel The results level to return
private Results getConnectingEntities( EntityRef targetEntity, String connectionType, String connectedEntityType,
Level resultsLevel ) throws Exception {
return getConnectingEntities(targetEntity, connectionType, connectedEntityType, resultsLevel, 0);
@Metered(group = "core", name = "RelationManager_searchConnectedEntities")
public Results searchConnectedEntities( Query query ) throws Exception {
Preconditions.checkNotNull(query, "Query must not be null");
final String connectedEntityType = query.getEntityType();
final String connectionType = query.getConnectionType();
Preconditions.checkNotNull( connectedEntityType, "entityType must not be null" );
Preconditions.checkNotNull( connectionType, "connectionType must not be null" );
headEntity = em.validate( headEntity );
ConnectionRefImpl connectionRef =
new ConnectionRefImpl( headEntity, connectionType, new SimpleEntityRef( connectedEntityType, null ) );
final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
QueryProcessorImpl qp = new QueryProcessorImpl( query, null, em, factory );
SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
return qp.getResults( visitor );
public Set<String> getConnectionIndexes( String connectionType ) throws Exception {
return getConnectionIndexes( new ConnectionRefImpl( headEntity, connectionType, null ) );
private static final UUIDIndexSliceParser UUID_PARSER = new UUIDIndexSliceParser();
* Simple search visitor that performs all the joining
* @author tnine
private class SearchCollectionVisitor extends SearchVisitor {
private final CollectionInfo collection;
* @param queryProcessor
public SearchCollectionVisitor( QueryProcessorImpl queryProcessor ) {
super( queryProcessor );
this.collection = queryProcessor.getCollectionInfo();
/* (non-Javadoc)
* @see
* .QueryNode,
protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
// NOTE we explicitly do not append the slice value here. This
// is done in the searchIndex method below
Object indexKey = key( headEntity.getUuid(), collection.getName() );
// update the cursor and order before we perform the slice
// operation. Should be done after subkeying since this can
// change the hash value of the slice
queryProcessor.applyCursorAndSort( slice );
IndexScanner columns = null;
// nothing left to search for this range
if ( slice.isComplete() ) {
columns = new NoOpIndexScanner();
// perform the search
else {
columns = searchIndexBuckets( indexKey, slice, collection.getName(),
queryProcessor.getPageSizeHint( node ) );
return columns;
public void visit( AllNode node ) throws Exception {
String collectionName = collection.getName();
QuerySlice slice = node.getSlice();
queryProcessor.applyCursorAndSort( slice );
UUID startId = null;
if ( slice.hasCursor() ) {
startId = UUID_PARSER.parse( slice.getCursor() ).getUUID();
IndexScanner indexScanner = cass.getIdList( cass.getApplicationKeyspace( applicationId ),
key( headEntity.getUuid(), DICTIONARY_COLLECTIONS, collectionName ), startId, null,
queryProcessor.getPageSizeHint( node ), query.isReversed(), indexBucketLocator, applicationId,
collectionName, node.isForceKeepFirst() );
this.results.push( new SliceIterator( slice, indexScanner, UUID_PARSER ) );
* (non-Javadoc)
* @see
public void visit( WithinNode node ) throws Exception {
QuerySlice slice = node.getSlice();
queryProcessor.applyCursorAndSort( slice );
GeoIterator itr = new GeoIterator(
new CollectionGeoSearch( em, indexBucketLocator, cass, headEntity, collection.getName() ),
query.getLimit(), slice, node.getPropertyName(),
new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
results.push( itr );
public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
EntityRef ref = em.getAlias(
headEntity, collection.getType(), nameIdentifierNode.getName() );
if ( ref == null ) {
this.results.push( new EmptyIterator() );
this.results.push( new StaticIdIterator( ref.getUuid() ) );
* Simple search visitor that performs all the joining
* @author tnine
private class SearchConnectionVisitor extends SearchVisitor {
private final ConnectionRefImpl connection;
/** True if we should search from source->target edges. False if we should search from target<-source edges */
private final boolean outgoing;
* @param queryProcessor They query processor to use
* @param connection The connection refernce
* @param outgoing The direction to search. True if we should search from source->target edges. False if we
* should search from target<-source edges
public SearchConnectionVisitor( QueryProcessorImpl queryProcessor, ConnectionRefImpl connection,
boolean outgoing ) {
super( queryProcessor );
this.connection = connection;
this.outgoing = outgoing;
/* (non-Javadoc)
* @see
* .QueryNode,
protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
UUID id = ConnectionRefImpl.getIndexId( ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE, headEntity,
connection.getConnectionType(), connection.getConnectedEntityType(), new ConnectedEntityRef[0] );
Object key = key( id, INDEX_CONNECTIONS );
// update the cursor and order before we perform the slice
// operation
queryProcessor.applyCursorAndSort( slice );
IndexScanner columns = null;
if ( slice.isComplete() ) {
columns = new NoOpIndexScanner();
else {
columns = searchIndex( key, slice, queryProcessor.getPageSizeHint( node ) );
return columns;
* (non-Javadoc)
* @see
public void visit( WithinNode node ) throws Exception {
QuerySlice slice = node.getSlice();
queryProcessor.applyCursorAndSort( slice );
GeoIterator itr =
new GeoIterator( new ConnectionGeoSearch( em, indexBucketLocator, cass, connection.getIndexId() ),
query.getLimit(), slice, node.getPropertyName(),
new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
results.push( itr );
public void visit( AllNode node ) throws Exception {
QuerySlice slice = node.getSlice();
queryProcessor.applyCursorAndSort( slice );
int size = queryProcessor.getPageSizeHint( node );
ByteBuffer start = null;
if ( slice.hasCursor() ) {
start = slice.getCursor();
boolean skipFirst = node.isForceKeepFirst() ? false : slice.hasCursor();
UUID entityIdToUse;
//change our type depending on which direction we're loading
String dictionaryType;
//the target type
String targetType;
//this is on the "source" side of the edge
if ( outgoing ) {
entityIdToUse = connection.getConnectingEntityId();
targetType = connection.getConnectedEntityType();
//we're on the target side of the edge
else {
entityIdToUse = connection.getConnectedEntityId();
targetType = connection.getConnectingEntityType();
final String connectionType = connection.getConnectionType();
final ConnectionIndexSliceParser connectionParser = new ConnectionIndexSliceParser( targetType );
final Iterator<String> connectionTypes;
//use the provided connection type
if ( connectionType != null ) {
connectionTypes = Collections.singleton( connectionType ).iterator();
//we need to iterate all connection types
else {
connectionTypes = new ConnectionTypesIterator( cass, applicationId, entityIdToUse, outgoing, size );
IndexScanner connectionScanner =
new ConnectedIndexScanner( cass, dictionaryType, applicationId, entityIdToUse, connectionTypes,
start, slice.isReversed(), size, skipFirst );
this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
//TODO T.N. USERGRID-1919 actually validate this is connected
EntityRef ref = em.getAlias( new SimpleEntityRef(Application.ENTITY_TYPE, applicationId),
connection.getConnectedEntityType(), nameIdentifierNode.getName() );
if ( ref == null ) {
this.results.push( new EmptyIterator() );
this.results.push( new StaticIdIterator( ref.getUuid() ) );