blob: a0c4c5bb159f2a8b24e3419ae313ab622a0def92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.persistence.cassandra;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;
import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
import org.apache.usergrid.locking.Lock;
import org.apache.usergrid.mq.Message;
import org.apache.usergrid.mq.QueueManager;
import org.apache.usergrid.mq.cassandra.QueueManagerFactoryImpl;
import org.apache.usergrid.persistence.AggregateCounter;
import org.apache.usergrid.persistence.AggregateCounterSet;
import org.apache.usergrid.persistence.CollectionRef;
import org.apache.usergrid.persistence.ConnectedEntityRef;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.DynamicEntity;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.IndexBucketLocator;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.RoleRef;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.SimpleCollectionRef;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.TypedEntity;
import org.apache.usergrid.persistence.cassandra.CounterUtils.AggregateCounterSelection;
import org.apache.usergrid.persistence.cassandra.util.TraceParticipant;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.entities.Event;
import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.entities.Role;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
import org.apache.usergrid.persistence.index.query.CounterResolution;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.Query.CounterFilterPredicate;
import org.apache.usergrid.persistence.index.query.Query.Level;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.ClassUtils;
import org.apache.usergrid.utils.CompositeUtils;
import org.apache.usergrid.utils.UUIDUtils;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.yammer.metrics.annotation.Metered;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.CounterRow;
import me.prettyprint.hector.api.beans.CounterRows;
import me.prettyprint.hector.api.beans.CounterSlice;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceCounterQuery;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import static java.util.Arrays.asList;
import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
import static org.apache.commons.lang.StringUtils.capitalize;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.usergrid.locking.LockHelper.getUniqueUpdateLock;
import static org.apache.usergrid.persistence.Results.fromEntities;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_PROPERTIES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLENAMES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLETIMES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_SETS;
import static org.apache.usergrid.persistence.Schema.PROPERTY_ASSOCIATED;
import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
import static org.apache.usergrid.persistence.Schema.PROPERTY_MODIFIED;
import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
import static org.apache.usergrid.persistence.Schema.PROPERTY_TIMESTAMP;
import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import static org.apache.usergrid.persistence.Schema.TYPE_CONNECTION;
import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
import static org.apache.usergrid.persistence.Schema.TYPE_MEMBER;
import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
import static org.apache.usergrid.persistence.Schema.defaultCollectionName;
import static org.apache.usergrid.persistence.Schema.deserializeEntityProperties;
import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import static org.apache.usergrid.persistence.SimpleEntityRef.getUuid;
import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
import static org.apache.usergrid.persistence.SimpleRoleRef.getIdForGroupIdAndRoleName;
import static org.apache.usergrid.persistence.SimpleRoleRef.getIdForRoleName;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_PROPERTIES;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_UNIQUE;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addPropertyToMutator;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.toStorableBinaryValue;
import static org.apache.usergrid.persistence.cassandra.CassandraService.ALL_COUNT;
import static org.apache.usergrid.persistence.cassandra.Serializers.be;
import static org.apache.usergrid.persistence.cassandra.Serializers.le;
import static org.apache.usergrid.persistence.cassandra.Serializers.se;
import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.hector.CountingMutator;
import static org.apache.usergrid.persistence.index.query.Query.Level.REFS;
import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
import static org.apache.usergrid.utils.ConversionUtils.getLong;
import static org.apache.usergrid.utils.ConversionUtils.object;
import static org.apache.usergrid.utils.ConversionUtils.string;
import static org.apache.usergrid.utils.ConversionUtils.uuid;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
import static org.apache.usergrid.utils.UUIDUtils.isTimeBased;
import static org.apache.usergrid.utils.UUIDUtils.newTimeUUID;
/**
* Cassandra-specific implementation of Datastore
*
* @author edanuff
* @author tnine
*/
public class EntityManagerImpl implements EntityManager {
/** The log4j logger. */
private static final Logger logger = LoggerFactory.getLogger( EntityManagerImpl.class );
public static final String APPLICATION_COLLECTION = "application.collection.";
public static final String APPLICATION_ENTITIES = "application.entities";
public static final long ONE_COUNT = 1L;
@Resource
private EntityManagerFactoryImpl emf;
@Resource
private QueueManagerFactoryImpl qmf;
@Resource
private IndexBucketLocator indexBucketLocator;
private UUID applicationId;
private Application application;
@Resource
private CassandraService cass;
@Resource
private CounterUtils counterUtils;
private boolean skipAggregateCounters;
public EntityManagerImpl() {
}
@Override
public void init(EntityManagerFactory emf, UUID applicationId) {
init( (EntityManagerFactoryImpl)emf, null, null, applicationId, false);
}
public EntityManager init(
EntityManagerFactoryImpl emf, CassandraService cass, CounterUtils counterUtils,
UUID applicationId, boolean skipAggregateCounters ) {
this.emf = emf;
this.cass = cass;
this.counterUtils = counterUtils;
this.applicationId = applicationId;
this.skipAggregateCounters = skipAggregateCounters;
qmf = ( QueueManagerFactoryImpl ) getApplicationContext().getBean( "queueManagerFactory" );
indexBucketLocator = ( IndexBucketLocator ) getApplicationContext().getBean( "indexBucketLocator" );
// prime the application entity for the EM
try {
getApplication();
}
catch ( Exception ex ) {
ex.printStackTrace();
}
return this;
}
public void setApplicationId( UUID applicationId ) {
this.applicationId = applicationId;
}
public ApplicationContext getApplicationContext() {
return emf.applicationContext;
}
@Override
public EntityRef getApplicationRef() {
return ref( TYPE_APPLICATION, applicationId );
}
@Override
public Application getApplication() throws Exception {
if ( application == null ) {
application = get( applicationId, Application.class );
}
return application;
}
@Override
public void updateApplication( Application app ) throws Exception {
update( app );
this.application = app;
}
@Override
public void updateApplication( Map<String, Object> properties ) throws Exception {
this.updateProperties( applicationId, Application.ENTITY_TYPE, properties );
this.application = get( applicationId, Application.class );
}
@Override
public RelationManagerImpl getRelationManager( EntityRef entityRef ) {
//RelationManagerImpl rmi = applicationContext.getBean(RelationManagerImpl.class);
RelationManagerImpl rmi = new RelationManagerImpl();
rmi.init( this, cass, applicationId, entityRef, indexBucketLocator );
return rmi;
}
/**
* Batch dictionary property.
*
* @param batch The batch to set the property into
* @param entity The entity that owns the property
* @param propertyName the property name
* @param propertyValue the property value
* @param timestampUuid The update timestamp as a uuid
*
* @return batch
*
* @throws Exception the exception
*/
public Mutator<ByteBuffer> batchSetProperty( Mutator<ByteBuffer> batch, EntityRef entity, String propertyName,
Object propertyValue, UUID timestampUuid ) throws Exception {
return this.batchSetProperty( batch, entity, propertyName, propertyValue, false, false, timestampUuid );
}
public Mutator<ByteBuffer> batchSetProperty( Mutator<ByteBuffer> batch, EntityRef entity, String propertyName,
Object propertyValue, boolean force, boolean noRead,
UUID timestampUuid ) throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
// propertyName = propertyName.toLowerCase();
boolean entitySchemaHasProperty = getDefaultSchema().hasProperty( entity.getType(), propertyName );
propertyValue = getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
Schema defaultSchema = Schema.getDefaultSchema();
if ( PROPERTY_TYPE.equalsIgnoreCase( propertyName ) && ( propertyValue != null ) ) {
if ( "entity".equalsIgnoreCase( propertyValue.toString() ) || "dynamicentity"
.equalsIgnoreCase( propertyValue.toString() ) ) {
String errorMsg =
"Unable to dictionary entity type to " + propertyValue + " because that is not a valid type.";
logger.error( errorMsg );
throw new IllegalArgumentException( errorMsg );
}
}
if ( entitySchemaHasProperty ) {
if ( !force ) {
if ( !defaultSchema.isPropertyMutable( entity.getType(), propertyName ) ) {
return batch;
}
// Passing null for propertyValue indicates delete the property
// so if required property, exit
if ( ( propertyValue == null ) && defaultSchema.isRequiredProperty( entity.getType(), propertyName ) ) {
return batch;
}
}
/**
* Unique property, load the old value and remove it, check if it's not a duplicate
*/
if ( defaultSchema.getEntityInfo( entity.getType() ).isPropertyUnique( propertyName ) ) {
Lock lock = getUniqueUpdateLock( cass.getLockManager(), applicationId, propertyValue, entity.getType(),
propertyName );
try {
lock.lock();
if ( !isPropertyValueUniqueForEntity( entity.getUuid(), entity.getType(), propertyName,
propertyValue ) ) {
throw new DuplicateUniquePropertyExistsException( entity.getType(), propertyName,
propertyValue );
}
String collectionName = Schema.defaultCollectionName( entity.getType() );
uniquePropertyDelete( batch, collectionName, entity.getType(), propertyName, propertyValue,
entity.getUuid(), timestamp - 1 );
uniquePropertyWrite( batch, collectionName, propertyName, propertyValue, entity.getUuid(),
timestamp );
}
finally {
lock.unlock();
}
}
}
if ( getDefaultSchema().isPropertyIndexed( entity.getType(), propertyName ) ) {
//this call is incorrect. The current entity is NOT the head entity
getRelationManager( entity )
.batchUpdatePropertyIndexes( batch, propertyName, propertyValue, entitySchemaHasProperty, noRead,
timestampUuid );
}
if ( propertyValue != null ) {
// Set the new value
addPropertyToMutator( batch, key( entity.getUuid() ), entity.getType(), propertyName, propertyValue,
timestamp );
if ( !entitySchemaHasProperty ) {
// Make a list of all the properties ever dictionary on this
// entity
addInsertToMutator( batch, ENTITY_DICTIONARIES, key( entity.getUuid(), DICTIONARY_PROPERTIES ),
propertyName, null, timestamp );
}
}
else {
addDeleteToMutator( batch, ENTITY_PROPERTIES, key( entity.getUuid() ), propertyName, timestamp );
}
return batch;
}
/**
* Batch update properties.
*
* @param batch the batch
* @param entity The owning entity reference
* @param properties the properties to set
* @param timestampUuid the timestamp of the update operation as a time uuid
*
* @return batch
*
* @throws Exception the exception
*/
public Mutator<ByteBuffer> batchUpdateProperties( Mutator<ByteBuffer> batch, EntityRef entity,
Map<String, Object> properties, UUID timestampUuid )
throws Exception {
for ( String propertyName : properties.keySet() ) {
Object propertyValue = properties.get( propertyName );
batch = batchSetProperty( batch, entity, propertyName, propertyValue, timestampUuid );
}
return batch;
}
/**
* Batch update set.
*
* @param batch the batch
* @param entity The owning entity
* @param dictionaryName the dictionary name
* @param elementValue the dictionary value
* @param removeFromDictionary True to delete from the dictionary
* @param timestampUuid the timestamp
*
* @return batch
*
* @throws Exception the exception
*/
public Mutator<ByteBuffer> batchUpdateDictionary( Mutator<ByteBuffer> batch, EntityRef entity,
String dictionaryName, Object elementValue,
boolean removeFromDictionary, UUID timestampUuid )
throws Exception {
return batchUpdateDictionary( batch, entity, dictionaryName, elementValue, null, removeFromDictionary,
timestampUuid );
}
public Mutator<ByteBuffer> batchUpdateDictionary( Mutator<ByteBuffer> batch, EntityRef entity,
String dictionaryName, Object elementValue, Object elementCoValue,
boolean removeFromDictionary, UUID timestampUuid )
throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
// dictionaryName = dictionaryName.toLowerCase();
if ( elementCoValue == null ) {
elementCoValue = ByteBuffer.allocate( 0 );
}
boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
// Don't index dynamic dictionaries not defined by the schema
if ( entityHasDictionary ) {
getRelationManager( entity )
.batchUpdateSetIndexes( batch, dictionaryName, elementValue, removeFromDictionary, timestampUuid );
}
ApplicationCF dictionary_cf = entityHasDictionary ? ENTITY_DICTIONARIES : ENTITY_COMPOSITE_DICTIONARIES;
if ( elementValue != null ) {
if ( !removeFromDictionary ) {
// Set the new value
elementCoValue = toStorableBinaryValue( elementCoValue, !entityHasDictionary );
addInsertToMutator( batch, dictionary_cf, key( entity.getUuid(), dictionaryName ),
entityHasDictionary ? elementValue : asList( elementValue ), elementCoValue, timestamp );
if ( !entityHasDictionary ) {
addInsertToMutator( batch, ENTITY_DICTIONARIES, key( entity.getUuid(), DICTIONARY_SETS ),
dictionaryName, null, timestamp );
}
}
else {
addDeleteToMutator( batch, dictionary_cf, key( entity.getUuid(), dictionaryName ),
entityHasDictionary ? elementValue : asList( elementValue ), timestamp );
}
}
return batch;
}
/**
* Returns true if the property is unique, and the entity can be saved. If it's not unique, false is returned
*
* @return True if this entity can safely "own" this property name and value unique combination
*/
@Metered( group = "core", name = "EntityManager_isPropertyValueUniqueForEntity" )
public boolean isPropertyValueUniqueForEntity( UUID ownerEntityId, String entityType, String propertyName,
Object propertyValue ) throws Exception {
if ( !getDefaultSchema().isPropertyUnique( entityType, propertyName ) ) {
return true;
}
if ( propertyValue == null ) {
return true;
}
/**
* Doing this in a loop sucks, but we need to account for possibly having more than 1 entry in the index due
* to corruption. We need to allow them to update, otherwise
* both entities will be unable to update and must be deleted
*/
Set<UUID> ownerEntityIds = getUUIDsForUniqueProperty(
new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), entityType, propertyName, propertyValue );
//if there are no entities for this property, we know it's unique. If there are,
// we have to make sure the one we were passed is in the set. otherwise it belongs
//to a different entity
return ownerEntityIds.size() == 0 || ownerEntityIds.contains( ownerEntityId );
}
/**
* Return all UUIDs that have this unique value
*
* @param ownerEntityId The entity id that owns this entity collection
* @param collectionName The entity collection name
* @param propertyName The name of the unique property
* @param propertyValue The value of the unique property
*/
private Set<UUID> getUUIDsForUniqueProperty(
EntityRef ownerEntityRef, String collectionName, String propertyName,
Object propertyValue ) throws Exception {
String collectionNameInternal = defaultCollectionName( collectionName );
Object key = createUniqueIndexKey(
ownerEntityRef.getUuid(), collectionNameInternal, propertyName, propertyValue );
List<HColumn<ByteBuffer, ByteBuffer>> cols = cass.getColumns(
cass.getApplicationKeyspace( applicationId ), ENTITY_UNIQUE, key, null, null, 2, false );
//No columns at all, it's unique
if ( cols.size() == 0 ) {
return Collections.emptySet();
}
//shouldn't happen, but it's an error case
if ( cols.size() > 1 ) {
logger.error( "INDEX CORRUPTION: More than 1 unique value exists for entities in "
+ "ownerId {} of type {} on property {} with value {}",
new Object[] { ownerEntityRef, collectionNameInternal, propertyName, propertyValue } );
}
/**
* Doing this in a loop sucks, but we need to account for possibly having more than
* 1 entry in the index due
* to corruption. We need to allow them to update, otherwise
* both entities will be unable to update and must be deleted
*/
Set<UUID> results = new HashSet<UUID>( cols.size() );
for ( HColumn<ByteBuffer, ByteBuffer> col : cols ) {
results.add( ue.fromByteBuffer( col.getName() ) );
}
return results;
}
/** Add this unique index to the delete */
private void uniquePropertyDelete( Mutator<ByteBuffer> m, String collectionName, String entityType,
String propertyName, Object propertyValue, UUID entityId, long timestamp ) throws Exception {
//read the old value and delete it
Object oldValue = getProperty( new SimpleEntityRef( entityType, entityId ), propertyName );
//we have an old value. If the new value is empty, we want to delete the old value. If the new value is
// different we want to delete, otherwise we don't issue the delete
if ( oldValue != null && ( propertyValue == null || !oldValue.equals( propertyValue ) ) ) {
Object key = createUniqueIndexKey( applicationId, collectionName, propertyName, oldValue );
addDeleteToMutator( m, ENTITY_UNIQUE, key, timestamp, entityId );
}
}
/** Add this unique index to the delete */
private void uniquePropertyWrite( Mutator<ByteBuffer> m, String collectionName, String propertyName,
Object propertyValue, UUID entityId, long timestamp ) throws Exception {
Object key = createUniqueIndexKey( applicationId, collectionName, propertyName, propertyValue );
addInsertToMutator( m, ENTITY_UNIQUE, key, entityId, null, timestamp );
}
/**
* Create a row key for the entity of the given type with the name and value in the property.
* Used for fast unique
* index lookups
*/
private Object createUniqueIndexKey(
UUID ownerId, String collectionName, String propertyName, Object value ) {
return key( ownerId, collectionName, propertyName, value );
}
@Override
@Metered( group = "core", name = "EntityManager_getAlias_single" )
public EntityRef getAlias( EntityRef ownerRef, String collectionType, String aliasValue )
throws Exception {
Assert.notNull( ownerRef, "ownerRef is required" );
Assert.notNull( collectionType, "collectionType is required" );
Assert.notNull( aliasValue, "aliasValue is required" );
Map<String, EntityRef> results = getAlias(
ownerRef, collectionType, Collections.singletonList( aliasValue ) );
if ( results == null || results.size() == 0 ) {
return null;
}
//add a warn statement so we can see if we have data migration issues.
//TODO When we get an event system, trigger a repair if this is detected
if ( results.size() > 1 ) {
logger.warn(
"More than 1 entity with Owner id '{}' of type '{}' and alias '{}' exists. "
+ "This is a duplicate alias, and needs audited",
new Object[] { ownerRef, collectionType, aliasValue } );
}
return results.get( aliasValue );
}
@Override
public Map<String, EntityRef> getAlias( String aliasType, List<String> aliases ) throws Exception {
return getAlias( new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), aliasType, aliases );
}
@Override
@Metered( group = "core", name = "EntityManager_getAlias_multi" )
public Map<String, EntityRef> getAlias(
EntityRef ownerRef, String collectionName, List<String> aliases ) throws Exception {
Assert.notNull( ownerRef, "ownerRef is required" );
Assert.notNull( collectionName, "collectionName is required" );
Assert.notEmpty( aliases, "aliases are required" );
String propertyName = Schema.getDefaultSchema().aliasProperty( collectionName );
String entityType = Schema.getDefaultSchema().getCollectionType(ownerRef.getType(), collectionName);
Map<String, EntityRef> results = new HashMap<String, EntityRef>();
for ( String alias : aliases ) {
for ( UUID id : getUUIDsForUniqueProperty( ownerRef, collectionName, propertyName, alias ) ) {
results.put( alias, new SimpleEntityRef( entityType, id ) );
}
}
return results;
}
@SuppressWarnings( "unchecked" )
@Override
public <A extends Entity> A create(
String entityType, Class<A> entityClass, Map<String, Object> properties )
throws Exception {
if ( ( entityType != null ) && ( entityType.startsWith( TYPE_ENTITY ) || entityType
.startsWith( "entities" ) ) ) {
throw new IllegalArgumentException( "Invalid entity type" );
}
A e = null;
try {
e = ( A ) create( entityType, ( Class<Entity> ) entityClass, properties, null );
}
catch ( ClassCastException e1 ) {
logger.error( "Unable to create typed entity", e1 );
}
return e;
}
@Override
public Entity create( UUID importId, String entityType, Map<String, Object> properties )
throws Exception {
return create( entityType, null, properties, importId );
}
@SuppressWarnings( "unchecked" )
@Override
public <A extends TypedEntity> A create( A entity ) throws Exception {
return ( A ) create( entity.getType(), entity.getClass(), entity.getProperties() );
}
@Override
@TraceParticipant
public Entity create( String entityType, Map<String, Object> properties ) throws Exception {
return create( entityType, null, properties );
}
/**
* Creates a new entity.
*
* @param entityType the entity type
* @param entityClass the entity class
* @param properties the properties
* @param importId an existing external uuid to use as the id for the new entity
*
* @return new entity
*
* @throws Exception the exception
*/
@Metered( group = "core", name = "EntityManager_create" )
@TraceParticipant
public <A extends Entity> A create(
String entityType, Class<A> entityClass, Map<String, Object> properties,
UUID importId ) throws Exception {
UUID timestampUuid = newTimeUUID();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
A entity = batchCreate( m, entityType, entityClass, properties, importId, timestampUuid );
batchExecute( m, CassandraService.RETRY_COUNT );
return entity;
}
@SuppressWarnings( "unchecked" )
@Metered( group = "core", name = "EntityManager_batchCreate" )
public <A extends Entity> A batchCreate(
Mutator<ByteBuffer> m, String entityType, Class<A> entityClass,
Map<String, Object> properties, UUID importId, UUID timestampUuid )
throws Exception {
String eType = Schema.normalizeEntityType( entityType );
Schema schema = getDefaultSchema();
boolean is_application = TYPE_APPLICATION.equals( eType );
if ( ( ( applicationId == null ) || applicationId.equals( UUIDUtils.ZERO_UUID ) ) && !is_application ) {
return null;
}
long timestamp = getTimestampInMicros( timestampUuid );
UUID itemId = null;
if ( is_application ) {
itemId = applicationId;
}
if ( importId != null ) {
itemId = importId;
}
if (itemId == null) {
itemId = UUIDUtils.newTimeUUID();
}
boolean emptyPropertyMap = false;
if ( properties == null ) {
properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
}
if ( properties.isEmpty() ) {
emptyPropertyMap = true;
}
if ( importId != null ) {
if ( isTimeBased( importId ) ) {
timestamp = UUIDUtils.getTimestampInMicros( importId );
}
else if ( properties.get( PROPERTY_CREATED ) != null ) {
timestamp = getLong( properties.get( PROPERTY_CREATED ) ) * 1000;
}
}
if ( entityClass == null ) {
entityClass = ( Class<A> ) Schema.getDefaultSchema().getEntityClass( entityType );
}
Set<String> required = schema.getRequiredProperties( entityType );
if ( required != null ) {
for ( String p : required ) {
if ( !PROPERTY_UUID.equals( p ) && !PROPERTY_TYPE.equals( p ) && !PROPERTY_CREATED.equals( p )
&& !PROPERTY_MODIFIED.equals( p ) ) {
Object v = properties.get( p );
if ( schema.isPropertyTimestamp( entityType, p ) ) {
if ( v == null ) {
properties.put( p, timestamp / 1000 );
}
else {
long ts = getLong( v );
if ( ts <= 0 ) {
properties.put( p, timestamp / 1000 );
}
}
continue;
}
if ( v == null ) {
throw new RequiredPropertyNotFoundException( entityType, p );
}
else if ( ( v instanceof String ) && isBlank( ( String ) v ) ) {
throw new RequiredPropertyNotFoundException( entityType, p );
}
}
}
}
// Create collection name based on entity: i.e. "users"
String collection_name = Schema.defaultCollectionName( eType );
// Create collection key based collection name
String bucketId = indexBucketLocator.getBucket( applicationId, IndexType.COLLECTION, itemId, collection_name );
Object collection_key = key( applicationId, Schema.DICTIONARY_COLLECTIONS, collection_name, bucketId );
CollectionInfo collection = null;
if ( !is_application ) {
// Add entity to collection
if ( !emptyPropertyMap ) {
addInsertToMutator( m, ENTITY_ID_SETS, collection_key, itemId, null, timestamp );
}
// Add name of collection to dictionary property
// Application.collections
addInsertToMutator( m, ENTITY_DICTIONARIES, key( applicationId, Schema.DICTIONARY_COLLECTIONS ),
collection_name, null, timestamp );
addInsertToMutator( m, ENTITY_COMPOSITE_DICTIONARIES, key( itemId, Schema.DICTIONARY_CONTAINER_ENTITIES ),
asList( TYPE_APPLICATION, collection_name, applicationId ), null, timestamp );
}
if ( emptyPropertyMap ) {
return null;
}
properties.put( PROPERTY_UUID, itemId );
properties.put( PROPERTY_TYPE, Schema.normalizeEntityType( entityType, false ) );
if ( importId != null ) {
if ( properties.get( PROPERTY_CREATED ) == null ) {
properties.put( PROPERTY_CREATED, timestamp / 1000 );
}
if ( properties.get( PROPERTY_MODIFIED ) == null ) {
properties.put( PROPERTY_MODIFIED, timestamp / 1000 );
}
}
else {
properties.put( PROPERTY_CREATED, timestamp / 1000 );
properties.put( PROPERTY_MODIFIED, timestamp / 1000 );
}
// special case timestamp and published properties
// and dictionary their timestamp values if not set
// this is sure to break something for someone someday
if ( properties.containsKey( PROPERTY_TIMESTAMP ) ) {
long ts = getLong( properties.get( PROPERTY_TIMESTAMP ) );
if ( ts <= 0 ) {
properties.put( PROPERTY_TIMESTAMP, timestamp / 1000 );
}
}
A entity = EntityFactory.newEntity( itemId, eType, entityClass );
logger.info( "Entity created of type {}", entity.getClass().getName() );
if ( Event.ENTITY_TYPE.equals( eType ) ) {
Event event = ( Event ) entity.toTypedEntity();
for ( String prop_name : properties.keySet() ) {
Object propertyValue = properties.get( prop_name );
if ( propertyValue != null ) {
event.setProperty( prop_name, propertyValue );
}
}
Message message = storeEventAsMessage( m, event, timestamp );
incrementEntityCollection( "events", timestamp );
entity.setUuid( message.getUuid() );
return entity;
}
for ( String prop_name : properties.keySet() ) {
Object propertyValue = properties.get( prop_name );
if ( propertyValue == null ) {
continue;
}
if ( User.ENTITY_TYPE.equals( entityType ) && "me".equals( prop_name ) ) {
throw new DuplicateUniquePropertyExistsException( entityType, prop_name, propertyValue );
}
entity.setProperty( prop_name, propertyValue );
batchSetProperty( m, entity, prop_name, propertyValue, true, true, timestampUuid );
}
if ( !is_application ) {
incrementEntityCollection( collection_name, timestamp );
}
return entity;
}
private void incrementEntityCollection( String collection_name, long cassandraTimestamp ) {
try {
incrementAggregateCounters( null, null, null, APPLICATION_COLLECTION + collection_name,
ONE_COUNT, cassandraTimestamp );
}
catch ( Exception e ) {
logger.error( "Unable to increment counter application.collection: {}.", new Object[]{ collection_name, e} );
}
try {
incrementAggregateCounters( null, null, null, APPLICATION_ENTITIES, ONE_COUNT, cassandraTimestamp );
}
catch ( Exception e ) {
logger.error( "Unable to increment counter application.entities for collection: {} with timestamp: {}", new Object[]{collection_name, cassandraTimestamp,e} );
}
}
public void decrementEntityCollection( String collection_name ) {
long cassandraTimestamp = cass.createTimestamp();
decrementEntityCollection( collection_name, cassandraTimestamp );
}
public void decrementEntityCollection( String collection_name, long cassandraTimestamp ) {
try {
incrementAggregateCounters( null, null, null, APPLICATION_COLLECTION + collection_name, -ONE_COUNT,
cassandraTimestamp );
}
catch ( Exception e ) {
logger.error( "Unable to decrement counter application.collection: {}.", new Object[]{collection_name, e} );
}
try {
incrementAggregateCounters( null, null, null, APPLICATION_ENTITIES, -ONE_COUNT, cassandraTimestamp );
}
catch ( Exception e ) {
logger.error( "Unable to decrement counter application.entities for collection: {} with timestamp: {}", new Object[]{collection_name, cassandraTimestamp,e} );
}
}
@Metered( group = "core", name = "EntityManager_insertEntity" )
public void insertEntity( EntityRef entityRef ) throws Exception {
String type = entityRef.getType();
UUID entityId = entityRef.getUuid();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
Object itemKey = key( entityId );
long timestamp = cass.createTimestamp();
addPropertyToMutator( m, itemKey, type, PROPERTY_UUID, entityId, timestamp );
addPropertyToMutator( m, itemKey, type, PROPERTY_TYPE, type, timestamp );
batchExecute( m, CassandraService.RETRY_COUNT );
}
public Message storeEventAsMessage( Mutator<ByteBuffer> m, Event event, long timestamp ) {
counterUtils.addEventCounterMutations( m, applicationId, event, timestamp );
QueueManager q = qmf.getQueueManager( applicationId );
Message message = new Message();
message.setType( "event" );
message.setCategory( event.getCategory() );
message.setStringProperty( "message", event.getMessage() );
message.setTimestamp( timestamp );
q.postToQueue( "events", message );
return message;
}
/**
* Gets the type.
*
* @param entityId the entity id
*
* @return entity type
*
* @throws Exception the exception
*/
@Metered( group = "core", name = "EntityManager_getEntityType" )
public String getEntityType( UUID entityId ) throws Exception {
HColumn<String, String> column =
cass.getColumn( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, key( entityId ),
PROPERTY_TYPE, se, se );
if ( column != null ) {
return column.getValue();
}
return null;
}
/**
* Gets the entity info. If no propertyNames are passed it loads the ENTIRE entity!
*
* @param entityId the entity id
* @param propertyNames the property names
*
* @return DynamicEntity object holding properties
*
* @throws Exception the exception
*/
@Metered( group = "core", name = "EntityManager_loadPartialEntity" )
public DynamicEntity loadPartialEntity( UUID entityId, String... propertyNames ) throws Exception {
List<HColumn<String, ByteBuffer>> results = null;
if ( ( propertyNames != null ) && ( propertyNames.length > 0 ) ) {
Set<String> column_names = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
column_names.add( PROPERTY_TYPE );
column_names.add( PROPERTY_UUID );
Collections.addAll(column_names, propertyNames);
results = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, key( entityId ),
column_names, se, be );
}
else {
results = cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES,
key( entityId ) );
}
Map<String, Object> entityProperties = deserializeEntityProperties( results );
if ( entityProperties == null ) {
return null;
}
String entityType = ( String ) entityProperties.get( PROPERTY_TYPE );
UUID id = ( UUID ) entityProperties.get( PROPERTY_UUID );
return new DynamicEntity( entityType, id, entityProperties );
}
/**
* Gets the specified entity.
*
* @param entityId the entity id
* @param entityClass the entity class
*
* @return entity
*
* @throws Exception the exception
*/
public <A extends Entity> A getEntity( UUID entityId, Class<A> entityClass ) throws Exception {
Object entity_key = key( entityId );
Map<String, Object> results = null;
// if (entityType == null) {
results = deserializeEntityProperties( cass.getAllColumns(
cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, entity_key ) );
// } else {
// Set<String> columnNames = Schema.getPropertyNames(entityType);
// results = getColumns(getApplicationKeyspace(applicationId),
// EntityCF.PROPERTIES, entity_key, columnNames, se, be);
// }
if ( results == null ) {
logger.warn( "getEntity(): No properties found for entity {}, "
+ "probably doesn't exist...", entityId );
return null;
}
UUID id = uuid( results.get( PROPERTY_UUID ) );
String type = string( results.get( PROPERTY_TYPE ) );
if ( !entityId.equals( id ) ) {
logger.error( "Expected entity id {}, found {}. Returning null entity",
new Object[]{entityId, id, new Throwable()} );
return null;
}
A entity = EntityFactory.newEntity( id, type, entityClass );
entity.setProperties( results );
return entity;
}
/**
* Gets the specified list of entities.
*
* @param entityIds the entity ids
* @param entityClass the entity class
*
* @return entity
*
* @throws Exception the exception
*/
@Metered( group = "core", name = "EntityManager_getEntities" )
public <A extends Entity> List<A> getEntities(
Collection<UUID> entityIds, Class<A> entityClass ) throws Exception {
List<A> entities = new ArrayList<A>();
if ( ( entityIds == null ) || ( entityIds.size() == 0 ) ) {
return entities;
}
Map<UUID, A> resultSet = new LinkedHashMap<UUID, A>();
Rows<UUID, String, ByteBuffer> results = null;
// if (entityType == null) {
results = cass.getRows( cass.getApplicationKeyspace( applicationId ),
ENTITY_PROPERTIES, entityIds, ue, se, be );
// } else {
// Set<String> columnNames = Schema.getPropertyNames(entityType);
// results = getRows(getApplicationKeyspace(applicationId),
// EntityCF.PROPERTIES,
// entityIds, columnNames, ue, se, be);
// }
if ( results != null ) {
for ( UUID key : entityIds ) {
Map<String, Object> properties = deserializeEntityProperties( results.getByKey( key ) );
if ( properties == null ) {
logger.error( "Error deserializing entity with key {} entity probaby "
+ "doesn't exist, where did this key come from?", key );
continue;
}
UUID id = uuid( properties.get( PROPERTY_UUID ) );
String type = string( properties.get( PROPERTY_TYPE ) );
if ( ( id == null ) || ( type == null ) ) {
logger.error( "Error retrieving entity with key {}, no type or id deseriazable, where did this key come from?", key );
continue;
}
A entity = EntityFactory.newEntity( id, type, entityClass );
entity.setProperties( properties );
resultSet.put( id, entity );
}
for ( UUID entityId : entityIds ) {
A entity = resultSet.get( entityId );
if ( entity != null ) {
entities.add( entity );
}
}
}
return entities;
}
@Metered( group = "core", name = "EntityManager_getPropertyNames" )
public Set<String> getPropertyNames( EntityRef entity ) throws Exception {
Set<String> propertyNames = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
List<HColumn<String, ByteBuffer>> results =
cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
key( entity.getUuid(), DICTIONARY_PROPERTIES ) );
for ( HColumn<String, ByteBuffer> result : results ) {
String str = string( result.getName() );
if ( str != null ) {
propertyNames.add( str );
}
}
Set<String> schemaProperties = getDefaultSchema().getPropertyNames( entity.getType() );
if ( ( schemaProperties != null ) && !schemaProperties.isEmpty() ) {
propertyNames.addAll( schemaProperties );
}
return propertyNames;
}
@Metered( group = "core", name = "EntityManager_getDictionaryNames" )
public Set<String> getDictionaryNames( EntityRef entity ) throws Exception {
Set<String> dictionaryNames = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
List<HColumn<String, ByteBuffer>> results =
cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
key( entity.getUuid(), DICTIONARY_SETS ) );
for ( HColumn<String, ByteBuffer> result : results ) {
String str = string( result.getName() );
if ( str != null ) {
dictionaryNames.add( str );
}
}
Set<String> schemaSets = getDefaultSchema().getDictionaryNames( entity.getType() );
if ( ( schemaSets != null ) && !schemaSets.isEmpty() ) {
dictionaryNames.addAll( schemaSets );
}
return dictionaryNames;
}
@Override
@Metered( group = "core", name = "EntityManager_getDictionaryElementValue" )
public Object getDictionaryElementValue( EntityRef entity, String dictionaryName, String elementName )
throws Exception {
Object value = null;
ApplicationCF dictionaryCf = null;
boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
if ( entityHasDictionary ) {
dictionaryCf = ENTITY_DICTIONARIES;
}
else {
dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
}
Class<?> dictionaryCoType = getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
HColumn<ByteBuffer, ByteBuffer> result =
cass.getColumn( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
key( entity.getUuid(), dictionaryName ),
entityHasDictionary ? bytebuffer( elementName ) : DynamicComposite.toByteBuffer( elementName ),
be, be );
if ( result != null ) {
if ( entityHasDictionary && coTypeIsBasic ) {
value = object( dictionaryCoType, result.getValue() );
}
else if ( result.getValue().remaining() > 0 ) {
value = Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(), dictionaryCoType );
}
}
else {
logger.info( "Results of EntityManagerImpl.getDictionaryElementValue is null" );
}
return value;
}
@Metered( group = "core", name = "EntityManager_getDictionaryElementValues" )
public Map<String, Object> getDictionaryElementValues( EntityRef entity, String dictionaryName,
String... elementNames ) throws Exception {
Map<String, Object> values = null;
ApplicationCF dictionaryCf = null;
boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
if ( entityHasDictionary ) {
dictionaryCf = ENTITY_DICTIONARIES;
}
else {
dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
}
Class<?> dictionaryCoType = getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
ByteBuffer[] columnNames = new ByteBuffer[elementNames.length];
for ( int i = 0; i < elementNames.length; i++ ) {
columnNames[i] = entityHasDictionary ? bytebuffer( elementNames[i] ) :
DynamicComposite.toByteBuffer( elementNames[i] );
}
ColumnSlice<ByteBuffer, ByteBuffer> results =
cass.getColumns( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
key( entity.getUuid(), dictionaryName ), columnNames, be, be );
if ( results != null ) {
values = new HashMap<String, Object>();
for ( HColumn<ByteBuffer, ByteBuffer> result : results.getColumns() ) {
String name = entityHasDictionary ? string( result.getName() ) :
DynamicComposite.fromByteBuffer( result.getName() ).get( 0, se );
if ( entityHasDictionary && coTypeIsBasic ) {
values.put( name, object( dictionaryCoType, result.getValue() ) );
}
else if ( result.getValue().remaining() > 0 ) {
values.put( name, Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(),
dictionaryCoType ) );
}
}
}
else {
logger.error( "Results of EntityManagerImpl.getDictionaryElementValues is null" );
}
return values;
}
/**
* Gets the set.
*
* @param entity The owning entity
* @param dictionaryName the dictionary name
*
* @return contents of dictionary property
*
* @throws Exception the exception
*/
@Override
@Metered( group = "core", name = "EntityManager_getDictionaryAsMap" )
public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName ) throws Exception {
entity = validate( entity );
Map<Object, Object> dictionary = new LinkedHashMap<Object, Object>();
ApplicationCF dictionaryCf = null;
boolean entityHasDictionary = getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
if ( entityHasDictionary ) {
dictionaryCf = ENTITY_DICTIONARIES;
}
else {
dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
}
Class<?> setType = getDefaultSchema().getDictionaryKeyType( entity.getType(), dictionaryName );
Class<?> setCoType = getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
boolean coTypeIsBasic = ClassUtils.isBasicType( setCoType );
List<HColumn<ByteBuffer, ByteBuffer>> results =
cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
key( entity.getUuid(), dictionaryName ), be, be );
for ( HColumn<ByteBuffer, ByteBuffer> result : results ) {
Object name = null;
if ( entityHasDictionary ) {
name = object( setType, result.getName() );
}
else {
name = CompositeUtils.deserialize( result.getName() );
}
Object value = null;
if ( entityHasDictionary && coTypeIsBasic ) {
value = object( setCoType, result.getValue() );
}
else if ( result.getValue().remaining() > 0 ) {
value = Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(), setCoType );
}
if ( name != null ) {
dictionary.put( name, value );
}
}
return dictionary;
}
@Override
public Set<Object> getDictionaryAsSet( EntityRef entity, String dictionaryName ) throws Exception {
return new LinkedHashSet<Object>( getDictionaryAsMap( entity, dictionaryName ).keySet() );
}
/**
* Update properties.
*
* @param entityId the entity id
* @param properties the properties
*
* @throws Exception the exception
*/
@Metered( group = "core", name = "EntityManager_updateProperties" )
public void updateProperties(
UUID entityId, String type, Map<String, Object> properties ) throws Exception {
EntityRef entity = new SimpleEntityRef( type, entityId );
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
UUID timestampUuid = newTimeUUID();
properties.put( PROPERTY_MODIFIED, getTimestampInMillis( timestampUuid ) );
batchUpdateProperties( m, entity, properties, timestampUuid );
batchExecute( m, CassandraService.RETRY_COUNT );
}
@Metered( group = "core", name = "EntityManager_deleteEntity" )
public void deleteEntity( UUID entityId, String type ) throws Exception {
logger.info( "deleteEntity {} of application {}", entityId, applicationId );
EntityRef entity = new SimpleEntityRef( type, entityId );
logger.info( "deleteEntity: {} is of type {}", entityId, entity.getType() );
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
UUID timestampUuid = newTimeUUID();
long timestamp = getTimestampInMicros( timestampUuid );
// get all connections and disconnect them
getRelationManager( ref( type, entityId ) ).batchDisconnect( m, timestampUuid );
// delete all core properties and any dynamic property that's ever been
// dictionary for this entity
Set<String> properties = getPropertyNames( entity );
if ( properties != null ) {
for ( String propertyName : properties ) {
m = batchSetProperty( m, entity, propertyName, null, true, false, timestampUuid );
}
}
// delete any core dictionaries and dynamic dictionaries associated with
// this entity
Set<String> dictionaries = getDictionaryNames( entity );
if ( dictionaries != null ) {
for ( String dictionary : dictionaries ) {
Set<Object> values = getDictionaryAsSet( entity, dictionary );
if ( values != null ) {
for ( Object value : values ) {
batchUpdateDictionary( m, entity, dictionary, value, true, timestampUuid );
}
}
}
}
// find all the containing collections
getRelationManager( entity ).batchRemoveFromContainers( m, timestampUuid );
//decrease entity count
if ( !TYPE_APPLICATION.equals( entity.getType() ) ) {
String collection_name = Schema.defaultCollectionName( entity.getType() );
decrementEntityCollection( collection_name );
}
timestamp += 1;
if ( dictionaries != null ) {
for ( String dictionary : dictionaries ) {
ApplicationCF cf =
getDefaultSchema().hasDictionary( entity.getType(), dictionary ) ? ENTITY_DICTIONARIES :
ENTITY_COMPOSITE_DICTIONARIES;
addDeleteToMutator( m, cf, key( entity.getUuid(), dictionary ), timestamp );
}
}
addDeleteToMutator( m, ENTITY_PROPERTIES, key( entityId ), timestamp );
batchExecute( m, CassandraService.RETRY_COUNT );
}
@Override
public void delete( EntityRef entityRef ) throws Exception {
deleteEntity( entityRef.getUuid(), entityRef.getType() );
}
public void batchCreateRole( Mutator<ByteBuffer> batch, UUID groupId, String roleName, String roleTitle,
long inactivity, RoleRef roleRef, UUID timestampUuid ) throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
if ( roleRef == null ) {
roleRef = new SimpleRoleRef( groupId, roleName );
}
if ( roleTitle == null ) {
roleTitle = roleRef.getRoleName();
}
EntityRef ownerRef = null;
if ( roleRef.getGroupId() != null ) {
ownerRef = new SimpleEntityRef( Group.ENTITY_TYPE, roleRef.getGroupId() );
}
else {
ownerRef = new SimpleEntityRef( Application.ENTITY_TYPE, applicationId );
}
Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
properties.put( PROPERTY_TYPE, Role.ENTITY_TYPE );
properties.put( "group", roleRef.getGroupId() );
properties.put( PROPERTY_NAME, roleRef.getApplicationRoleName() );
properties.put( "roleName", roleRef.getRoleName() );
properties.put( "title", roleTitle );
properties.put( PROPERTY_INACTIVITY, inactivity );
Entity role = batchCreate( batch, Role.ENTITY_TYPE, null, properties, roleRef.getUuid(), timestampUuid );
addInsertToMutator( batch, ENTITY_DICTIONARIES, key( ownerRef.getUuid(), Schema.DICTIONARY_ROLENAMES ),
roleRef.getRoleName(), roleTitle, timestamp );
addInsertToMutator( batch, ENTITY_DICTIONARIES, key( ownerRef.getUuid(), Schema.DICTIONARY_ROLETIMES ),
roleRef.getRoleName(), inactivity, timestamp );
addInsertToMutator( batch, ENTITY_DICTIONARIES, key( ownerRef.getUuid(), DICTIONARY_SETS ),
Schema.DICTIONARY_ROLENAMES, null, timestamp );
if ( roleRef.getGroupId() != null ) {
getRelationManager( ownerRef ).batchAddToCollection( batch, COLLECTION_ROLES, role, timestampUuid );
}
}
@Override
public EntityRef getUserByIdentifier( Identifier identifier ) throws Exception {
if ( identifier == null ) {
return null;
}
if ( identifier.isUUID() ) {
return new SimpleEntityRef( "user", identifier.getUUID() );
}
if ( identifier.isName() ) {
return this.getAlias(
new SimpleEntityRef(Application.ENTITY_TYPE, applicationId),
"user", identifier.getName() );
}
if ( identifier.isEmail() ) {
Query query = new Query();
query.setEntityType( "user" );
query.addEqualityFilter( "email", identifier.getEmail() );
query.setLimit( 1 );
query.setResultsLevel( REFS );
Results r = getRelationManager(
ref( Application.ENTITY_TYPE, applicationId ) ).searchCollection( "users", query );
if ( r != null && r.getRef() != null ) {
return r.getRef();
}
else {
// look-aside as it might be an email in the name field
return this.getAlias(
new SimpleEntityRef(Application.ENTITY_TYPE, applicationId),
"user", identifier.getEmail() );
}
}
return null;
}
@Override
public EntityRef getGroupByIdentifier( Identifier identifier ) throws Exception {
if ( identifier == null ) {
return null;
}
if ( identifier.isUUID() ) {
return new SimpleEntityRef( "group", identifier.getUUID() );
}
if ( identifier.isName() ) {
return this.getAlias(
new SimpleEntityRef(Application.ENTITY_TYPE, applicationId),
"group", identifier.getName() );
}
return null;
}
@Override
public Results getAggregateCounters( UUID userId, UUID groupId, String category,
String counterName, CounterResolution resolution, long start, long finish, boolean pad ) {
return this.getAggregateCounters(
userId, groupId, null, category, counterName, resolution, start, finish, pad );
}
@Override
@Metered( group = "core", name = "EntityManager_getAggregateCounters" )
public Results getAggregateCounters( UUID userId, UUID groupId, UUID queueId, String category,
String counterName, CounterResolution resolution, long start, long finish, boolean pad ) {
start = resolution.round( start );
finish = resolution.round( finish );
long expected_time = start;
Keyspace ko = cass.getApplicationKeyspace( applicationId );
SliceCounterQuery<String, Long> q = createCounterSliceQuery( ko, se, le );
q.setColumnFamily( APPLICATION_AGGREGATE_COUNTERS.toString() );
q.setRange( start, finish, false, ALL_COUNT );
QueryResult<CounterSlice<Long>> r = q.setKey(
counterUtils.getAggregateCounterRow(
counterName, userId, groupId, queueId, category, resolution ) ).execute();
List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
for ( HCounterColumn<Long> column : r.get().getColumns() ) {
AggregateCounter count = new AggregateCounter( column.getName(), column.getValue() );
if ( pad && !( resolution == CounterResolution.ALL ) ) {
while ( count.getTimestamp() != expected_time ) {
counters.add( new AggregateCounter( expected_time, 0 ) );
expected_time = resolution.next( expected_time );
}
expected_time = resolution.next( expected_time );
}
counters.add( count );
}
if ( pad && !( resolution == CounterResolution.ALL ) ) {
while ( expected_time <= finish ) {
counters.add( new AggregateCounter( expected_time, 0 ) );
expected_time = resolution.next( expected_time );
}
}
return Results.fromCounters( new AggregateCounterSet( counterName, userId, groupId, category, counters ) );
}
@Override
@Metered( group = "core", name = "EntityManager_getAggregateCounters_fromQueryObj" )
public Results getAggregateCounters( Query query ) throws Exception {
CounterResolution resolution = query.getResolution();
if ( resolution == null ) {
resolution = CounterResolution.ALL;
}
long start = query.getStartTime() != null ? query.getStartTime() : 0;
long finish = query.getFinishTime() != null ? query.getFinishTime() : 0;
boolean pad = query.isPad();
if ( start <= 0 ) {
start = 0;
}
if ( ( finish <= 0 ) || ( finish < start ) ) {
finish = System.currentTimeMillis();
}
start = resolution.round( start );
finish = resolution.round( finish );
long expected_time = start;
if ( pad && ( resolution != CounterResolution.ALL ) ) {
long max_counters = ( finish - start ) / resolution.interval();
if ( max_counters > 1000 ) {
finish = resolution.round( start + ( resolution.interval() * 1000 ) );
}
}
List<CounterFilterPredicate> filters = query.getCounterFilters();
if ( filters == null ) {
return null;
}
Map<String, AggregateCounterSelection> selections = new HashMap<String, AggregateCounterSelection>();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
for ( CounterFilterPredicate filter : filters ) {
AggregateCounterSelection selection =
new AggregateCounterSelection( filter.getName(), getUuid( getUserByIdentifier( filter.getUser() ) ),
getUuid( getGroupByIdentifier( filter.getGroup() ) ),
org.apache.usergrid.mq.Queue.getQueueId( filter.getQueue() ), filter.getCategory() );
selections.put( selection.getRow( resolution ), selection );
}
MultigetSliceCounterQuery<String, Long> q = HFactory.createMultigetSliceCounterQuery( ko, se, le );
q.setColumnFamily( APPLICATION_AGGREGATE_COUNTERS.toString() );
q.setRange( start, finish, false, ALL_COUNT );
QueryResult<CounterRows<String, Long>> rows = q.setKeys( selections.keySet() ).execute();
List<AggregateCounterSet> countSets = new ArrayList<AggregateCounterSet>();
for ( CounterRow<String, Long> r : rows.get() ) {
expected_time = start;
List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
for ( HCounterColumn<Long> column : r.getColumnSlice().getColumns() ) {
AggregateCounter count = new AggregateCounter( column.getName(), column.getValue() );
if ( pad && ( resolution != CounterResolution.ALL ) ) {
while ( count.getTimestamp() != expected_time ) {
counters.add( new AggregateCounter( expected_time, 0 ) );
expected_time = resolution.next( expected_time );
}
expected_time = resolution.next( expected_time );
}
counters.add( count );
}
if ( pad && ( resolution != CounterResolution.ALL ) ) {
while ( expected_time <= finish ) {
counters.add( new AggregateCounter( expected_time, 0 ) );
expected_time = resolution.next( expected_time );
}
}
AggregateCounterSelection selection = selections.get( r.getKey() );
countSets.add( new AggregateCounterSet( selection.getName(), selection.getUserId(), selection.getGroupId(),
selection.getCategory(), counters ) );
}
Collections.sort( countSets, new Comparator<AggregateCounterSet>() {
@Override
public int compare( AggregateCounterSet o1, AggregateCounterSet o2 ) {
String s1 = o1.getName();
String s2 = o2.getName();
return s1.compareTo( s2 );
}
} );
return Results.fromCounters( countSets );
}
@Override
@Metered( group = "core", name = "EntityManager_getEntityCounters" )
public Map<String, Long> getEntityCounters( UUID entityId ) throws Exception {
Map<String, Long> counters = new HashMap<String, Long>();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
SliceCounterQuery<UUID, String> q = createCounterSliceQuery( ko, ue, se );
q.setColumnFamily( ENTITY_COUNTERS.toString() );
q.setRange( null, null, false, ALL_COUNT );
QueryResult<CounterSlice<String>> r = q.setKey( entityId ).execute();
for ( HCounterColumn<String> column : r.get().getColumns() ) {
counters.put( column.getName(), column.getValue() );
}
return counters;
}
@Override
public Map<String, Long> getApplicationCounters() throws Exception {
return getEntityCounters( applicationId );
}
@Override
@Metered( group = "core", name = "EntityManager_createApplicationCollection" )
public void createApplicationCollection( String entityType ) throws Exception {
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
long timestamp = cass.createTimestamp();
String collection_name = Schema.defaultCollectionName( entityType );
// Add name of collection to dictionary property Application.collections
addInsertToMutator( m, ENTITY_DICTIONARIES, key( applicationId, Schema.DICTIONARY_COLLECTIONS ),
collection_name, null, timestamp );
batchExecute( m, CassandraService.RETRY_COUNT );
}
@Override
public EntityRef getAlias( String aliasType, String alias ) throws Exception {
return getAlias( new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), aliasType, alias );
}
@Override
public EntityRef validate( EntityRef entityRef ) throws Exception {
return validate( entityRef, true );
}
public EntityRef validate( EntityRef entityRef, boolean verify ) throws Exception {
if ( ( entityRef == null ) || ( entityRef.getUuid() == null ) ) {
if(verify){
throw new EntityNotFoundException( "An unknown entity cannot be verified" );
}
return null;
}
if ( ( entityRef.getType() == null ) || verify ) {
UUID entityId = entityRef.getUuid();
String entityType = entityRef.getType();
try {
get( entityRef ).getType();
}
catch ( Exception e ) {
logger.error( "Unable to load entity: {}", new Object[] {entityRef.getUuid(), e} );
}
if ( entityRef == null ) {
throw new EntityNotFoundException( "Entity " + entityId.toString() + " cannot be verified" );
}
if ( ( entityType != null ) && !entityType.equalsIgnoreCase( entityRef.getType() ) ) {
throw new UnexpectedEntityTypeException(
"Entity " + entityId + " is not the expected type, expected " + entityType + ", found "
+ entityRef.getType() );
}
}
return entityRef;
}
public String getType( EntityRef entity ) throws Exception {
if ( entity.getType() != null ) {
return entity.getType();
}
return getEntityType( entity.getUuid() );
}
@Override
public Entity get(UUID id) throws Exception {
return getEntity( id, null );
}
@Override
public Entity get( EntityRef entityRef ) throws Exception {
if ( entityRef == null ) {
return null;
}
return getEntity( entityRef.getUuid(), null );
}
@Override
public <A extends Entity> A get( EntityRef entityRef, Class<A> entityClass ) throws Exception {
if ( entityRef == null ) {
return null;
}
return get( entityRef.getUuid(), entityClass );
}
@SuppressWarnings( "unchecked" )
@Override
public <A extends Entity> A get( UUID entityId, Class<A> entityClass ) throws Exception {
A e = null;
try {
e = ( A ) getEntity( entityId, ( Class<Entity> ) entityClass );
}
catch ( ClassCastException e1 ) {
logger.error( "Unable to get typed entity: {} of class {}",
new Object[] {entityId, entityClass.getCanonicalName(), e1} );
}
return e;
}
@Override
public Results get( Collection<UUID> entityIds, Class<? extends Entity> entityClass,
Level resultsLevel ) throws Exception {
return fromEntities( getEntities( entityIds, entityClass ) );
}
@Override
public Results get( Collection<UUID> entityIds, String entityType, Class<? extends Entity> entityClass,
Level resultsLevel ) throws Exception {
return fromEntities( getEntities( entityIds, entityClass ) );
}
public Results loadEntities( Results results, Level resultsLevel, int count ) throws Exception {
return loadEntities( results, resultsLevel, null, count );
}
public Results loadEntities( Results results, Level resultsLevel,
Map<UUID, UUID> associatedMap, int count ) throws Exception {
results = results.trim( count );
if ( resultsLevel.ordinal() <= results.getLevel().ordinal() ) {
return results;
}
results.setEntities( getEntities( results.getIds(), (Class)null ) );
if ( resultsLevel == Level.LINKED_PROPERTIES ) {
List<Entity> entities = results.getEntities();
BiMap<UUID, UUID> associatedIds = null;
if ( associatedMap != null ) {
associatedIds = HashBiMap.create( associatedMap );
}
else {
associatedIds = HashBiMap.create( entities.size() );
for ( Entity entity : entities ) {
Object id = entity.getMetadata( PROPERTY_ASSOCIATED );
if ( id instanceof UUID ) {
associatedIds.put( entity.getUuid(), ( UUID ) id );
}
}
}
List<DynamicEntity> linked = getEntities(
new ArrayList<UUID>( associatedIds.values() ), (Class)null );
for ( DynamicEntity l : linked ) {
Map<String, Object> p = l.getDynamicProperties();
if ( ( p != null ) && ( p.size() > 0 ) ) {
Entity e = results.getEntitiesMap().get( associatedIds.inverse().get( l.getUuid() ) );
if ( l.getType().endsWith( TYPE_MEMBER ) ) {
e.setProperty( TYPE_MEMBER, p );
}
else if ( l.getType().endsWith( TYPE_CONNECTION ) ) {
e.setProperty( TYPE_CONNECTION, p );
}
}
}
}
return results;
}
@Override
public void update( Entity entity ) throws Exception {
updateProperties( entity.getUuid(), entity.getType(), entity.getProperties() );
}
@Override
public Object getProperty( EntityRef entityRef, String propertyName ) throws Exception {
Entity entity = loadPartialEntity( entityRef.getUuid(), propertyName );
if ( entity == null ) {
return null;
}
return entity.getProperty( propertyName );
}
@Override
public Map<String, Object> getProperties( EntityRef entityRef ) throws Exception {
Entity entity = loadPartialEntity( entityRef.getUuid() );
Map<String, Object> props = entity.getProperties();
return props;
}
@Override
public List<Entity> getPartialEntities( Collection<UUID> ids, Collection<String> fields ) throws Exception {
List<Entity> entities = new ArrayList<Entity>( ids.size() );
if ( ids == null || ids.size() == 0 ) {
return entities;
}
fields.add( PROPERTY_UUID );
fields.add( PROPERTY_TYPE );
Rows<UUID, String, ByteBuffer> results = null;
results = cass.getRows( cass.getApplicationKeyspace( applicationId ), ENTITY_PROPERTIES, ids, fields, ue, se,
be );
if ( results == null ) {
return entities;
}
for ( Row<UUID, String, ByteBuffer> row : results ) {
Map<String, Object> properties =
deserializeEntityProperties( results.getByKey( row.getKey() ).getColumnSlice().getColumns(), true,
false );
//Could get a tombstoned row if the index is behind, just ignore it
if ( properties == null ) {
logger.warn( "Received row key {} with no type or properties, skipping", row.getKey() );
continue;
}
UUID id = uuid( properties.get( PROPERTY_UUID ) );
String type = string( properties.get( PROPERTY_TYPE ) );
if ( id == null || type == null ) {
logger.warn( "Error retrieving entity with key {} no type or id deseriazable, skipping", row.getKey() );
continue;
}
Entity entity = EntityFactory.newEntity( id, type );
entity.setProperties( properties );
entities.add( entity );
}
return entities;
}
@Override
public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
setProperty( entityRef, propertyName, propertyValue, false );
}
@Override
public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue, boolean override )
throws Exception {
if ( ( propertyValue instanceof String ) && ( ( String ) propertyValue ).equals( "" ) ) {
propertyValue = null;
}
// todo: would this ever need to load more?
DynamicEntity entity = loadPartialEntity( entityRef.getUuid(), propertyName );
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
propertyValue = getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
entity.setProperty( propertyName, propertyValue );
batch = batchSetProperty( batch, entity, propertyName, propertyValue, override, false, timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public void updateProperties( EntityRef entityRef, Map<String, Object> properties ) throws Exception {
entityRef = validate( entityRef );
properties = getDefaultSchema().cleanUpdatedProperties( entityRef.getType(), properties, false );
updateProperties( entityRef.getUuid(), entityRef.getType(), properties );
}
@Override
public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue ) throws Exception {
addToDictionary( entityRef, dictionaryName, elementValue, null );
}
@Override
public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue,
Object elementCoValue ) throws Exception {
if ( elementValue == null ) {
return;
}
EntityRef entity = get( entityRef );
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, elementCoValue, false,
timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public void addSetToDictionary( EntityRef entityRef, String dictionaryName, Set<?> elementValues )
throws Exception {
if ( ( elementValues == null ) || elementValues.isEmpty() ) {
return;
}
EntityRef entity = get( entityRef );
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
for ( Object elementValue : elementValues ) {
batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, null, false, timestampUuid );
}
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public void addMapToDictionary( EntityRef entityRef, String dictionaryName, Map<?, ?> elementValues )
throws Exception {
if ( ( elementValues == null ) || elementValues.isEmpty() || entityRef == null ) {
return;
}
EntityRef entity = get( entityRef );
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
for ( Map.Entry<?, ?> elementValue : elementValues.entrySet() ) {
batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue.getKey(),
elementValue.getValue(), false, timestampUuid );
}
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public void removeFromDictionary( EntityRef entityRef, String dictionaryName, Object elementValue )
throws Exception {
if ( elementValue == null ) {
return;
}
EntityRef entity = get( entityRef );
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, true, timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public Set<String> getDictionaries( EntityRef entity ) throws Exception {
return getDictionaryNames( entity );
}
@Override
public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
setProperty( entityRef, propertyName, null );
}
@Override
public Set<String> getApplicationCollections() throws Exception {
Set<String> collections = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
Set<String> dynamic_collections = cast( getDictionaryAsSet( getApplicationRef(), DICTIONARY_COLLECTIONS ) );
if ( dynamic_collections != null ) {
for ( String collection : dynamic_collections ) {
if ( !Schema.isAssociatedEntityType( collection ) ) {
collections.add( collection );
}
}
}
Set<String> system_collections = getDefaultSchema().getCollectionNames( Application.ENTITY_TYPE );
if ( system_collections != null ) {
for ( String collection : system_collections ) {
if ( !Schema.isAssociatedEntityType( collection ) ) {
collections.add( collection );
}
}
}
return collections;
}
@Override
public long getApplicationCollectionSize( String collectionName ) throws Exception {
Long count = null;
if ( !Schema.isAssociatedEntityType( collectionName ) ) {
Map<String, Long> counts = getApplicationCounters();
count = counts.get( new String( APPLICATION_COLLECTION + collectionName ) );
}
return count != null ? count : 0;
}
@Override
public Map<String, Object> getApplicationCollectionMetadata() throws Exception {
Set<String> collections = getApplicationCollections();
Map<String, Long> counts = getApplicationCounters();
Map<String, Object> metadata = new HashMap<String, Object>();
if ( collections != null ) {
for ( String collectionName : collections ) {
if ( !Schema.isAssociatedEntityType( collectionName ) ) {
Long count = counts.get( APPLICATION_COLLECTION + collectionName );
/*
* int count = emf .countColumns(
* getApplicationKeyspace(applicationId),
* ApplicationCF.ENTITY_ID_SETS, key(applicationId,
* DICTIONARY_COLLECTIONS, collectionName));
*/
Map<String, Object> entry = new HashMap<String, Object>();
entry.put( "count", count != null ? count : 0 );
entry.put( "type", singularize( collectionName ) );
entry.put( "name", collectionName );
entry.put( "title", capitalize( collectionName ) );
metadata.put( collectionName, entry );
}
}
}
/*
* if ((counts != null) && !counts.isEmpty()) { metadata.put("counters",
* counts); }
*/
return metadata;
}
public Object getRolePermissionsKey( String roleName ) {
return key( getIdForRoleName( roleName ), DICTIONARY_PERMISSIONS );
}
public Object getRolePermissionsKey( UUID groupId, String roleName ) {
return key( getIdForGroupIdAndRoleName( groupId, roleName ), DICTIONARY_PERMISSIONS );
}
public EntityRef userRef( UUID userId ) {
return ref( User.ENTITY_TYPE, userId );
}
public EntityRef groupRef( UUID groupId ) {
return ref( Group.ENTITY_TYPE, groupId );
}
public EntityRef roleRef( String roleName ) {
return ref( TYPE_ROLE, getIdForRoleName( roleName ) );
}
public EntityRef roleRef( UUID groupId, String roleName ) {
return ref( TYPE_ROLE, getIdForGroupIdAndRoleName( groupId, roleName ) );
}
@Override
public Map<String, String> getRoles() throws Exception {
return cast( getDictionaryAsMap( getApplicationRef(), DICTIONARY_ROLENAMES ) );
}
@Override
public String getRoleTitle( String roleName ) throws Exception {
String title = string( getDictionaryElementValue( getApplicationRef(), DICTIONARY_ROLENAMES, roleName ) );
if ( title == null ) {
title = roleName;
}
return title;
}
@Override
public Map<String, Role> getRolesWithTitles( Set<String> roleNames ) throws Exception {
Map<String, Role> rolesWithTitles = new HashMap<String, Role>();
Map<String, Object> nameResults = null;
if ( roleNames != null ) {
nameResults = getDictionaryElementValues( getApplicationRef(), DICTIONARY_ROLENAMES,
roleNames.toArray(new String[roleNames.size()]));
}
else {
nameResults = cast( getDictionaryAsMap( getApplicationRef(), DICTIONARY_ROLENAMES ) );
roleNames = nameResults.keySet();
}
Map<String, Object> timeResults = getDictionaryElementValues( getApplicationRef(), DICTIONARY_ROLETIMES,
roleNames.toArray(new String[roleNames.size()]));
for ( String roleName : roleNames ) {
String savedTitle = string( nameResults.get( roleName ) );
// no title, skip the role
if ( savedTitle == null ) {
continue;
}
Role newRole = new Role();
newRole.setName( roleName );
newRole.setTitle( savedTitle );
newRole.setInactivity( getLong( timeResults.get( roleName ) ) );
rolesWithTitles.put( roleName, newRole );
}
return rolesWithTitles;
}
@Override
public Entity createRole( String roleName, String roleTitle, long inactivity ) throws Exception {
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
batchCreateRole( batch, null, roleName, roleTitle, inactivity, null, timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
return get( roleRef( roleName ) );
}
@Override
public void grantRolePermission( String roleName, String permission ) throws Exception {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ), permission,
ByteBuffer.allocate( 0 ), timestamp );
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public void grantRolePermissions( String roleName, Collection<String> permissions ) throws Exception {
roleName = roleName.toLowerCase();
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
for ( String permission : permissions ) {
permission = permission.toLowerCase();
addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ), permission,
ByteBuffer.allocate( 0 ), timestamp );
}
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public void revokeRolePermission( String roleName, String permission ) throws Exception {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
CassandraPersistenceUtils
.addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ),
permission, timestamp );
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public Set<String> getRolePermissions( String roleName ) throws Exception {
roleName = roleName.toLowerCase();
return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
getRolePermissionsKey( roleName ) );
}
@Override
public void deleteRole( String roleName ) throws Exception {
roleName = roleName.toLowerCase();
removeFromDictionary( getApplicationRef(), DICTIONARY_ROLENAMES, roleName );
removeFromDictionary( getApplicationRef(), DICTIONARY_ROLETIMES, roleName );
delete( roleRef( roleName ) );
}
public CollectionRef memberRef( UUID groupId, UUID userId ) {
return new SimpleCollectionRef( groupRef( groupId ), COLLECTION_USERS, userRef( userId ) );
}
@Override
public Map<String, String> getGroupRoles( UUID groupId ) throws Exception {
return cast( getDictionaryAsMap( groupRef( groupId ), DICTIONARY_ROLENAMES ) );
}
@Override
public Entity createGroupRole( UUID groupId, String roleName, long inactivity ) throws Exception {
UUID timestampUuid = newTimeUUID();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
batchCreateRole( batch, groupId, roleName, null, inactivity, null, timestampUuid );
batchExecute( batch, CassandraService.RETRY_COUNT );
return get( roleRef( groupId, roleName ) );
}
@Override
public void grantGroupRolePermission( UUID groupId, String roleName, String permission ) throws Exception {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( groupId, roleName ),
permission, ByteBuffer.allocate( 0 ), timestamp );
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public void revokeGroupRolePermission( UUID groupId, String roleName, String permission ) throws Exception {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
CassandraPersistenceUtils.addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
getRolePermissionsKey( groupId, roleName ), permission, timestamp );
batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
public Set<String> getGroupRolePermissions( UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
getRolePermissionsKey( groupId, roleName ) );
}
@Override
public void deleteGroupRole( UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
removeFromDictionary( groupRef( groupId ), DICTIONARY_ROLENAMES, roleName );
cass.deleteRow( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
getIdForGroupIdAndRoleName( groupId, roleName ) );
}
@Override
public Set<String> getUserRoles( UUID userId ) throws Exception {
return cast( getDictionaryAsSet( userRef( userId ), DICTIONARY_ROLENAMES ) );
}
@SuppressWarnings( "unchecked" )
@Override
public Map<String, Role> getUserRolesWithTitles( UUID userId ) throws Exception {
return getRolesWithTitles(
( Set<String> ) cast( getDictionaryAsSet( userRef( userId ), DICTIONARY_ROLENAMES ) ) );
}
@Override
public void addUserToRole( UUID userId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
addToDictionary( userRef( userId ), DICTIONARY_ROLENAMES, roleName, roleName );
addToCollection( userRef( userId ), COLLECTION_ROLES, roleRef( roleName ) );
// addToCollection(roleRef(roleName), COLLECTION_USERS,
// userRef(userId));
}
@Override
public void removeUserFromRole( UUID userId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
removeFromDictionary( userRef( userId ), DICTIONARY_ROLENAMES, roleName );
removeFromCollection( userRef( userId ), COLLECTION_ROLES, roleRef( roleName ) );
// removeFromCollection(roleRef(roleName), COLLECTION_USERS,
// userRef(userId));
}
@Override
public Set<String> getUserPermissions( UUID userId ) throws Exception {
return cast( getDictionaryAsSet( userRef( userId ), Schema.DICTIONARY_PERMISSIONS ) );
}
@Override
public void grantUserPermission( UUID userId, String permission ) throws Exception {
permission = permission.toLowerCase();
addToDictionary( userRef( userId ), DICTIONARY_PERMISSIONS, permission );
}
@Override
public void revokeUserPermission( UUID userId, String permission ) throws Exception {
permission = permission.toLowerCase();
removeFromDictionary( userRef( userId ), DICTIONARY_PERMISSIONS, permission );
}
@Override
public Map<String, String> getUserGroupRoles( UUID userId, UUID groupId ) throws Exception {
// TODO this never returns anything - write path not invoked
return cast( getDictionaryAsMap( memberRef( groupId, userId ), DICTIONARY_ROLENAMES ) );
}
@Override
public void addUserToGroupRole( UUID userId, UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
EntityRef memberRef = memberRef( groupId, userId );
EntityRef roleRef = roleRef( groupId, roleName );
addToDictionary( memberRef, DICTIONARY_ROLENAMES, roleName, roleName );
addToCollection( memberRef, COLLECTION_ROLES, roleRef );
addToCollection( roleRef, COLLECTION_USERS, userRef( userId ) );
}
@Override
public void removeUserFromGroupRole( UUID userId, UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
EntityRef memberRef = memberRef( groupId, userId );
EntityRef roleRef = roleRef( groupId, roleName );
removeFromDictionary( memberRef, DICTIONARY_ROLENAMES, roleName );
removeFromCollection( memberRef, COLLECTION_ROLES, roleRef );
removeFromCollection( roleRef, COLLECTION_USERS, userRef( userId ) );
}
@Override
public Results getUsersInGroupRole( UUID groupId, String roleName, Level level ) throws Exception {
EntityRef roleRef = roleRef( groupId, roleName );
return this.getCollection( roleRef, COLLECTION_USERS, null, 10000, level, false );
}
@Override
public void incrementAggregateCounters( UUID userId, UUID groupId, String category, String counterName,
long value ) {
long cassandraTimestamp = cass.createTimestamp();
incrementAggregateCounters( userId, groupId, category, counterName, value, cassandraTimestamp );
}
private void incrementAggregateCounters( UUID userId, UUID groupId, String category, String counterName, long value,
long cassandraTimestamp ) {
// TODO short circuit
if ( !skipAggregateCounters ) {
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
counterUtils
.batchIncrementAggregateCounters( m, applicationId, userId, groupId, null, category, counterName,
value, cassandraTimestamp / 1000, cassandraTimestamp );
batchExecute( m, CassandraService.RETRY_COUNT );
}
}
@Override
public void incrementAggregateCounters( UUID userId, UUID groupId, String category, Map<String, Long> counters ) {
// TODO shortcircuit
if ( !skipAggregateCounters ) {
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
be );
counterUtils.batchIncrementAggregateCounters( m, applicationId, userId, groupId, null, category, counters,
timestamp );
batchExecute( m, CassandraService.RETRY_COUNT );
}
}
@Override
public Set<String> getCounterNames() throws Exception {
Set<String> names = new TreeSet<String>( CASE_INSENSITIVE_ORDER );
Set<String> nameSet = cast( getDictionaryAsSet( getApplicationRef(), Schema.DICTIONARY_COUNTERS ) );
names.addAll( nameSet );
return names;
}
@Override
public boolean isPropertyValueUniqueForEntity( String entityType, String propertyName, Object propertyValue )
throws Exception {
return isPropertyValueUniqueForEntity( applicationId, entityType, propertyName, propertyValue );
}
@Override
public Map<String, Map<UUID, Set<String>>> getOwners( EntityRef entityRef ) throws Exception {
return getRelationManager( entityRef ).getOwners();
}
/* (non-Javadoc)
* @see org.apache.usergrid.persistence.EntityManager#isOwner(org.apache.usergrid.persistence.EntityRef,
* org.apache.usergrid.persistence.EntityRef)
*/
@Override
public boolean isCollectionMember( EntityRef owner, String collectionName, EntityRef entity ) throws Exception {
return getRelationManager( owner ).isCollectionMember( collectionName, entity );
}
/* (non-Javadoc)
* @see org.apache.usergrid.persistence.EntityManager#isConnectionMember(org.apache.usergrid.persistence.EntityRef,
* java.lang.String, org.apache.usergrid.persistence.EntityRef)
*/
@Override
public boolean isConnectionMember( EntityRef owner, String connectionName, EntityRef entity ) throws Exception {
return getRelationManager( owner ).isConnectionMember( connectionName, entity );
}
@Override
public Set<String> getCollections( EntityRef entityRef ) throws Exception {
return getRelationManager( entityRef ).getCollections();
}
@Override
public Results getCollection( EntityRef entityRef, String collectionName, UUID startResult, int count,
Level resultsLevel, boolean reversed ) throws Exception {
return getRelationManager( entityRef )
.getCollection( collectionName, startResult, count, resultsLevel, reversed );
}
@Override
public Results getCollection( UUID entityId, String collectionName, Query query, Level resultsLevel )
throws Exception {
return getRelationManager( ref( entityId ) ).getCollection( collectionName, query, resultsLevel );
}
@Override
public Entity addToCollection( EntityRef entityRef, String collectionName, EntityRef itemRef ) throws Exception {
return getRelationManager( entityRef ).addToCollection( collectionName, itemRef );
}
@Override
public Entity addToCollections( List<EntityRef> ownerEntities, String collectionName, EntityRef itemRef )
throws Exception {
return getRelationManager( itemRef ).addToCollections( ownerEntities, collectionName );
}
@Override
public Entity createItemInCollection( EntityRef entityRef, String collectionName, String itemType,
Map<String, Object> properties ) throws Exception {
return getRelationManager( entityRef ).createItemInCollection( collectionName, itemType, properties );
}
@Override
public void removeFromCollection( EntityRef entityRef, String collectionName, EntityRef itemRef ) throws Exception {
getRelationManager( entityRef ).removeFromCollection( collectionName, itemRef );
}
@Override
public void copyRelationships( EntityRef srcEntityRef, String srcRelationName, EntityRef dstEntityRef,
String dstRelationName ) throws Exception {
getRelationManager( srcEntityRef ).copyRelationships( srcRelationName, dstEntityRef, dstRelationName );
}
@Override
public Results searchCollection( EntityRef entityRef, String collectionName, Query query ) throws Exception {
return getRelationManager( entityRef ).searchCollection( collectionName, query );
}
@Override
public Set<String> getCollectionIndexes( EntityRef entity, String collectionName ) throws Exception {
return getRelationManager( entity ).getCollectionIndexes( collectionName );
}
@Override
public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
return getRelationManager( connection ).createConnection( connection );
}
@Override
public ConnectionRef createConnection( EntityRef connectingEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
return getRelationManager( connectingEntity ).createConnection( connectionType, connectedEntityRef );
}
@Override
public ConnectionRef createConnection( EntityRef connectingEntity, String pairedConnectionType,
EntityRef pairedEntity, String connectionType, EntityRef connectedEntityRef )
throws Exception {
return getRelationManager( connectingEntity )
.createConnection( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef );
}
@Override
public ConnectionRef createConnection( EntityRef connectingEntity, ConnectedEntityRef... connections )
throws Exception {
return getRelationManager( connectingEntity ).createConnection( connections );
}
@Override
public ConnectionRef connectionRef( EntityRef connectingEntity, String connectionType,
EntityRef connectedEntityRef ) throws Exception {
return getRelationManager( connectingEntity ).connectionRef( connectionType, connectedEntityRef );
}
@Override
public ConnectionRef connectionRef( EntityRef connectingEntity, String pairedConnectionType, EntityRef pairedEntity,
String connectionType, EntityRef connectedEntityRef ) throws Exception {
return getRelationManager( connectingEntity )
.connectionRef( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef );
}
@Override
public ConnectionRef connectionRef( EntityRef connectingEntity, ConnectedEntityRef... connections ) {
return getRelationManager( connectingEntity ).connectionRef( connections );
}
@Override
public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
getRelationManager( connectionRef ).deleteConnection( connectionRef );
}
@Override
public Set<String> getConnectionTypes( EntityRef ref ) throws Exception {
return getRelationManager( ref ).getConnectionTypes();
}
@Override
public Results getConnectedEntities( EntityRef entityRef, String connectionType,
String connectedEntityType, Level resultsLevel ) throws Exception {
return getRelationManager( entityRef )
.getConnectedEntities( connectionType, connectedEntityType, resultsLevel );
}
@Override
public Results getConnectingEntities( EntityRef entityRef, String connectionType,
String connectedEntityType, Level resultsLevel ) throws Exception {
return getConnectingEntities( entityRef, connectionType, connectedEntityType, resultsLevel, 0);
}
@Override
public Results getConnectingEntities( EntityRef entityRef, String connectionType,
String entityType, Level level, int count) throws Exception {
return getRelationManager( entityRef )
.getConnectingEntities( connectionType, entityType, level, count );
}
@Override
public Results searchConnectedEntities( EntityRef connectingEntity, Query query ) throws Exception {
return getRelationManager( connectingEntity ).searchConnectedEntities( query );
}
@Override
public Set<String> getConnectionIndexes( EntityRef entity, String connectionType ) throws Exception {
return getRelationManager( entity ).getConnectionIndexes( connectionType );
}
@Override
public void resetRoles() throws Exception {
try {
createRole( "admin", "Administrator", 0 );
}
catch ( Exception e ) {
logger.error( "Could not create admin role, may already exist", e );
}
try {
createRole( "default", "Default", 0 );
}
catch ( Exception e ) {
logger.error( "Could not create default role, may already exist", e );
}
try {
createRole( "guest", "Guest", 0 );
}
catch ( Exception e ) {
logger.error( "Could not create guest role, may already exist", e );
}
try {
grantRolePermissions( "default", Arrays.asList( "get,put,post,delete:/**" ) );
}
catch ( Exception e ) {
logger.error( "Could not populate default role", e );
}
try {
grantRolePermissions( "guest", Arrays.asList( "post:/users", "post:/devices", "put:/devices/*" ) );
}
catch ( Exception e ) {
logger.error( "Could not populate guest role", e );
}
}
/** @return the applicationId */
public UUID getApplicationId() {
return applicationId;
}
/** @return the cass */
public CassandraService getCass() {
return cass;
}
public GeoIndexManager getGeoIndexManager() {
GeoIndexManager gim = new GeoIndexManager();
gim.init( this );
return gim;
//return applicationContext.getAutowireCapableBeanFactory()
// .createBean(GeoIndexManager.class).init(this);
}
/** @return the indexBucketLocator */
public IndexBucketLocator getIndexBucketLocator() {
return indexBucketLocator;
}
@SuppressWarnings( "unchecked" )
@Override
public Map<String, Role> getGroupRolesWithTitles( UUID groupId ) throws Exception {
return getRolesWithTitles(
( Set<String> ) cast( getDictionaryAsSet( groupRef( groupId ), DICTIONARY_ROLENAMES ) ) );
}
@Override
public void addGroupToRole( UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
addToDictionary( groupRef( groupId ), DICTIONARY_ROLENAMES, roleName, roleName );
addToCollection( groupRef( groupId ), COLLECTION_ROLES, roleRef( roleName ) );
}
@Override
public void removeGroupFromRole( UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
removeFromDictionary( groupRef( groupId ), DICTIONARY_ROLENAMES, roleName );
removeFromCollection( groupRef( groupId ), COLLECTION_ROLES, roleRef( roleName ) );
}
@Override
public Set<String> getGroupPermissions( UUID groupId ) throws Exception {
return cast( getDictionaryAsSet( groupRef( groupId ), Schema.DICTIONARY_PERMISSIONS ) );
}
@Override
public void grantGroupPermission( UUID groupId, String permission ) throws Exception {
permission = permission.toLowerCase();
addToDictionary( groupRef( groupId ), DICTIONARY_PERMISSIONS, permission );
}
@Override
public void revokeGroupPermission( UUID groupId, String permission ) throws Exception {
permission = permission.toLowerCase();
removeFromDictionary( groupRef( groupId ), DICTIONARY_PERMISSIONS, permission );
}
@Override
public Results getEntities(List<UUID> ids, String type) {
ArrayList<Entity> entities = new ArrayList<Entity>();
for ( UUID uuid : ids ) {
EntityRef ref = new SimpleEntityRef( type, uuid );
Entity entity = null;
try {
entity = get( ref );
} catch (Exception ex) {
logger.warn("Entity {}/{} not found", uuid, type);
}
if ( entity != null) {
entities.add( entity );
}
}
return Results.fromEntities( entities );
}
@Override
public void refreshIndex() {
// no action necessary
}
@Override
public void createIndex() {
//no op
}
@Override
public EntityRef getGroupRoleRef( UUID ownerId, String roleName) throws Exception {
return new SimpleEntityRef( Role.ENTITY_TYPE, SimpleRoleRef.getIdForGroupIdAndRoleName( ownerId, roleName ));
}
@Override
public void flushManagerCaches() {
// no-op
}
@Override
public void reindex(EntityManagerFactory.ProgressObserver po) throws Exception {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public Health getIndexHealth() {
return Health.GREEN; // no good way to assess index status using old-school entity manager
}
}