| /* |
| * Copyright 2014 The Apache Software Foundation. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.usergrid.corepersistence; |
| |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.inject.Injector; |
| import com.yammer.metrics.annotation.Metered; |
| import static java.lang.String.CASE_INSENSITIVE_ORDER; |
| |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.lang.StringUtils; |
| |
| import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable; |
| import org.apache.usergrid.corepersistence.util.CpNamingUtils; |
| import org.apache.usergrid.persistence.AbstractEntity; |
| import org.apache.usergrid.persistence.Entity; |
| import org.apache.usergrid.persistence.EntityFactory; |
| import org.apache.usergrid.persistence.EntityManager; |
| import org.apache.usergrid.persistence.EntityManagerFactory; |
| import org.apache.usergrid.persistence.EntityRef; |
| import org.apache.usergrid.persistence.Results; |
| import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME; |
| import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; |
| import org.apache.usergrid.persistence.cassandra.CassandraService; |
| import org.apache.usergrid.persistence.cassandra.CounterUtils; |
| import org.apache.usergrid.persistence.cassandra.Setup; |
| import org.apache.usergrid.persistence.collection.CollectionScope; |
| import org.apache.usergrid.persistence.collection.EntityCollectionManager; |
| import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; |
| import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; |
| import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScope; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; |
| import org.apache.usergrid.persistence.core.util.Health; |
| import org.apache.usergrid.persistence.entities.Application; |
| import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException; |
| import org.apache.usergrid.persistence.graph.Edge; |
| import org.apache.usergrid.persistence.graph.GraphManager; |
| import org.apache.usergrid.persistence.graph.GraphManagerFactory; |
| import org.apache.usergrid.persistence.graph.SearchByEdgeType; |
| import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; |
| import org.apache.usergrid.persistence.index.EntityIndex; |
| import org.apache.usergrid.persistence.index.EntityIndexFactory; |
| import org.apache.usergrid.persistence.index.query.Query; |
| import org.apache.usergrid.persistence.map.MapManagerFactory; |
| import org.apache.usergrid.persistence.model.entity.Id; |
| import org.apache.usergrid.persistence.model.entity.SimpleId; |
| import org.apache.usergrid.utils.UUIDUtils; |
| import org.apache.usergrid.persistence.model.util.UUIDGenerator; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.BeansException; |
| import org.springframework.context.ApplicationContext; |
| import org.springframework.context.ApplicationContextAware; |
| import rx.Observable; |
| |
| |
| /** |
| * Implement good-old Usergrid EntityManagerFactory with the new-fangled Core Persistence API. |
| * This is where we keep track of applications and system properties. |
| */ |
| public class CpEntityManagerFactory implements EntityManagerFactory, ApplicationContextAware { |
| |
| private static final Logger logger = LoggerFactory.getLogger( CpEntityManagerFactory.class ); |
| |
| public static String IMPLEMENTATION_DESCRIPTION = "Core Persistence Entity Manager Factory 1.0"; |
| |
| private ApplicationContext applicationContext; |
| |
| private Setup setup = null; |
| |
| /** Have we already initialized the index for the management app? */ |
| private AtomicBoolean indexInitialized = new AtomicBoolean( ); |
| |
| /** Keep track of applications that already have indexes to avoid redundant re-creation. */ |
| private static final Set<UUID> applicationIndexesCreated = new HashSet<UUID>(); |
| |
| |
| // cache of already instantiated entity managers |
| private LoadingCache<UUID, EntityManager> entityManagers |
| = CacheBuilder.newBuilder().maximumSize(100).build(new CacheLoader<UUID, EntityManager>() { |
| public EntityManager load(UUID appId) { // no checked exception |
| return _getEntityManager(appId); |
| } |
| }); |
| |
| |
| private ManagerCache managerCache; |
| private DataMigrationManager dataMigrationManager; |
| |
| CassandraService cass; |
| CounterUtils counterUtils; |
| |
| |
| public CpEntityManagerFactory( |
| CassandraService cass, CounterUtils counterUtils) { |
| |
| this.cass = cass; |
| this.counterUtils = counterUtils; |
| |
| |
| } |
| |
| |
| private void init() { |
| |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID); |
| |
| try { |
| if ( em.getApplication() == null ) { |
| logger.info("Creating system application"); |
| Map sysAppProps = new HashMap<String, Object>(); |
| sysAppProps.put( PROPERTY_NAME, "systemapp"); |
| em.create( CpNamingUtils.SYSTEM_APP_ID, TYPE_APPLICATION, sysAppProps ); |
| em.getApplication(); |
| em.createIndex(); |
| em.refreshIndex(); |
| } |
| |
| } catch (Exception ex) { |
| throw new RuntimeException("Fatal error creating system application", ex); |
| } |
| } |
| |
| |
| public ManagerCache getManagerCache() { |
| |
| if ( managerCache == null ) { |
| |
| // TODO: better solution for getting injector? |
| Injector injector = CpSetup.getInjector(); |
| |
| managerCache = injector.getInstance( ManagerCache.class ); |
| |
| dataMigrationManager = injector.getInstance( DataMigrationManager.class ); |
| } |
| return managerCache; |
| } |
| |
| |
| @Override |
| public String getImpementationDescription() throws Exception { |
| return IMPLEMENTATION_DESCRIPTION; |
| } |
| |
| |
| @Override |
| public EntityManager getEntityManager(UUID applicationId) { |
| try { |
| return entityManagers.get( applicationId ); |
| } |
| catch ( Exception ex ) { |
| logger.error("Error getting entity manager", ex); |
| } |
| return _getEntityManager( applicationId ); |
| } |
| |
| |
| private EntityManager _getEntityManager( UUID applicationId ) { |
| |
| EntityManager em = new CpEntityManager(); |
| em.init( this, applicationId ); |
| |
| // only need to do this once |
| if ( !applicationIndexesCreated.contains( applicationId ) ) { |
| em.createIndex(); |
| applicationIndexesCreated.add( applicationId ); |
| } |
| |
| return em; |
| } |
| |
| |
| @Override |
| public UUID createApplication(String organizationName, String name) throws Exception { |
| return createApplication( organizationName, name, null ); |
| } |
| |
| |
| @Override |
| public UUID createApplication( |
| String orgName, String name, Map<String, Object> properties) throws Exception { |
| |
| String appName = buildAppName( orgName, name ); |
| |
| UUID applicationId = lookupApplication( appName ); |
| |
| if ( applicationId != null ) { |
| throw new ApplicationAlreadyExistsException( name ); |
| } |
| |
| applicationId = UUIDGenerator.newTimeUUID(); |
| |
| logger.debug( "New application orgName {} name {} id {} ", |
| new Object[] { orgName, name, applicationId.toString() } ); |
| |
| initializeApplication( orgName, applicationId, appName, properties ); |
| return applicationId; |
| } |
| |
| |
| private String buildAppName( String organizationName, String name ) { |
| return StringUtils.lowerCase( name.contains( "/" ) ? name : organizationName + "/" + name ); |
| } |
| |
| |
| @Override |
| public UUID initializeApplication( String organizationName, UUID applicationId, String name, |
| Map<String, Object> properties ) throws Exception { |
| |
| |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID); |
| |
| final String appName = buildAppName( organizationName, name ); |
| |
| // check for pre-existing application |
| if ( lookupApplication( appName ) != null ) { |
| throw new ApplicationAlreadyExistsException( appName ); |
| } |
| |
| getSetup().setupApplicationKeyspace( applicationId, appName ); |
| |
| UUID orgUuid = lookupOrganization( organizationName ); |
| if ( orgUuid == null ) { |
| |
| // create new org because the specified one does not exist |
| final String orgName = organizationName; |
| Entity orgInfo = em.create("organization", new HashMap<String, Object>() {{ |
| put( PROPERTY_NAME, orgName ); |
| }}); |
| em.refreshIndex(); |
| orgUuid = orgInfo.getUuid(); |
| } |
| |
| // create appinfo entry in the system app |
| final UUID appId = applicationId; |
| final UUID orgId = orgUuid; |
| Map<String, Object> appInfoMap = new HashMap<String, Object>() {{ |
| put( PROPERTY_NAME, appName ); |
| put( "applicationUuid", appId ); |
| put( "organizationUuid", orgId ); |
| }}; |
| Entity appInfo = em.create( "appinfo", appInfoMap ); |
| em.refreshIndex(); |
| |
| // create application entity |
| if ( properties == null ) { |
| properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER ); |
| } |
| properties.put( PROPERTY_NAME, appName ); |
| EntityManager appEm = getEntityManager( applicationId ); |
| |
| appEm.create( applicationId, TYPE_APPLICATION, properties ); |
| appEm.resetRoles(); |
| appEm.refreshIndex(); |
| |
| logger.info("Initialized application {}", appName ); |
| return applicationId; |
| } |
| |
| |
| |
| |
| @Override |
| public UUID importApplication( |
| String organization, UUID applicationId, |
| String name, Map<String, Object> properties) throws Exception { |
| |
| throw new UnsupportedOperationException("Not supported yet."); |
| } |
| |
| |
| public UUID lookupOrganization( String name ) throws Exception { |
| init(); |
| |
| |
| // Query q = Query.fromQL(PROPERTY_NAME + " = '" + name + "'"); |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID ); |
| |
| |
| final EntityRef alias = em.getAlias( "organizations", name ); |
| |
| if ( alias == null ) { |
| return null; |
| } |
| |
| final Entity entity = em.get( alias ); |
| |
| if ( entity == null ) { |
| return null; |
| } |
| |
| return entity.getUuid(); |
| // Results results = em.searchCollection( em.getApplicationRef(), "organizations", q ); |
| // |
| // if ( results.isEmpty() ) { |
| // return null; |
| // } |
| // |
| // return results.iterator().next().getUuid(); |
| } |
| |
| |
| @Override |
| public UUID lookupApplication( String name ) throws Exception { |
| init(); |
| |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID ); |
| |
| |
| final EntityRef alias = em.getAlias( CpNamingUtils.APPINFOS, name ); |
| |
| if ( alias == null ) { |
| return null; |
| } |
| |
| final Entity entity = em.get( alias ); |
| |
| if ( entity == null ) { |
| return null; |
| } |
| |
| |
| final UUID property = ( UUID ) entity.getProperty( "applicationUuid" ); |
| |
| return property; |
| } |
| |
| |
| @Override |
| @Metered(group = "core", name = "EntityManagerFactory_getApplication") |
| public Map<String, UUID> getApplications() throws Exception { |
| |
| Map<String, UUID> appMap = new HashMap<String, UUID>(); |
| |
| ApplicationScope appScope = CpNamingUtils.getApplicationScope( CpNamingUtils.SYSTEM_APP_ID ); |
| GraphManager gm = managerCache.getGraphManager(appScope); |
| |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID); |
| Application app = em.getApplication(); |
| Id fromEntityId = new SimpleId( app.getUuid(), app.getType() ); |
| |
| String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS ); |
| |
| logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}", |
| new Object[] { edgeType, fromEntityId.getType(), fromEntityId.getUuid() } ); |
| |
| Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( |
| fromEntityId, edgeType, Long.MAX_VALUE, |
| SearchByEdgeType.Order.DESCENDING, null )); |
| |
| Iterator<Edge> iter = edges.toBlockingObservable().getIterator(); |
| while ( iter.hasNext() ) { |
| |
| Edge edge = iter.next(); |
| Id targetId = edge.getTargetNode(); |
| |
| logger.debug("getApplications(): Processing edge from {}:{} to {}:{}", new Object[] { |
| edge.getSourceNode().getType(), edge.getSourceNode().getUuid(), |
| edge.getTargetNode().getType(), edge.getTargetNode().getUuid() |
| }); |
| |
| CollectionScope collScope = new CollectionScopeImpl( |
| appScope.getApplication(), |
| appScope.getApplication(), |
| CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS )); |
| |
| org.apache.usergrid.persistence.model.entity.Entity e = |
| managerCache.getEntityCollectionManager( collScope ).load( targetId ) |
| .toBlockingObservable().lastOrDefault(null); |
| |
| appMap.put( |
| (String)e.getField( PROPERTY_NAME ).getValue(), |
| (UUID)e.getField( "applicationUuid" ).getValue()); |
| } |
| |
| return appMap; |
| } |
| |
| |
| @Override |
| public void setup() throws Exception { |
| getSetup().init(); |
| } |
| |
| |
| @Override |
| public Map<String, String> getServiceProperties() { |
| |
| Map<String, String> props = new HashMap<String,String>(); |
| |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID); |
| Query q = Query.fromQL("select *"); |
| Results results = null; |
| try { |
| results = em.searchCollection( em.getApplicationRef(), "propertymaps", q); |
| |
| } catch (Exception ex) { |
| logger.error("Error getting system properties", ex); |
| } |
| |
| if ( results == null || results.isEmpty() ) { |
| return props; |
| } |
| |
| org.apache.usergrid.persistence.Entity e = results.getEntity(); |
| for ( String key : e.getProperties().keySet() ) { |
| props.put( key, props.get(key).toString() ); |
| } |
| return props; |
| } |
| |
| |
| @Override |
| public boolean updateServiceProperties(Map<String, String> properties) { |
| |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID); |
| Query q = Query.fromQL("select *"); |
| Results results = null; |
| try { |
| results = em.searchCollection( em.getApplicationRef(), "propertymaps", q); |
| |
| } catch (Exception ex) { |
| logger.error("Error getting system properties", ex); |
| return false; |
| } |
| |
| org.apache.usergrid.persistence.Entity propsEntity = null; |
| |
| if ( !results.isEmpty() ) { |
| propsEntity = results.getEntity(); |
| |
| } else { |
| propsEntity = EntityFactory.newEntity( UUIDUtils.newTimeUUID(), "propertymap"); |
| } |
| |
| // intentionally going only one-level deep into fields and treating all |
| // values as strings because that is all we need for service properties |
| for ( String key : properties.keySet() ) { |
| propsEntity.setProperty( key, properties.get(key).toString() ); |
| } |
| |
| try { |
| em.update( propsEntity ); |
| |
| } catch (Exception ex) { |
| logger.error("Error updating service properties", ex); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| |
| @Override |
| public boolean setServiceProperty(final String name, final String value) { |
| return updateServiceProperties( new HashMap<String, String>() {{ |
| put(name, value); |
| }}); |
| } |
| |
| |
| @Override |
| public boolean deleteServiceProperty(String name) { |
| |
| EntityManager em = getEntityManager( CpNamingUtils.SYSTEM_APP_ID); |
| |
| |
| Query q = Query.fromQL("select *"); |
| Results results = null; |
| try { |
| results = em.searchCollection( em.getApplicationRef(), "propertymaps", q); |
| |
| } catch (Exception ex) { |
| logger.error("Error getting service property for delete of property: " + name, ex); |
| return false; |
| } |
| |
| org.apache.usergrid.persistence.Entity propsEntity = null; |
| |
| if ( !results.isEmpty() ) { |
| propsEntity = results.getEntity(); |
| |
| } else { |
| propsEntity = EntityFactory.newEntity( UUIDUtils.newTimeUUID(), "propertymap"); |
| } |
| |
| try { |
| ((AbstractEntity)propsEntity).clearDataset( name ); |
| em.update( propsEntity ); |
| |
| } catch (Exception ex) { |
| logger.error("Error deleting service property name: " + name, ex); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| public ApplicationContext getApplicationContext() { |
| return applicationContext; |
| } |
| |
| @Override |
| public void setApplicationContext( ApplicationContext applicationContext ) throws BeansException { |
| this.applicationContext = applicationContext; |
| try { |
| setup(); |
| } catch (Exception ex) { |
| logger.error("Error setting up EMF", ex); |
| } |
| } |
| |
| |
| @Override |
| public long performEntityCount() { |
| //TODO, this really needs to be a task that writes this data somewhere since this will get |
| //progressively slower as the system expands |
| return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).longCount().toBlocking().last(); |
| } |
| |
| |
| /** |
| * @param managerCache the managerCache to set |
| */ |
| public void setManagerCache(CpManagerCache managerCache) { |
| this.managerCache = managerCache; |
| } |
| |
| |
| @Override |
| public UUID getManagementAppId() { |
| return CpNamingUtils.MANAGEMENT_APPLICATION_ID; |
| } |
| |
| |
| @Override |
| public UUID getDefaultAppId() { |
| return CpNamingUtils.DEFAULT_APPLICATION_ID; |
| } |
| |
| |
| |
| |
| /** |
| * Gets the setup. |
| * @return Setup helper |
| */ |
| public Setup getSetup() { |
| if ( setup == null ) { |
| setup = new CpSetup( this, cass ); |
| } |
| return setup; |
| } |
| |
| |
| /** |
| * TODO, these 3 methods are super janky. During refactoring we should clean this model up |
| */ |
| public void refreshIndex() { |
| |
| // refresh special indexes without calling EntityManager refresh because stack overflow |
| maybeCreateIndexes(); |
| // system app |
| |
| for ( EntityIndex index : getManagementIndexes() ) { |
| index.refresh(); |
| } |
| } |
| |
| |
| private void maybeCreateIndexes() { |
| // system app |
| if ( indexInitialized.getAndSet( true ) ) { |
| return; |
| } |
| |
| for ( EntityIndex index : getManagementIndexes() ) { |
| index.initializeIndex(); |
| } |
| } |
| |
| |
| private List<EntityIndex> getManagementIndexes() { |
| |
| return Arrays.asList( |
| getManagerCache().getEntityIndex( |
| new ApplicationScopeImpl( new SimpleId( CpNamingUtils.SYSTEM_APP_ID, "application" ))), |
| |
| // management app |
| getManagerCache().getEntityIndex( |
| new ApplicationScopeImpl( new SimpleId( getManagementAppId(), "application" ))), |
| |
| // default app TODO: do we need this in two-dot-o |
| getManagerCache().getEntityIndex( |
| new ApplicationScopeImpl( new SimpleId( getDefaultAppId(), "application" )))); |
| } |
| |
| |
| public void rebuildAllIndexes( ProgressObserver po ) throws Exception { |
| |
| logger.info("\n\nRebuilding all indexes\n"); |
| |
| rebuildInternalIndexes( po ); |
| |
| Map<String, UUID> appMap = getApplications(); |
| |
| logger.info("About to rebuild indexes for {} applications", appMap.keySet().size()); |
| |
| for ( UUID appUuid : appMap.values() ) { |
| rebuildApplicationIndexes( appUuid, po ); |
| } |
| } |
| |
| |
| @Override |
| public void rebuildInternalIndexes( ProgressObserver po ) throws Exception { |
| rebuildApplicationIndexes( CpNamingUtils.SYSTEM_APP_ID, po); |
| rebuildApplicationIndexes( CpNamingUtils.MANAGEMENT_APPLICATION_ID, po ); |
| rebuildApplicationIndexes( CpNamingUtils.DEFAULT_APPLICATION_ID, po ); |
| } |
| |
| |
| @Override |
| public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception { |
| |
| EntityManager em = getEntityManager( appId ); |
| |
| //explicitly invoke create index, we don't know if it exists or not in ES during a rebuild. |
| em.createIndex(); |
| Application app = em.getApplication(); |
| |
| em.reindex( po ); |
| |
| logger.info("\n\nRebuilt index for application {} id {}\n", app.getName(), appId ); |
| } |
| |
| |
| @Override |
| public void migrateData() throws Exception { |
| dataMigrationManager.migrate(); |
| } |
| |
| |
| @Override |
| public String getMigrateDataStatus() { |
| return dataMigrationManager.getLastStatus(); |
| } |
| |
| |
| @Override |
| public int getMigrateDataVersion() { |
| return dataMigrationManager.getCurrentVersion(); |
| } |
| |
| |
| @Override |
| public void setMigrationVersion( final int version ) { |
| dataMigrationManager.resetToVersion( version ); |
| dataMigrationManager.invalidate(); |
| } |
| |
| |
| @Override |
| public void flushEntityManagerCaches() { |
| Map<UUID, EntityManager> entityManagersMap = entityManagers.asMap(); |
| for ( UUID appUuid : entityManagersMap.keySet() ) { |
| EntityManager em = entityManagersMap.get(appUuid); |
| em.flushManagerCaches(); |
| } |
| } |
| |
| @Override |
| public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po ) { |
| throw new UnsupportedOperationException( "Not supported yet." ); |
| } |
| |
| @Override |
| public Health getEntityStoreHealth() { |
| |
| // could use any collection scope here, does not matter |
| EntityCollectionManager ecm = getManagerCache().getEntityCollectionManager( |
| new CollectionScopeImpl( |
| new SimpleId( CpNamingUtils.SYSTEM_APP_ID, "application"), |
| new SimpleId( CpNamingUtils.SYSTEM_APP_ID, "application"), |
| "dummy" |
| )); |
| |
| return ecm.getHealth(); |
| } |
| } |