/*
 *
 *  * Licensed to the Apache Software Foundation (ASF) under one
 *  * or more contributor license agreements.  See the NOTICE file
 *  * distributed with this work for additional information
 *  * regarding copyright ownership.  The ASF licenses this file
 *  * to you under the Apache License, Version 2.0 (the
 *  * "License"); you may not use this file except in compliance
 *  * with the License.  You may obtain a copy of the License at
 *  *
 *  *    http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing,
 *  * software distributed under the License is distributed on an
 *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  * KIND, either express or implied.  See the License for the
 *  * specific language governing permissions and limitations
 *  * under the License.
 *
 */

package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;


import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.Nullable;

import com.google.common.base.Optional;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;

import com.google.common.base.Preconditions;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;


/**
 * Implementation of the shard group compaction
 */
@Singleton
public class ShardGroupCompactionImpl implements ShardGroupCompaction {


    private final AtomicLong countAudits;
    private static final Logger logger = LoggerFactory.getLogger( ShardGroupCompactionImpl.class );


    private static final Charset CHARSET = Charset.forName( "UTF-8" );

    private static final HashFunction MURMUR_128 = Hashing.murmur3_128();


    private final ListeningExecutorService taskExecutor;
    private final TimeService timeService;
    private final GraphFig graphFig;
    private final NodeShardAllocation nodeShardAllocation;
    private final ShardedEdgeSerialization shardedEdgeSerialization;
    private final EdgeColumnFamilies edgeColumnFamilies;
    private final Keyspace keyspace;
    private final EdgeShardSerialization edgeShardSerialization;

    private final Random random;
    private final ShardCompactionTaskTracker shardCompactionTaskTracker;
    private final ShardAuditTaskTracker shardAuditTaskTracker;
    private final NodeShardCache nodeShardCache;


    @Inject
    public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig,
                                     final NodeShardAllocation nodeShardAllocation,
                                     final ShardedEdgeSerialization shardedEdgeSerialization,
                                     final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
                                     final EdgeShardSerialization edgeShardSerialization,
                                     final AsyncTaskExecutor asyncTaskExecutor,
                                     final NodeShardCache nodeShardCache ) {

        this.timeService = timeService;
        this.countAudits = new AtomicLong();
        this.graphFig = graphFig;
        this.nodeShardAllocation = nodeShardAllocation;
        this.shardedEdgeSerialization = shardedEdgeSerialization;
        this.edgeColumnFamilies = edgeColumnFamilies;
        this.keyspace = keyspace;
        this.edgeShardSerialization = edgeShardSerialization;

        this.random = new Random();
        this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
        this.shardAuditTaskTracker = new ShardAuditTaskTracker();


        this.taskExecutor = asyncTaskExecutor.getExecutorService();
        this.nodeShardCache = nodeShardCache;
    }


    /**
     * Execute the compaction task.  Will return the status the operations performed
     *
     * @param group The shard entry group to compact
     *
     * @return The result of the compaction operation
     */
    public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                                     final ShardEntryGroup group ) {


        final long startTime = timeService.getCurrentTime();


        Preconditions.checkNotNull( group, "group cannot be null" );
        Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
        Preconditions
            .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet.  Ignoring compaction." );

        if(logger.isTraceEnabled()) {
            logger.trace("Compacting shard group. Audit count is {} ", countAudits.get());
        }
        final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();

        final Shard targetShard = group.getCompactionTarget();

        final Set<Shard> sourceShards = new HashSet<>( group.getReadShards() );

        //remove the target
        sourceShards.remove( targetShard );


        final UUID timestamp = UUIDGenerator.newTimeUUID();

        final long newShardPivot = targetShard.getShardIndex();

        final int maxWorkSize = graphFig.getScanPageSize();




        /**
         * As we move edges, we want to keep track of it
         */
        long totalEdgeCount = 0;


        for ( Shard sourceShard : sourceShards ) {

            final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
            final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
            final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();

            long edgeCount = 0;

            Iterator<MarkedEdge> edges = edgeMeta
                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
                    Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );

            MarkedEdge shardEnd = null;

            while ( edges.hasNext() ) {
                final MarkedEdge edge = edges.next();

                final long edgeTimestamp = edge.getTimestamp();

                shardEnd = edge;

                /**
                 * The edge is within a different shard, break
                 */
                if ( edgeTimestamp < newShardPivot ) {
                    break;
                }

                newRowBatch.mergeShallow( edgeMeta
                        .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
                            timestamp ) );

                deleteRowBatch.mergeShallow( edgeMeta
                        .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
                            timestamp ) );


                edgeCount++;



                // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
                // from the old rows
                if ( edgeCount % maxWorkSize == 0 ) {



                    try {

                        // write the edges into the new shard atomically so we know they all succeed
                        newRowBatch.withAtomicBatch(true).execute();


                        // Update the shard end after each batch so any reads during transition stay as close to current
                        sourceShard.setShardEnd(
                            Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                        );

                        if(logger.isTraceEnabled()) {
                            logger.trace("Updating shard {} during batch removal with shardEnd {}", sourceShard, shardEnd);
                        }
                        updateShardMetaBatch.mergeShallow(
                            edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));



                        // on purpose block this thread before deleting the old edges to be sure there are no gaps
                        // duplicates are filtered on graph seeking so this is OK
                        Thread.sleep(1000);

                        if(logger.isTraceEnabled()) {
                            logger.trace("Deleting batch of {} from old shard", maxWorkSize);
                        }
                        deleteRowBatch.withAtomicBatch(true).execute();

                        updateShardMetaBatch.execute();


                    }
                    catch ( Throwable t ) {
                        logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
                    }

                    totalEdgeCount += edgeCount;
                    edgeCount = 0;
                }



            }

            totalEdgeCount += edgeCount;

            try {

                // write the edges into the new shard atomically so we know they all succeed
                newRowBatch.withAtomicBatch(true).execute();

                // on purpose block this thread before deleting the old edges to be sure there are no gaps
                // duplicates are filtered on graph seeking so this is OK
                Thread.sleep(1000);

                if(logger.isTraceEnabled()) {
                    logger.trace("Deleting remaining {} edges from old shard", edgeCount);
                }
                deleteRowBatch.withAtomicBatch(true).execute();

                if (shardEnd != null){

                    sourceShard.setShardEnd(
                        Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                    );

                    if(logger.isTraceEnabled()) {
                        logger.trace("Updating for last time shard {} with shardEnd {}", sourceShard, shardEnd);
                    }
                    updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
                    updateShardMetaBatch.execute();
                }


            }
            catch ( Throwable t ) {
                logger.error( "Unable to move edges to target shard {}", targetShard );
            }



        }




        if (logger.isTraceEnabled()) {
            logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
        }

        logger.info("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);


        resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );

        /**
         * We didn't move anything this pass, mark the shard as compacted.  If we move something,
         * it means that we missed it on the first pass
         * or someone is still not writing to the target shard only.
         */
        if ( totalEdgeCount == 0 ) {


            // now that we've marked our target as compacted, we can successfully remove any shards that are not
            // compacted themselves in the sources

            final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();

            for ( Shard source : sourceShards ) {

                //if we can't safely delete it, don't do so
                if ( !group.canBeDeleted( source ) ) {
                    continue;
                }

                logger.info( "Source shards have been fully drained.  Removing shard {}", source );

                final MutationBatch shardRemoval = edgeShardSerialization.removeShardMeta( scope, source, edgeMeta );
                shardRemovalRollup.mergeShallow( shardRemoval );

                resultBuilder.withRemovedShard( source );
            }


            try {
                shardRemovalRollup.execute();

                // invalidate the shard cache so we can be sure that all read shards are up to date
                nodeShardCache.invalidate(scope, edgeMeta);
            }
            catch ( ConnectionException e ) {
                throw new RuntimeException( "Unable to connect to cassandra", e );
            }

            //Overwrite our shard index with a newly created one that has been marked as compacted
            Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
            compactedShard.setShardEnd(targetShard.getShardEnd());

            logger.info( "Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", compactedShard );


            final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
            try {
                updateMark.execute();

                // invalidate the shard cache so we can be sure that all read shards are up to date
                nodeShardCache.invalidate(scope, edgeMeta);
            }
            catch ( ConnectionException e ) {
                throw new RuntimeException( "Unable to connect to cassandra", e );
            }

            resultBuilder.withCompactedShard( compactedShard );
        }

        return resultBuilder.build();
    }


    @Override
    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
                                                             final DirectedEdgeMeta edgeMeta,
                                                             final ShardEntryGroup group ) {

        final double repairChance = random.nextDouble();


        //don't audit, we didn't hit our chance
        if ( repairChance > graphFig.getShardRepairChance() ) {
            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
        }

        countAudits.getAndIncrement();

        if(logger.isTraceEnabled()) {
            logger.trace("Auditing shard group {}. count is {} ", group, countAudits.get());
        }

        /**
         * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
         * hose the system
         */
        final ListenableFuture<AuditResult> future;

        try {
            future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
        }
        catch ( RejectedExecutionException ree ) {

            //ignore, if this happens we don't care, we're saturated, we can check later
            logger.info( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );

            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
        }

        /**
         * Log our success or failures for debugging purposes
         */
        Futures.addCallback( future, new FutureCallback<AuditResult>() {
            @Override
            public void onSuccess( @Nullable final AuditResult result ) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Successfully completed audit of task {}", result);
                }
            }


            @Override
            public void onFailure( final Throwable t ) {
                logger.error( "Unable to perform audit.  Exception is ", t );
            }
        } );

        return future;
    }


    private final class ShardAuditTask implements Callable<AuditResult> {

        private final ApplicationScope scope;
        private final DirectedEdgeMeta edgeMeta;
        private final ShardEntryGroup group;


        public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                               final ShardEntryGroup group ) {
            this.scope = scope;
            this.edgeMeta = edgeMeta;
            this.group = group;
        }


        @Override
        public AuditResult call() throws Exception {
            /**
             * We don't have a compaction pending.  Run an audit on the shards
             */
            if ( !group.isCompactionPending() ) {

                /**
                 * Check if we should allocate, we may want to
                 */

                /**
                 * It's already compacting, don't do anything
                 */
                if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
                    return AuditResult.CHECKED_NO_OP;
                }

                try {

                    final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
                    if ( !created ) {
                        return AuditResult.CHECKED_NO_OP;
                    }
                }
                finally {
                    shardAuditTaskTracker.complete( scope, edgeMeta, group );
                }


                return AuditResult.CHECKED_CREATED;
            }

            //check our taskmanager


            /**
             * Do the compaction
             */
            if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
                /**
                 * It's already compacting, don't do anything
                 */
                if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {

                    if(logger.isTraceEnabled()) {
                        logger.trace("Already compacting, won't compact group: {}", group);
                    }


                    return AuditResult.COMPACTING;
                }

                /**
                 * We use a finally b/c we always want to remove the task track
                 */
                try {
                    CompactionResult result = compact( scope, edgeMeta, group );
                    logger.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}",
                            scope, edgeMeta, group, result );
                }
                finally {
                    shardCompactionTaskTracker.complete( scope, edgeMeta, group );
                }
                return AuditResult.COMPACTED;
            }

            //no op, there's nothing we need to do to this shard
            return AuditResult.NOT_CHECKED;
        }
    }


    /**
     * Inner class used to track running tasks per instance
     */
    private static abstract class TaskTracker {

        private static final Boolean TRUE = true;

        private ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();


        /**
         * Sets this data into our scope to signal it's running to stop other threads from attempting to run
         */
        public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                                     ShardEntryGroup group ) {
            final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
            final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
            //logger.info("hash components are app: {}, edgeMeta: {}, group: {}", scope.getApplication(), edgeMeta, group);
            //logger.info("checking hash value of: {}, already started: {}", hash, returned );

            /**
             * Someone already put the value
             */
            return returned == null;
        }


        /**
         * Mark this entry group as complete
         */
        public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, ShardEntryGroup group ) {
            final long hash = doHash( scope, edgeMeta, group ).hash().asLong();
            runningTasks.remove( hash );
        }


        protected abstract Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
                                          final ShardEntryGroup shardEntryGroup );
    }


    /**
     * Task tracker for shard compaction
     */
    private static final class ShardCompactionTaskTracker extends ShardAuditTaskTracker {

        /**
         * Hash our data into a consistent long
         */
        @Override
        protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
                                 final ShardEntryGroup shardEntryGroup ) {

            final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup );

            // add the compaction target to the hash
            final Shard compactionTarget = shardEntryGroup.getCompactionTarget();

            hasher.putLong( compactionTarget.getShardIndex() );


            return hasher;
        }
    }


    /**
     * Task tracker for shard audit
     */
    private static class ShardAuditTaskTracker extends TaskTracker {

        /**
         * Hash our data into a consistent long
         */
        protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
                                 final ShardEntryGroup shardEntryGroup ) {

            final Hasher hasher = MURMUR_128.newHasher();


            addToHash( hasher, scope.getApplication() );


            for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
                addToHash( hasher, nodeMeta.getId() );
                hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
            }


            /**
             * Add our edge type
             */
            for ( String type : directedEdgeMeta.getTypes() ) {
                hasher.putString( type, CHARSET );
            }


            return hasher;
        }


        protected void addToHash( final PrimitiveSink into, final Id id ) {

            final UUID nodeUuid = id.getUuid();
            final String nodeType = id.getType();

            into.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
                .putString( nodeType, CHARSET );
        }
    }


    public static final class CompactionResult {

        public final long copiedEdges;
        public final Shard targetShard;
        public final Set<Shard> sourceShards;
        public final Set<Shard> removedShards;
        public final Shard compactedShard;


        private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
                                  final Set<Shard> removedShards, final Shard compactedShard ) {
            this.copiedEdges = copiedEdges;
            this.targetShard = targetShard;
            this.compactedShard = compactedShard;
            this.sourceShards = Collections.unmodifiableSet( sourceShards );
            this.removedShards = Collections.unmodifiableSet( removedShards );
        }


        /**
         * Create a builder to use to create the result
         */
        public static CompactionBuilder builder() {
            return new CompactionBuilder();
        }


        @Override
        public String toString() {
            return "CompactionResult{" +
                "copiedEdges=" + copiedEdges +
                ", targetShard=" + targetShard +
                ", sourceShards=" + sourceShards +
                ", removedShards=" + removedShards +
                ", compactedShard=" + compactedShard +
                '}';
        }


        public static final class CompactionBuilder {
            private long copiedEdges;
            private Shard targetShard;
            private Set<Shard> sourceShards;
            private Set<Shard> removedShards = new HashSet<>();
            private Shard compactedShard;


            public CompactionBuilder withCopiedEdges( final long copiedEdges ) {
                this.copiedEdges = copiedEdges;
                return this;
            }


            public CompactionBuilder withTargetShard( final Shard targetShard ) {
                this.targetShard = targetShard;
                return this;
            }


            public CompactionBuilder withSourceShards( final Set<Shard> sourceShards ) {
                this.sourceShards = sourceShards;
                return this;
            }


            public CompactionBuilder withRemovedShard( final Shard removedShard ) {
                this.removedShards.add( removedShard );
                return this;
            }


            public CompactionBuilder withCompactedShard( final Shard compactedShard ) {
                this.compactedShard = compactedShard;
                return this;
            }


            public CompactionResult build() {
                return new CompactionResult( copiedEdges, targetShard, sourceShards, removedShards, compactedShard );
            }
        }
    }
}
