blob: 8d9e23933e513807adc82442b01a96272db9ce19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.persistence.cassandra;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.apache.usergrid.persistence.DynamicEntity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
import org.apache.usergrid.persistence.hector.CountingMutator;
import org.apache.usergrid.utils.UUIDUtils;
import org.apache.commons.lang.StringUtils;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.yammer.metrics.annotation.Metered;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.asMap;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
import static org.apache.usergrid.persistence.cassandra.CassandraService.APPLICATIONS_CF;
import static org.apache.usergrid.persistence.cassandra.CassandraService.PROPERTIES_CF;
import static org.apache.usergrid.persistence.cassandra.CassandraService.RETRY_COUNT;
import static org.apache.usergrid.utils.ConversionUtils.uuid;
import static org.apache.usergrid.persistence.cassandra.Serializers.*;
import org.apache.usergrid.persistence.core.util.Health;
/**
* Cassandra-specific implementation of Datastore
*
* @author edanuff
*/
public class EntityManagerFactoryImpl implements EntityManagerFactory, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger( EntityManagerFactoryImpl.class );
public static String IMPLEMENTATION_DESCRIPTION = "Cassandra Entity Manager Factory 1.0";
public static final Class<DynamicEntity> APPLICATION_ENTITY_CLASS = DynamicEntity.class;
ApplicationContext applicationContext;
CassandraService cass;
CounterUtils counterUtils;
private boolean skipAggregateCounters;
private LoadingCache<UUID, EntityManager> entityManagers =
CacheBuilder.newBuilder().maximumSize( 100 ).build( new CacheLoader<UUID, EntityManager>() {
public EntityManager load( UUID appId ) { // no checked exception
return _getEntityManager( appId );
}
} );
private static final int REBUILD_PAGE_SIZE = 100;
/**
* Must be constructed with a CassandraClientPool.
*
* @param cass the cassandraService instance
*/
public EntityManagerFactoryImpl( CassandraService cass, CounterUtils counterUtils, boolean skipAggregateCounters ) {
this.cass = cass;
this.counterUtils = counterUtils;
this.skipAggregateCounters = skipAggregateCounters;
if ( skipAggregateCounters ) {
logger.warn( "NOTE: Counters have been disabled by configuration..." );
}
}
/*
* (non-Javadoc)
*
* @see org.apache.usergrid.core.Datastore#getImpementationDescription()
*/
@Override
public String getImpementationDescription() {
return IMPLEMENTATION_DESCRIPTION;
}
/*
* (non-Javadoc)
*
* @see org.apache.usergrid.core.Datastore#getEntityDao(java.util.UUID,
* java.util.UUID)
*/
@Override
public EntityManager getEntityManager( UUID applicationId ) {
try {
return entityManagers.get( applicationId );
}
catch ( Exception ex ) {
ex.printStackTrace();
}
return _getEntityManager( applicationId );
}
private EntityManager _getEntityManager( UUID applicationId ) {
EntityManagerImpl em = new EntityManagerImpl();
em.init( this, cass, counterUtils, applicationId, skipAggregateCounters );
em.setApplicationId( applicationId );
return em;
}
public ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* Gets the setup.
*
* @return Setup helper
*/
public SetupImpl getSetup() {
return new SetupImpl( this, cass );
}
@Override
public void setup() throws Exception {
Setup setup = getSetup();
setup.init();
if ( cass.getPropertiesMap() != null ) {
updateServiceProperties( cass.getPropertiesMap() );
}
}
/*
* (non-Javadoc)
*
* @see org.apache.usergrid.core.Datastore#createApplication(java.lang.String)
*/
@Override
public UUID createApplication( String organization, String name ) throws Exception {
return createApplication( organization, name, null );
}
/*
* (non-Javadoc)
*
* @see org.apache.usergrid.core.Datastore#createApplication(java.lang.String,
* java.util.Map)
*/
@Override
public UUID createApplication( String organizationName, String name, Map<String, Object> properties )
throws Exception {
String appName = buildAppName( organizationName, name );
HColumn<String, ByteBuffer> column =
cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, appName, PROPERTY_UUID );
if ( column != null ) {
throw new ApplicationAlreadyExistsException( name );
// UUID uuid = uuid(column.getValue());
// return uuid;
}
UUID applicationId = UUIDUtils.newTimeUUID();
logger.info( "New application id " + applicationId.toString() );
initializeApplication( organizationName, applicationId, appName, properties );
return applicationId;
}
private String buildAppName( String organizationName, String name ) {
return StringUtils.lowerCase( name.contains( "/" ) ? name : organizationName + "/" + name );
}
public UUID initializeApplication( String organizationName, UUID applicationId, String name,
Map<String, Object> properties ) throws Exception {
String appName = buildAppName( organizationName, name );
// check for pre-existing
if ( lookupApplication( appName ) != null ) {
throw new ApplicationAlreadyExistsException( appName );
}
if ( properties == null ) {
properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
}
properties.put( PROPERTY_NAME, appName );
getSetup().setupApplicationKeyspace( applicationId, appName );
Keyspace ko = cass.getSystemKeyspace();
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
long timestamp = cass.createTimestamp();
addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_UUID, applicationId, timestamp );
addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_NAME, appName, timestamp );
batchExecute( m, RETRY_COUNT );
EntityManager em = getEntityManager( applicationId );
em.create( TYPE_APPLICATION, APPLICATION_ENTITY_CLASS, properties );
em.resetRoles();
return applicationId;
}
@Override
public UUID importApplication( String organizationName, UUID applicationId, String name,
Map<String, Object> properties ) throws Exception {
name = buildAppName( organizationName, name );
HColumn<String, ByteBuffer> column =
cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
if ( column != null ) {
throw new ApplicationAlreadyExistsException( name );
// UUID uuid = uuid(column.getValue());
// return uuid;
}
return initializeApplication( organizationName, applicationId, name, properties );
}
@Override
@Metered(group = "core", name = "EntityManagerFactory_lookupApplication_byName")
public UUID lookupApplication( String name ) throws Exception {
name = name.toLowerCase();
HColumn<String, ByteBuffer> column =
cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
if ( column != null ) {
return uuid( column.getValue() );
}
return null;
}
/**
* Gets the application.
*
* @param name the name
*
* @return application for name
*
* @throws Exception the exception
*/
@Metered(group = "core", name = "EntityManagerFactory_getApplication")
public Application getApplication( String name ) throws Exception {
name = name.toLowerCase();
HColumn<String, ByteBuffer> column =
cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
if ( column == null ) {
return null;
}
UUID applicationId = uuid( column.getValue() );
EntityManager em = getEntityManager( applicationId );
return ( ( EntityManagerImpl ) em ).getEntity( applicationId, Application.class );
}
@Override
public Map<String, UUID> getApplications() throws Exception {
Map<String, UUID> applications = new TreeMap<String, UUID>( CASE_INSENSITIVE_ORDER );
Keyspace ko = cass.getSystemKeyspace();
RangeSlicesQuery<String, String, UUID> q = createRangeSlicesQuery( ko, se, se, ue );
q.setKeys( "", "\uFFFF" );
q.setColumnFamily( APPLICATIONS_CF );
q.setColumnNames( PROPERTY_UUID );
q.setRowCount( 10000 );
QueryResult<OrderedRows<String, String, UUID>> r = q.execute();
Rows<String, String, UUID> rows = r.get();
for ( Row<String, String, UUID> row : rows ) {
ColumnSlice<String, UUID> slice = row.getColumnSlice();
HColumn<String, UUID> column = slice.getColumnByName( PROPERTY_UUID );
applications.put( row.getKey(), column.getValue() );
}
return applications;
}
@Override
public boolean setServiceProperty( String name, String value ) {
try {
cass.setColumn( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, name, value );
return true;
}
catch ( Exception e ) {
logger.error( "Unable to set property " + name + ": " + e.getMessage() );
}
return false;
}
@Override
public boolean deleteServiceProperty( String name ) {
try {
cass.deleteColumn( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, name );
return true;
}
catch ( Exception e ) {
logger.error( "Unable to delete property " + name + ": " + e.getMessage() );
}
return false;
}
@Override
public boolean updateServiceProperties( Map<String, String> properties ) {
try {
cass.setColumns( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF.getBytes(), properties );
return true;
}
catch ( Exception e ) {
logger.error( "Unable to update properties: " + e.getMessage() );
}
return false;
}
@Override
public Map<String, String> getServiceProperties() {
try {
return asMap( cass.getAllColumns( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, se, se ) );
}
catch ( Exception e ) {
logger.error( "Unable to load properties: " + e.getMessage() );
}
return null;
}
@Override
public void setApplicationContext( ApplicationContext applicationContext ) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public long performEntityCount() {
throw new UnsupportedOperationException("Not supported in v1");
}
public void setCounterUtils( CounterUtils counterUtils ) {
this.counterUtils = counterUtils;
}
static final UUID MANAGEMENT_APPLICATION_ID = new UUID( 0, 1 );
static final UUID DEFAULT_APPLICATION_ID = new UUID( 0, 16 );
@Override
public UUID getManagementAppId() {
return MANAGEMENT_APPLICATION_ID;
}
@Override
public UUID getDefaultAppId() {
return DEFAULT_APPLICATION_ID;
}
@Override
public void refreshIndex() {
// no op
}
@Override
public void flushEntityManagerCaches() {
// no-op
}
@Override
public void rebuildInternalIndexes(ProgressObserver po) throws Exception {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public void rebuildAllIndexes(ProgressObserver po) throws Exception {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public void rebuildApplicationIndexes(UUID appId, ProgressObserver po) throws Exception {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public void migrateData() throws Exception {
}
@Override
public String getMigrateDataStatus() {
throw new UnsupportedOperationException("Not supported in v1");
}
@Override
public int getMigrateDataVersion() {
throw new UnsupportedOperationException("Not supported in v1");
}
@Override
public void setMigrationVersion( final int version ) {
throw new UnsupportedOperationException("Not supported in v1");
}
@Override
public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public Health getEntityStoreHealth() {
throw new UnsupportedOperationException("Not supported yet.");
}
}