/*
 * Copyright 2014 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.usergrid.corepersistence;


import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
import org.apache.usergrid.corepersistence.service.CollectionSearch;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.ConnectionSearch;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.Query.Level;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.graph.*;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.queue.settings.IndexConsistency;
import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import rx.Observable;

import java.util.*;

import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
import static org.apache.usergrid.persistence.Schema.*;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
import static org.apache.usergrid.utils.MapUtils.addMapSet;


/**
 * Implement good-old Usergrid RelationManager with the new-fangled Core Persistence API.
 */
public class CpRelationManager implements RelationManager {

    /** Logger shared by all instances of this class. */
    private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
    /** Configuration object; read for the poll timeout and sleep interval in searchCollectionConsistent. */
    private final EntityManagerFig entityManagerFig;

    /** Supplies the GraphManager for the application scope on every graph read or write. */
    private ManagerCache managerCache;

    /** Entity manager this relation manager delegates entity-level operations to. */
    private EntityManager em;

    /** Id of the application that owns the head entity. */
    private UUID applicationId;

    /** The entity whose collections and connections this manager operates on. */
    private EntityRef headEntity;

    /** Core Persistence representation of the head entity, loaded in the constructor. */
    private org.apache.usergrid.persistence.model.entity.Entity cpHeadEntity;

    /** Application scope derived from applicationId in the constructor. */
    private final ApplicationScope applicationScope;

    /** Receives the "new edge" and "delete edge" events queued by the methods of this class. */
    private final AsyncEventService indexService;

    /** Injected through the constructor. */
    private final CollectionSettingsFactory collectionSettingsFactory;


    /** Executes the collection searches built by searchCollection. */
    private final CollectionService collectionService;
    /** Injected through the constructor. */
    private final ConnectionService connectionService;


    public CpRelationManager( final ManagerCache managerCache,
                              final AsyncEventService indexService, final CollectionService collectionService,
                              final ConnectionService connectionService,
                              final EntityManager em,
                              final EntityManagerFig entityManagerFig, final UUID applicationId,
                              final CollectionSettingsFactory collectionSettingsFactory,
                              final EntityRef headEntity) {


        Assert.notNull( em, "Entity manager cannot be null" );
        Assert.notNull( applicationId, "Application Id cannot be null" );
        Assert.notNull( headEntity, "Head entity cannot be null" );
        Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
        Assert.notNull( indexService, "indexService cannot be null" );
        Assert.notNull( collectionService, "collectionService cannot be null" );
        Assert.notNull( connectionService, "connectionService cannot be null" );

        this.entityManagerFig = entityManagerFig;

        // TODO: this assert should not be failing
        //Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" );
        this.em = em;
        this.applicationId = applicationId;
        this.headEntity = headEntity;
        this.managerCache = managerCache;
        this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );

        this.collectionService = collectionService;
        this.connectionService = connectionService;

        if ( logger.isDebugEnabled() ) {
            logger.debug( "Loading head entity {}:{} from app {}",
                headEntity.getType(), headEntity.getUuid(), applicationScope
            );
        }

        Id entityId = new SimpleId( headEntity.getUuid(), headEntity.getType() );

        this.cpHeadEntity = ( ( CpEntityManager ) em ).load( entityId );

        // commented out because it is possible that CP entity has not been created yet
        Assert.notNull( cpHeadEntity, String
            .format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) );

        this.indexService = indexService;
        this.collectionSettingsFactory = collectionSettingsFactory;

    }


    /**
     * Returns the edge types leaving the head entity that match the edge-type prefix derived from the
     * given collection name.
     *
     * @param collectionName collection whose edge-type prefix is searched for
     * @return the matching edge types; empty when none are found
     */
    @Override
    public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );
        final String prefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );

        if ( logger.isTraceEnabled() ) {
            logger.trace("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}",
                prefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() );
        }

        // fold every emitted edge type into a single set; collect() emits exactly one item on completion
        final Observable<Set<String>> collected =
            graphManager.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), prefix, null ) )
                        .collect( () -> new HashSet<>(), ( found, edgeType ) -> found.add( edgeType ) );

        return collected.toBlocking().last();
    }


    /**
     * Groups the containers of the head entity by owner type and owner uuid.
     *
     * @return owner type -> owner uuid -> names under which that owner contains the head entity
     */
    @Override
    public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception {

        // TODO: do we need to restrict this to edges prefixed with owns?
        final Map<EntityRef, Set<String>> containers = getContainers();

        final Map<String, Map<UUID, Set<String>>> ownersByType = new LinkedHashMap<>();

        for ( Map.Entry<EntityRef, Set<String>> container : containers.entrySet() ) {
            final EntityRef owner = container.getKey();
            for ( String collection : container.getValue() ) {
                MapUtils.addMapMapSet( ownersByType, owner.getType(), owner.getUuid(), collection );
            }
        }

        return ownersByType;
    }


    /**
     * Convenience overload: all containers of the head entity, with no limit and no edge-type or
     * source-type filter.
     */
    private Map<EntityRef, Set<String>> getContainers() {
        final int noLimit = -1;
        return getContainers( noLimit, null, null );
    }


    /**
     * Finds the entities that have an edge pointing at the head entity, i.e. its containing collections
     * and/or connections, depending on the edge type passed in.
     *
     * @param limit Max number of edges to consider; any value below zero means no limit
     * @param edgeType Edge type, edge type prefix or null to allow any edge type
     * @param fromEntityType Only consider edges from entities of this type; null accepts every type
     * @return source entity -> names derived from the edge types linking it to the head entity
     */
    Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );

        // every edge type that targets the head entity, expanded into the edges of that type
        final Observable<Edge> incoming =
            graphManager.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
                        .flatMap( foundType -> graphManager.loadEdgesToTarget(
                            new SimpleSearchByEdgeType( cpHeadEntity.getId(), foundType, Long.MAX_VALUE,
                                SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) );

        // Note: the whole result is still accumulated in memory, so an unlimited call may not fit
        final Observable<Edge> selected = limit > -1 ? incoming.take( limit ) : incoming;

        return selected.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( containers, edge ) -> {

            final boolean accepted =
                fromEntityType == null || fromEntityType.equals( edge.getSourceNode().getType() );

            if ( accepted ) {
                final EntityRef source =
                    new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
                final String name = getNameFromEdgeType( edge.getType() );
                addMapSet( containers, source, name );
            }
            else if ( logger.isDebugEnabled() ) {
                logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
            }
        } ).toBlocking().last();
    }


    /**
     * Tells whether at least one version of a connection edge of the given type exists from the head
     * entity to the given entity.
     *
     * @param connectionType connection type to look for
     * @param entity candidate target of the connection
     * @return true when an edge version is found
     */
    @Override
    public boolean isConnectionMember( String connectionType, EntityRef entity ) throws Exception {

        final Id targetId = new SimpleId( entity.getUuid(), entity.getType() );

        if ( logger.isTraceEnabled() ) {
            logger.trace("isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}",
                connectionType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() );
        }

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );
        final Id sourceId = new SimpleId( headEntity.getUuid(), headEntity.getType() );

        // only the first emitted version matters; null stands for "no edge at all"
        final MarkedEdge firstVersion = graphManager
            .loadEdgeVersions( CpNamingUtils.createEdgeFromConnectionType( sourceId, connectionType, targetId ) )
            .toBlocking()
            .firstOrDefault( null );

        return firstVersion != null;
    }


    /**
     * Tells whether at least one version of a collection edge for the given collection exists from the
     * head entity to the given entity.
     *
     * @param collectionName collection to look in
     * @param entity candidate member
     * @return true when an edge version is found
     */
    @SuppressWarnings( "unchecked" )
    @Override
    public boolean isCollectionMember( String collectionName, EntityRef entity ) throws Exception {

        final Id memberId = new SimpleId( entity.getUuid(), entity.getType() );

        if ( logger.isTraceEnabled() ) {
            logger.trace("isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}",
                collectionName, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() );
        }

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );
        final Id ownerId = new SimpleId( headEntity.getUuid(), headEntity.getType() );

        // only the first emitted version matters; null stands for "no edge at all"
        final MarkedEdge firstVersion = graphManager
            .loadEdgeVersions( CpNamingUtils.createEdgeFromCollectionName( ownerId, collectionName, memberId ) )
            .toBlocking()
            .firstOrDefault( null );

        return firstVersion != null;
    }


    /**
     * Lists the names derived from every edge type that leaves the head entity.
     *
     * @return the names, de-duplicated; empty when the head entity has no outgoing edge types
     */
    @Override
    public Set<String> getCollections() throws Exception {

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );

        final Observable<String> edgeTypes =
            graphManager.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );

        final Set<String> names = new HashSet<>();

        // blocks on the observable and maps each edge type back to its name
        for ( String edgeType : edgeTypes.toBlocking().toIterable() ) {
            names.add( getNameFromEdgeType( edgeType ) );
        }

        return names;
    }


    /**
     * Fetches one page of a collection, optionally starting after (or before, when reversed) the point in
     * time encoded in a time-based UUID.
     *
     * <p>The start UUID is turned into a filter on the {@code created} field. A UUID timestamp counts
     * 100-nanosecond intervals since 1582-10-15T00:00:00Z, so it is scaled to milliseconds and shifted to
     * the Unix epoch. The shift is a fixed constant: building it with {@code Calendar.getInstance} would
     * depend on the default locale's calendar system.</p>
     *
     * @param collectionName collection to read
     * @param startResult time-based UUID marking the starting point, or null to start from the beginning
     * @param count maximum number of results
     * @param resultsLevel not consulted by this implementation
     * @param reversed when true the results are reversed and the filter selects older entries
     * @return the matching results
     * @throws UnsupportedOperationException if startResult is not a version 1 (time-based) UUID
     * @throws RuntimeException if the generated query cannot be parsed
     */
    @Override
    public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
                                  boolean reversed ) throws Exception {

        // milliseconds between 1582-10-15T00:00:00Z (UUID epoch) and 1970-01-01T00:00:00Z (Unix epoch)
        final long uuidEpochMillis = -12219292800000L;

        final String ql;

        if ( startResult != null ) {

            // UUID timestamp is a different measure than 'created' field on entities
            final long time = ( startResult.timestamp() / 10000L ) + uuidEpochMillis;

            if ( !reversed ) {
                ql = "select * where created > " + time;
            } else {
                ql = "select * where created < " + time;
            }

        } else {
            ql = "select *";
        }

        Query query = Query.fromQL( ql );
        if(query == null ){
            throw new RuntimeException("Unable to get data for collection: "+collectionName);
        }
        query.setLimit( count );
        query.setReversed( reversed );

        return searchCollection( collectionName, query );
    }


    /**
     * Runs the given query against a collection of the head entity.
     *
     * @param collectionName collection to search
     * @param query query to execute
     * @param level not consulted here; the call is forwarded to searchCollection with the query as given
     * @return the search results
     */
    @Override
    public Results getCollection( String collectionName, Query query, Level level ) throws Exception {

        final Results found = searchCollection( collectionName, query );
        return found;
    }


    /**
     * Adds an existing entity to a named collection of the head entity.
     *
     * <p>Writes a graph edge from the head entity to the item. When the schema declares a linked
     * collection, a reverse edge from the item back to the head entity is written as well. New edges are
     * queued for indexing unless indexing is skipped for the relevant entity type, and duplicate-edge
     * cleanup is triggered afterwards.</p>
     *
     * @param collectionName name of the collection on the head entity
     * @param itemRef reference to the entity to add; must not be null
     * @return the added entity, or null when the schema declares a different item type for the
     *         collection or the entity cannot be fetched
     * @throws RuntimeException when the Core Persistence entity for itemRef cannot be loaded
     */
    @Override
    public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {

        Preconditions.checkNotNull( itemRef, "itemref is null" );

        // the schema may not know this collection at all, in which case no type restriction applies
        final CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
        if ( collection != null && collection.getType() != null
            && !collection.getType().equals( itemRef.getType() ) ) {
            return null;
        }


        final Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
        final org.apache.usergrid.persistence.model.entity.Entity memberEntity =
            ( ( CpEntityManager ) em ).load( entityId );

        // don't fetch entity if we've already got one
        final Entity itemEntity;
        if ( itemRef instanceof Entity ) {
            itemEntity = ( Entity ) itemRef;
        }
        else {
            itemEntity = em.get( itemRef );
        }

        if ( itemEntity == null ) {
            return null;
        }

        // check the loaded entity itself before touching its id, so a failed load gives a clear error
        if ( memberEntity == null || memberEntity.getId() == null ) {
            throw new RuntimeException(
                "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
        }

        final Id memberEntityId = memberEntity.getId();

        if ( logger.isDebugEnabled() ) {
            logger.debug( "Loaded member entity {}:{} from   app {}\n    data {}",
                itemRef.getType(), itemRef.getUuid(), applicationScope, CpEntityMapUtils.toMap( memberEntity ) );
        }


        // create graph edge connection from head entity to member entity
        final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntityId );

        // a collection unknown to the schema has no linked collection
        final String linkedCollection = collection == null ? null : collection.getLinkedCollection();

        final GraphManager gm = managerCache.getGraphManager( applicationScope );

        gm.writeEdge( edge ).doOnNext( writtenEdge -> {
            if ( logger.isDebugEnabled() ) {
                logger.debug( "Wrote edge {}", writtenEdge );
            }
        } ).filter( writtenEdge -> linkedCollection != null ).flatMap( writtenEdge -> {
            final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
            final Edge reverseEdge = createCollectionEdge( memberEntityId, pluralType, cpHeadEntity.getId() );

            //reverse
            return gm.writeEdge( reverseEdge ).doOnNext( reverseEdgeWritten -> {

                String entityType = cpHeadEntity.getId().getType();
                if ( !skipIndexingForType( entityType) ) {
                    QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
                    indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, queueIndexingStrategy);
                }

            } );
        } ).doOnCompleted( () -> {

            // queue the forward edge once all writes have completed
            String entityType = memberEntityId.getType();
            if ( !skipIndexingForType( entityType ) ) {
                QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
                indexService.queueNewEdge(applicationScope, memberEntityId, edge, queueIndexingStrategy);
            }

        } ).toBlocking().lastOrDefault( null );


        // remove any duplicate edges (keeps the duplicate edge with same timestamp)
        removeDuplicateEdgesAsync(gm, edge);


        if ( logger.isDebugEnabled() ) {
            logger.debug( "Added entity {}:{} to collection {}",
                itemRef.getUuid().toString(), itemRef.getType(), collectionName  );
        }


        return itemEntity;
    }


    /**
     * Creates a new entity and places it in a named collection of the head entity.
     *
     * <p>Two cases are handled before the generic path: when the head entity is the application itself
     * the item is created directly through the entity manager (roles through {@code createRole}), and
     * when the head entity is a group and the collection is the roles collection a group role is
     * created.</p>
     *
     * @param collectionName name of the collection on the head entity
     * @param itemType type of the entity to create
     * @param properties properties for the new entity
     * @return the created entity, or null when the schema declares a different item type for the
     *         collection
     */
    @Override
    public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
        throws Exception {

        if ( headEntity.getUuid().equals( applicationId ) ) {
            if ( itemType.equals( TYPE_ENTITY ) ) {
                itemType = singularize( collectionName );
            }

            if ( itemType.equals( TYPE_ROLE ) ) {
                Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
                if ( inactivity == null ) {
                    inactivity = 0L;
                }
                return em.createRole( ( String ) properties.get( PROPERTY_NAME ),
                    ( String ) properties.get( PROPERTY_TITLE ), inactivity );
            }
            return em.create( itemType, properties );
        }

        else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collectionName.equals( COLLECTION_ROLES ) ) ) {
            UUID groupId = headEntity.getUuid();
            String roleName = ( String ) properties.get( PROPERTY_NAME );
            return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) );
        }

        // a collection without a declared type places no restriction on the item type
        CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
        if ( collection != null && collection.getType() != null && !collection.getType().equals( itemType ) ) {
            return null;
        }

        properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true );

        Entity itemEntity = em.create( itemType, properties );

        if ( itemEntity != null ) {

            addToCollection( collectionName, itemEntity );

            if ( collection != null && collection.getLinkedCollection() != null ) {
                Id itemEntityId = new SimpleId( itemEntity.getUuid(), itemEntity.getType() );
                final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, itemEntityId );

                // NOTE(review): the Observable returned here is discarded and never subscribed to, and
                // addToCollection above already writes the head -> item edge. Confirm whether this call
                // has any effect before relying on it.
                GraphManager gm = managerCache.getGraphManager( applicationScope );
                gm.writeEdge( edge );
            }
        }

        return itemEntity;
    }


    /**
     * Removes an entity from a named collection of the head entity.
     *
     * <p>When the head entity is the application, removal means deleting the entity (roles are deleted
     * through {@code deleteRole}). Otherwise only the collection edges are removed; for the roles
     * collection of a group the role itself is additionally deleted when the item carries a
     * {@code "path"} metadata value starting with {@code /roles/}.</p>
     *
     * @param collectionName name of the collection on the head entity
     * @param itemRef reference to the entity to remove
     */
    @Override
    public void removeFromCollection( String collectionName, EntityRef itemRef ) throws Exception {

        if ( headEntity.getUuid().equals( applicationId ) ) {
            if ( collectionName.equals( COLLECTION_ROLES ) ) {
                // special handling for roles collection of the application
                Entity itemEntity = em.get( itemRef );
                if ( itemEntity != null ) {
                    RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
                    em.deleteRole(roleRef.getApplicationRoleName(), Optional.fromNullable(itemEntity) );
                    return;
                }
            }
            // handles normal app collection deletes
            em.delete( itemRef );
            return;
        }

        // headEntity is not an application (used for management entities and entity collections like user devices)

        if ( logger.isDebugEnabled() ) {
            logger.debug( "Loading entity to remove from collection {}:{} from app {}\n",
                itemRef.getType(), itemRef.getUuid(), applicationScope );
        }

        // this will remove the edges from app->entity(collection)
        removeItemFromCollection(collectionName, itemRef);

        // special handling for roles collection of a group
        if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && collectionName.equals( COLLECTION_ROLES ) ) {

            // the path is only available when the caller handed in a full entity; the edges are already
            // gone at this point, so a missing path skips the role cleanup instead of failing the call
            final Object path = itemRef instanceof Entity ? ( ( Entity ) itemRef ).getMetadata( "path" ) : null;

            if ( path instanceof String && ( ( String ) path ).startsWith( "/roles/" ) ) {

                final Entity itemEntity =
                    em.get( new SimpleEntityRef( itemRef.getType(), itemRef.getUuid() ) );

                // same guard as the application branch: nothing to delete if the role no longer exists
                if ( itemEntity != null ) {
                    RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
                    em.deleteRole( roleRef.getApplicationRoleName(), Optional.fromNullable(itemEntity) );
                }
            }
        }
    }

    /**
     * Marks the collection edge from the head entity to the item and, when the schema declares a linked
     * collection, the reverse edge as well, then queues a delete-edge event for each edge that was marked.
     *
     * @param collectionName name of the collection on the head entity
     * @param itemRef reference to the entity whose edges are removed
     */
    @Override
    public void removeItemFromCollection( String collectionName, EntityRef itemRef ) throws Exception {

        final Id itemId = new SimpleId( itemRef.getUuid(), itemRef.getType() );

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );

        // mark the edge versions and take the first for later delete edge queue event ( load is descending )
        final Edge forwardMarked = graphManager
            .loadEdgeVersions(
                CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, itemId ) )
            .flatMap( version -> graphManager.markEdge( version ) )
            .toBlocking()
            .firstOrDefault( null );

        final CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
        final boolean hasLinkedCollection = collection != null && collection.getLinkedCollection() != null;

        Edge reverseMarked = null;
        if ( hasLinkedCollection ) {
            // the reverse edge runs from the item to the head entity under the pluralized head type
            final String pluralHeadType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
            reverseMarked = graphManager
                .loadEdgeVersions(
                    CpNamingUtils.createEdgeFromCollectionName( itemId, pluralHeadType, cpHeadEntity.getId() ) )
                .flatMap( version -> graphManager.markEdge( version ) )
                .toBlocking()
                .firstOrDefault( null );
        }

        // Hand the marked edges to the index service. Per the original author's note, processing the
        // event deletes the edge(s) and de-indexes older versions.
        if ( forwardMarked != null ) {
            indexService.queueDeleteEdge( applicationScope, forwardMarked );
        }
        if ( reverseMarked != null ) {
            indexService.queueDeleteEdge( applicationScope, reverseMarked );
        }
    }


    /**
     * Copies the members of one relation of the head entity to a relation of another entity.
     *
     * <p>A relation known to the schema is read or written as a collection, anything else as a
     * connection.</p>
     *
     * @param srcRelationName relation of the head entity to read from
     * @param dstEntityRef entity that receives the copies
     * @param dstRelationName relation of the destination entity to write to
     */
    @Override
    public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
        throws Exception {

        headEntity = em.validate( headEntity );
        dstEntityRef = em.validate( dstEntityRef );

        final boolean srcIsCollection =
            getDefaultSchema().getCollection( headEntity.getType(), srcRelationName ) != null;
        final boolean dstIsCollection =
            getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName ) != null;

        // NOTE(review): every iteration issues the same query with a null start, so if
        // hasMoreResults() stays true the first page appears to be fetched again - confirm paging.
        Results page;
        do {
            page = srcIsCollection
                ? em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false )
                : em.getTargetEntities( headEntity, srcRelationName, null, Level.REFS );

            if ( page != null && page.size() > 0 ) {
                for ( EntityRef member : page.getRefs() ) {
                    if ( dstIsCollection ) {
                        em.addToCollection( dstEntityRef, dstRelationName, member );
                    }
                    else {
                        em.createConnection( dstEntityRef, dstRelationName, member );
                    }
                }
            }
        }
        while ( page != null && page.hasMoreResults() );
    }


    /**
     * Searches a collection of the head entity and returns the first page of results.
     *
     * <p>A null query is replaced by an empty query for the collection. Queries at level IDS are served
     * by an id-only executor; every other level loads full entities.</p>
     *
     * @param collectionName collection to search; must be known to the schema for the head entity type
     * @param query query to run, or null for a default query
     * @return the first page of results
     * @throws RuntimeException when the schema has no collection info for the collection
     */
    @Override
    public Results searchCollection( String collectionName, Query query ) throws Exception {

        if ( query == null ) {
            query = new Query();
            query.setCollection( collectionName );
        }

        headEntity = em.validate( headEntity );

        final CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );

        if ( collection == null ) {
            throw new RuntimeException(
                "Cannot find collection-info for '" + collectionName + "' of " + headEntity.getType() + ":" + headEntity
                    .getUuid() );
        }

        query.setEntityType( collection.getType() );
        final Query toExecute = adjustQuery( query );

        // graph searches carry no query string
        final Optional<String> queryString;
        if ( query.isGraphSearch() ) {
            queryString = Optional.<String>absent();
        }
        else {
            queryString = query.getQl();
        }

        // snapshot everything the page builders need; query itself is not effectively final
        final Id ownerId = headEntity.asId();
        final boolean analyzeOnly = query.getAnalyzeOnly();
        final boolean returnQuery = query.getReturnQuery();

        if ( query.getLevel() != Level.IDS ) {

            //wire the callback so we can get each page
            return new EntityQueryExecutor( toExecute.getCursor() ) {
                @Override
                protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
                    final Optional<String> cursor ) {

                    final CollectionSearch entitySearch =
                        new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(),
                            toExecute.getLimit(), queryString, cursor );

                    entitySearch.setAnalyzeOnly( analyzeOnly );
                    entitySearch.setReturnQuery( returnQuery );

                    // stale entries are kept only when the collection is configured for LATEST consistency
                    final IndexConsistency consistency = getIndexConsistencyForType( collectionName );
                    entitySearch.setKeepStaleEntries( consistency == IndexConsistency.LATEST );

                    return collectionService.searchCollection( entitySearch );
                }
            }.next();
        }

        // ids only
        return new IdQueryExecutor( toExecute.getCursor() ) {
            @Override
            protected Observable<ResultsPage<Id>> buildNewResultsPage(
                final Optional<String> cursor ) {

                final CollectionSearch idSearch =
                    new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(),
                        toExecute.getLimit(), queryString, cursor );

                idSearch.setAnalyzeOnly( analyzeOnly );
                idSearch.setReturnQuery( returnQuery );

                return collectionService.searchCollectionIds( idSearch );
            }
        }.next();
    }


    /**
     * Repeats a collection search until it returns the expected number of results or the configured poll
     * timeout has elapsed, sleeping for the configured interval between attempts.
     *
     * <p>The search always runs at least once. No sleep happens after the timeout has been exceeded, so
     * the method returns as soon as another attempt would no longer be made.</p>
     *
     * @param collectionName collection to search
     * @param query query to run on every attempt
     * @param expectedResults number of results that ends the polling
     * @return the results of the last attempt, which may not have the expected size on timeout
     */
    @Override
    public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults )
        throws Exception {

        final long maxLength = entityManagerFig.pollForRecordsTimeout();
        final long sleepTime = entityManagerFig.sleep();
        final long started = System.currentTimeMillis();

        Results results;
        long length;

        while ( true ) {
            results = searchCollection( collectionName, query );
            length = System.currentTimeMillis() - started;

            // stop on success, or when the time budget is used up (no point sleeping before giving up)
            if ( expectedResults == results.size() || length > maxLength ) {
                break;
            }

            logger.info("Sleeping {} ms during searchCollectionConsistent", sleepTime);
            Thread.sleep( sleepTime );
        }

        logger.info( "Consistent Search finished in {}, results={}, expected={}...dumping stack",
            length, results.size(), expectedResults );
        return results;
    }


    /**
     * Creates the connection described by the given reference by delegating to the
     * (connection type, target) overload.
     *
     * @param connection supplies the connection type and the target entity
     * @return the created connection reference
     */
    @Override
    public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {

        final String type = connection.getConnectionType();
        final EntityRef target = connection.getTargetRefs();

        return createConnection( type, target );
    }


    /**
     * Connects the head entity to another entity with an edge of the given connection type.
     *
     * <p>The edge is written synchronously, queued for indexing unless indexing is skipped for the
     * target's type, and duplicate-edge cleanup is triggered afterwards.</p>
     *
     * @param connectionType type of the connection
     * @param connectedEntityRef entity to connect to
     * @return a reference describing the new connection
     */
    @Override
    public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {

        headEntity = em.validate( headEntity );
        connectedEntityRef = em.validate( connectedEntityRef );

        final ConnectionRefImpl created = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );

        if ( logger.isTraceEnabled() ) {
            logger.trace( "createConnection(): Indexing connection type '{}'\n   from source {}:{}]\n   to target {}:{}\n   app {}",
                connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(),
                connectedEntityRef.getUuid(), applicationScope );
        }

        // load the Core Persistence form of the target; its id is the edge's target node
        final Id requestedId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() );
        final org.apache.usergrid.persistence.model.entity.Entity target =
            ( ( CpEntityManager ) em ).load( requestedId );
        final Id targetId = target.getId();

        final Edge connectionEdge = createConnectionEdge( cpHeadEntity.getId(), connectionType, targetId );

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );

        // blocking on the write surfaces any failure as an exception right here
        graphManager.writeEdge( connectionEdge ).toBlocking().lastOrDefault( null );

        final String targetType = targetId.getType();
        if ( !skipIndexingForType( targetType ) ) {
            final QueueIndexingStrategy strategy = getIndexingStrategyForType( targetType );
            indexService.queueNewEdge( applicationScope, targetId, connectionEdge, strategy );
        }

        // remove any duplicate edges (keeps the duplicate edge with same timestamp)
        removeDuplicateEdgesAsync( graphManager, connectionEdge );

        return created;
    }


    /**
     * Paired connections are not implemented by this relation manager.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
                                           EntityRef connectedEntityRef ) throws Exception {

        final String reason = "Paired connections not supported";
        throw new UnsupportedOperationException( reason );
    }


    /**
     * Creating a connection from a list of connected entity references is not implemented by this
     * relation manager.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {

        final String reason = "Paired connections not supported";
        throw new UnsupportedOperationException( reason );
    }


    /**
     * Builds a connection reference from the head entity to the given entity.
     *
     * @param connectionType type of the connection
     * @param connectedEntityRef target of the connection
     * @return the new connection reference
     */
    @Override
    public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {

        return new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
    }


    /**
     * Paired connection references are not implemented by this relation manager.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
                                        EntityRef connectedEntityRef ) throws Exception {

        final String reason = "Paired connections not supported";
        throw new UnsupportedOperationException( reason );
    }


    /**
     * Building a connection reference from a list of connected entity references is not implemented by
     * this relation manager.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {

        final String reason = "Paired connections not supported";
        throw new UnsupportedOperationException( reason );
    }


    /**
     * Deletes a connection: every version of the matching edge is marked, and a delete-edge event is
     * queued for each marked edge unless indexing is skipped for both its source and its target type.
     *
     * @param connectionRef describes the source, the target and the connection type
     */
    @Override
    public void deleteConnection( ConnectionRef connectionRef ) throws Exception {

        final EntityRef source = connectionRef.getSourceRefs();
        final EntityRef target = connectionRef.getTargetRefs();

        final String connectionType = connectionRef.getTargetRefs().getConnectionType();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Deleting connection '{}' from source {}:{} \n   to target {}:{}",
                connectionType, source.getType(), source.getUuid(),
                target.getType(), target.getUuid() );
        }

        // the target is loaded so that the id of the stored entity is used for the edge search
        final Id requestedTargetId = new SimpleId( target.getUuid(), target.getType() );
        final org.apache.usergrid.persistence.model.entity.Entity targetEntity =
            ( ( CpEntityManager ) em ).load( requestedTargetId );

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );

        final Id sourceId = new SimpleId( source.getUuid(), source.getType() );

        final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetEntity.getId() );

        //delete all the edges and queue their processing
        graphManager.loadEdgeVersions( search )
                    .flatMap( version -> graphManager.markEdge( version ) )
                    .doOnNext( marked -> {

                        // de-index unless both ends are excluded from indexing
                        final boolean bothSkipped = skipIndexingForType( marked.getSourceNode().getType() )
                            && skipIndexingForType( marked.getTargetNode().getType() );

                        if ( !bothSkipped ) {
                            indexService.queueDeleteEdge( applicationScope, marked );
                        }
                    } )
                    .toBlocking()
                    .lastOrDefault( null );
    }


    /**
     * Not supported: an entity cannot be addressed by UUID alone.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Set<String> getConnectionTypes( UUID connectedEntityId )
        throws Exception {
        throw new UnsupportedOperationException( "Cannot specify entity by UUID alone." );
    }


    /**
     * Convenience overload: delegates to {@link #getConnectionTypes(boolean)} with {@code false}.
     */
    @Override
    public Set<String> getConnectionTypes() throws Exception {
        final boolean filterConnection = false;
        return getConnectionTypes( filterConnection );
    }


    /**
     * Collects the names of all edge types that leave the head entity.
     *
     * Each edge type reported by the graph manager is translated with
     * {@link CpNamingUtils#getNameFromEdgeType} and gathered into a set. The call blocks until the edge type
     * stream completes.
     *
     * @param filterConnection not consulted by this implementation
     * @return the set of translated edge type names; empty when the head entity has no outgoing edge types
     */
    @Override
    public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {

        final GraphManager graphManager = managerCache.getGraphManager( applicationScope );

        final Observable<String> edgeTypes =
            graphManager.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );

        // fold the stream into a single set; collect() emits exactly one item, so last() is safe
        final Observable<HashSet<String>> connectionNames = edgeTypes.collect(
            () -> new HashSet<String>(),
            ( names, edgeType ) -> names.add( CpNamingUtils.getNameFromEdgeType( edgeType ) ) );

        return connectionNames.toBlocking().last();
    }


    /**
     * Returns the entities connected to the head entity through {@code connectionType}.
     *
     * Until this is refactored properly, the lookup is expressed as a {@link Query} and handed to
     * {@link #searchTargetEntities(Query)}.
     *
     * @param connectionType      the connection type to follow; must not be null
     * @param connectedEntityType the entity type set on the query (passed through as given)
     * @param level               the results level set on the query
     * @return whatever {@link #searchTargetEntities(Query)} returns for the constructed query
     */
    @Override
    public Results getTargetEntities( String connectionType, String connectedEntityType, Level level )
        throws Exception {

        Preconditions.checkNotNull( connectionType, "connectionType cannot be null" );

        final Query delegateQuery = new Query();
        delegateQuery.setConnectionType( connectionType );
        delegateQuery.setEntityType( connectedEntityType );
        delegateQuery.setResultsLevel( level );

        return searchTargetEntities( delegateQuery );
    }


    /**
     * Convenience overload: delegates to the four-argument variant with a count of {@code -1}.
     */
    @Override
    public Results getSourceEntities( String connType, String fromEntityType, Level resultsLevel )
        throws Exception {
        final int count = -1;
        return getSourceEntities( connType, fromEntityType, resultsLevel, count );
    }


    /**
     * Returns the entities that are connected to the head entity via {@code connType}.
     *
     * The containers of the head entity are looked up for the edge type derived from {@code connType}, and
     * the result is shaped according to {@code level}:
     * <ul>
     *   <li>{@link Level#REFS} - the container references themselves</li>
     *   <li>{@link Level#IDS} - the UUIDs of the container references</li>
     *   <li>any other level - each reference is loaded through the entity manager</li>
     * </ul>
     *
     * @param connType       connection type used to derive the edge type
     * @param fromEntityType entity type passed through to the container lookup
     * @param level          requested results level
     * @param count          passed through to the container lookup
     * @return the results in the requested shape
     */
    @Override
    public Results getSourceEntities( String connType, String fromEntityType, Level level, int count )
        throws Exception {

        // looking for edges to the head entity
        final String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connType );

        final Map<EntityRef, Set<String>> containers = getContainers( count, edgeType, fromEntityType );

        if ( Level.REFS.equals( level ) ) {
            final List<EntityRef> refList = new ArrayList<EntityRef>( containers.keySet() );
            return Results.fromRefList( refList );
        }

        if ( Level.IDS.equals( level ) ) {
            // TODO: someday this should return a list of Core Persistence Ids
            final List<UUID> idList = new ArrayList<UUID>( containers.size() );
            for ( EntityRef ref : containers.keySet() ) {
                idList.add( ref.getUuid() );
            }
            return Results.fromIdList( idList );
        }

        final List<Entity> entities = new ArrayList<Entity>( containers.size() );
        for ( EntityRef ref : containers.keySet() ) {
            final Entity entity = em.get( ref );

            if ( logger.isTraceEnabled() ) {
                // NOTE(review): em.get presumably returns null for a reference whose entity is gone --
                // confirm. Guard the dereference so that enabling trace logging cannot introduce a
                // NullPointerException that would not occur with trace logging disabled.
                logger.trace( "   Found connecting entity: {}", entity == null ? null : entity.getProperties() );
            }

            entities.add( entity );
        }
        return Results.fromEntities( entities );
    }


    /**
     * Searches the targets of the head entity's connections of the query's connection type.
     *
     * The head entity is validated through the entity manager and the query is passed through
     * {@link #adjustQuery(Query)}. Depending on the requested results level the first page is produced either
     * by a {@link ConnectionRefQueryExecutor} (levels {@link Level#REFS} and {@link Level#IDS}) or by an
     * {@link EntityQueryExecutor} (all other levels). Both executors build a fresh {@link ConnectionSearch}
     * for every cursor they are asked to continue from.
     *
     * @param query the query to run; must not be null and must carry a connection type
     * @return the first page of results from the selected executor
     */
    @Override
    public Results searchTargetEntities( Query query ) throws Exception {

        Preconditions.checkNotNull( query, "query cannot be null" );

        final String connection = query.getConnectionType();
        Preconditions.checkNotNull( connection, "connection must be specified" );

        headEntity = em.validate( headEntity );

        final boolean analyzeOnly = query.getAnalyzeOnly();
        final boolean returnQuery = query.getReturnQuery();

        final Query toExecute = adjustQuery( query );

        final Optional<String> entityType = Optional.fromNullable( query.getEntityType() );

        // Pipelines, by requested level:
        //
        //   entities:
        //     start id -- graph edge load     -- entity load (verify) from ids        -> results page collector
        //     start id -- ES query candidate  -- entity load (verify) from candidates -> results page collector
        //
        //   connection refs:
        //     start id -- graph edge load     -- entity id verify    -- filter to connection ref
        //                                                                              -> connection ref collector
        //     start id -- ES query candidate  -- candidate id verify -- filter to connection ref
        //                                                                              -> connection ref collector

        final Id sourceId = headEntity.asId();

        // a graph search carries no query string
        final Optional<String> queryString;
        if ( query.isGraphSearch() ) {
            queryString = Optional.absent();
        }
        else {
            queryString = query.getQl();
        }

        final boolean isConnecting = query.isConnecting();

        final Level requestedLevel = query.getResultsLevel();
        final boolean wantsRefs = requestedLevel == Level.REFS || requestedLevel == Level.IDS;

        if ( !wantsRefs ) {

            return new EntityQueryExecutor( toExecute.getCursor() ) {
                @Override
                protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
                    final Optional<String> cursor ) {

                    // called once per cursor: every new page runs a new search with freshly built state
                    final ConnectionSearch pageSearch =
                        new ConnectionSearch( applicationScope, sourceId, entityType, connection,
                            toExecute.getLimit(), queryString, cursor, isConnecting );
                    pageSearch.setAnalyzeOnly( analyzeOnly );
                    pageSearch.setReturnQuery( returnQuery );

                    return connectionService.searchConnection( pageSearch );
                }
            }.next();
        }

        return new ConnectionRefQueryExecutor( toExecute.getCursor() ) {
            @Override
            protected Observable<ResultsPage<ConnectionRef>> buildNewResultsPage( final Optional<String> cursor ) {

                // called once per cursor: every new page runs a new search with freshly built state
                final ConnectionSearch pageSearch =
                    new ConnectionSearch( applicationScope, sourceId, entityType, connection,
                        toExecute.getLimit(), queryString, cursor, isConnecting );
                pageSearch.setAnalyzeOnly( analyzeOnly );
                pageSearch.setReturnQuery( returnQuery );

                return connectionService.searchConnectionAsRefs( pageSearch );
            }
        }.next();
    }


    /**
     * Turns a "select by identifier" query into an explicit query-language string.
     *
     * The query is modified in place and the same instance is returned:
     * <ul>
     *   <li>a query that already has a query string is left untouched</li>
     *   <li>a single name/email identifier becomes {@code email='...'} for users addressed by email,
     *       {@code path='...'} for groups, and {@code name='...'} otherwise</li>
     *   <li>a single UUID identifier becomes {@code uuid=...}</li>
     * </ul>
     * A query without an entity type is handled like any non-user, non-group type.
     *
     * @param query the query to adjust
     * @return {@code query}, possibly with its query string set
     */
    private Query adjustQuery( Query query ) {

        // an explicit query string is already present, nothing to derive
        if ( query.getQl().isPresent() ) {
            return query;
        }

        // a name alias or email alias was specified
        if ( query.containsSingleNameOrEmailIdentifier() ) {

            final Identifier ident = query.getSingleIdentifier();

            // The entity type may be absent, so compare constant-first to stay null-safe.
            final String entityType = query.getEntityType();

            // NOTE(review): the identifier is concatenated into the query string unescaped. Confirm that
            // Identifier parsing rejects quote characters, otherwise this needs escaping.
            final String newQuery;

            // an email was specified.  An edge case that only applies to users.
            // This is ugly to put here, but required.
            if ( User.ENTITY_TYPE.equals( entityType ) && ident.isEmail() ) {
                newQuery = "select * where email='" + query.getSingleNameOrEmailIdentifier() + "'";
            }
            // groups have a special unique identifier
            else if ( Group.ENTITY_TYPE.equals( entityType ) ) {
                newQuery = "select * where path='" + query.getSingleNameOrEmailIdentifier() + "'";
            }
            // use the ident with the default alias. could be an email
            else {
                newQuery = "select * where name='" + query.getSingleNameOrEmailIdentifier() + "'";
            }

            query.setQl( newQuery );
        }
        else if ( query.containsSingleUuidIdentifier() ) {

            //TODO, this shouldn't even come from ES, it should look up the entity directly
            query.setQl( "select * where uuid=" + query.getSingleUuidIdentifier() );
        }

        //TODO T.N. not sure if we still need this.  If we do then we need to modify our query interface.
        // The removed legacy code added a descending sort on PROPERTY_CREATED when the query was reversed
        // and its query string was empty or had no "order by " clause.

        return query;
    }


    /**
     * Not implemented by this relation manager.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Set<String> getConnectionIndexes( String connectionType )
        throws Exception {
        throw new UnsupportedOperationException( "Not supported yet." );
    }


    /**
     * Returns the head entity as a full {@link Entity}.
     *
     * Side effect: when {@code headEntity} is not already an {@link Entity}, it is fetched through the entity
     * manager and the {@code headEntity} field is replaced with the fetched value.
     */
    private Entity getHeadEntity() throws Exception {
        // already a full entity, nothing to fetch
        if ( headEntity instanceof Entity ) {
            return ( Entity ) headEntity;
        }

        final Entity fetched = em.get( headEntity );
        headEntity = fetched;
        return fetched;
    }

    /**
     * Removes older versions of {@code edge}, keeping only the first version the graph manager returns.
     *
     * All versions of the edge (same source, type and target) are requested in descending order; the first
     * one is skipped and every remaining one is marked. {@code deleteEdge} is then invoked once, with the
     * last edge that was marked. Despite its name, this method blocks until the stream has completed.
     *
     * @param gm   graph manager used to load, mark and delete the edge versions
     * @param edge the edge whose duplicates should be removed
     */
    private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){

        // search over every stored version of this exact edge
        final SearchByEdge allVersions =
            new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
                SearchByEdgeType.Order.DESCENDING, Optional.absent() );

        gm.loadEdgeVersions( allVersions )
          // retain the first version returned, everything after it is a duplicate
          .skip( 1 )
          .flatMap( duplicate -> {
              if ( logger.isDebugEnabled() ) {
                  logger.debug("Duplicate edge. Marking edge {} for deletion", duplicate);
              }
              return gm.markEdge( duplicate );
          } )
          .lastOrDefault( null )
          .doOnNext( lastMarked -> {
              // null is the default emitted when there was no duplicate to mark
              if ( lastMarked != null ) {
                  // don't queue a delete, because that de-indexes; only the edges need deleting since a
                  // version that still has to be indexed remains
                  gm.deleteEdge( lastMarked ).toBlocking().lastOrDefault( null ); // should throw on failure
              }
          } )
          .toBlocking()
          .lastOrDefault( null ); // should throw on failure

    }

    /**
     * Resolves the index consistency for {@code type} by delegating to {@link CpCollectionUtils} with this
     * manager's collection settings factory and application id.
     */
    private IndexConsistency getIndexConsistencyForType(String type ) {
        return CpCollectionUtils.getIndexConsistencyForType(
            collectionSettingsFactory,
            applicationId,
            type );
    }

    /**
     * Resolves the queue indexing strategy for {@code type} by delegating to {@link CpCollectionUtils} with
     * this manager's collection settings factory and application id.
     */
    private QueueIndexingStrategy getIndexingStrategyForType(String type ) {
        return CpCollectionUtils.getIndexingStrategyForType(
            collectionSettingsFactory,
            applicationId,
            type );
    }

    /**
     * Reports whether indexing is skipped for {@code type}, as decided by {@link CpCollectionUtils} using this
     * manager's collection settings factory and application id.
     */
    private boolean skipIndexingForType( String type ) {
        return CpCollectionUtils.skipIndexingForType(
            collectionSettingsFactory,
            applicationId,
            type );
    }

    /**
     * Get the map manager for uuid mapping.
     *
     * The map scope is the entity-type scope owned by this manager's application id.
     *
     * @return the map manager the manager cache provides for that scope
     */
    private MapManager getMapManagerForTypes() {
        final Id applicationOwner = new SimpleId( applicationId, TYPE_APPLICATION );
        final MapScope entityTypeScope = CpNamingUtils.getEntityTypeMapScope( applicationOwner );

        return managerCache.getMapManager( entityTypeScope );
    }

}
