/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.corepersistence.asyncevents;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Optional;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueMessage;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers;


@Singleton
public class AmazonAsyncEventService implements AsyncEventService {


    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);

    // Upper bound on messages requested per receive call; SQS allows at most 10 per request.
    private static final int MAX_TAKE = 10;
    // Name used to build the queue scope. Keep this short as AWS limits queue name size to 80 chars.
    public static final String QUEUE_NAME = "index";

    // Queue manager obtained for queueScope in the constructor; every send, receive and commit goes through it.
    private final QueueManager queue;
    private final QueueScope queueScope;
    // Configuration source for worker count, queue timeouts and failure retry time.
    private final IndexProcessorFig indexProcessorFig;
    // Receives the combined IndexOperationMessage built in handleMessages.
    private final IndexProducer indexProducer;
    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
    private final EntityIndexFactory entityIndexFactory;
    // Turns events into observables of IndexOperationMessage in the handle* methods.
    private final EventBuilder eventBuilder;
    // NOTE(review): assigned in the constructor but not referenced by any method visible in this class.
    private final RxTaskScheduler rxTaskScheduler;

    // readTimer wraps take(), writeTimer wraps offer()/offerBatch(), ackTimer wraps both ack() overloads.
    private final Timer readTimer;
    private final Timer writeTimer;
    private final Timer ackTimer;

    /**
     * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
     */
    private final Object mutex = new Object();

    // Incremented each time a worker's dequeue loop catches a failure.
    private final Counter indexErrorCounter;
    // Source of the numeric suffix used when naming worker threads.
    private final AtomicLong counter = new AtomicLong();
    // Backing value of the "async-event.inflight" gauge: raised when messages are dequeued, lowered on ack.
    private final AtomicLong inFlight = new AtomicLong();
    // Records (current time - event creation time) for each processed message.
    private final Histogram messageCycle;

    // The actively running subscriptions, one per started worker; added to and iterated under mutex.
    private List<Subscription> subscriptions = new ArrayList<>();


    @Inject
    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
                                    final IndexProcessorFig indexProcessorFig,
                                    final IndexProducer indexProducer,
                                    final MetricsFactory metricsFactory,
                                    final EntityCollectionManagerFactory entityCollectionManagerFactory,
                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
                                    final EntityIndexFactory entityIndexFactory,
                                    final EventBuilder eventBuilder,
                                    final RxTaskScheduler rxTaskScheduler ) {
        this.indexProducer = indexProducer;

        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
        this.entityIndexFactory = entityIndexFactory;
        this.eventBuilder = eventBuilder;
        this.rxTaskScheduler = rxTaskScheduler;

        this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
        this.queue = queueManagerFactory.getQueueManager(queueScope);
        this.indexProcessorFig = indexProcessorFig;

        this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
        this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
        this.ackTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.ack");
        this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error");
        this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle");


        //wire up the gauge of inflight message
        metricsFactory.addGauge(AmazonAsyncEventService.class, "async-event.inflight", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return inFlight.longValue();
            }
        });

        start();
    }


    /**
     * Sends a single operation to the queue while recording the elapsed time on the write timer.
     *
     * @param operation the payload handed to {@code queue.sendMessage}
     * @throws RuntimeException wrapping the {@link IOException} when the send fails
     */
    private void offer(final Object operation) {
        final Timer.Context writeTiming = writeTimer.time();

        try {
            // hand the payload over to SQS
            queue.sendMessage( operation );
        }
        catch ( final IOException sendFailure ) {
            throw new RuntimeException("Unable to queue message", sendFailure);
        }
        finally {
            // always record the duration, success or not
            writeTiming.stop();
        }
    }

    /**
     * Sends a list of operations to the queue in one call while recording the elapsed time on the
     * write timer.
     *
     * @param operations the payloads handed to {@code queue.sendMessages}
     * @throws RuntimeException wrapping the {@link IOException} when the send fails
     */
    private void offerBatch(final List operations){
        final Timer.Context writeTiming = writeTimer.time();

        try {
            // hand the whole batch over to SQS
            queue.sendMessages( operations );
        }
        catch ( final IOException sendFailure ) {
            throw new RuntimeException("Unable to queue message", sendFailure);
        }
        finally {
            // always record the duration, success or not
            writeTiming.stop();
        }
    }


    /**
     * Requests up to {@link #MAX_TAKE} messages from the queue, using the visibility timeout and
     * queue timeout configured in {@code indexProcessorFig}.
     *
     * The read timer covers only the call to {@code queue.getMessages}.
     * NOTE(review): if the returned Observable fetches lazily on subscription, the actual receive
     * is not included in this timing - confirm against the QueueManager implementation.
     *
     * @return the observable of messages produced by the queue manager
     */
    private Observable<QueueMessage> take() {

        final Timer.Context readTiming = readTimer.time();

        try {
            final Observable<QueueMessage> received = queue.getMessages(
                    MAX_TAKE,
                    indexProcessorFig.getIndexQueueVisibilityTimeout(),
                    indexProcessorFig.getIndexQueueTimeout(),
                    AsyncEvent.class);

            return received;
        }
        finally {
            // stop the timer whether or not the call succeeded
            readTiming.stop();
        }
    }


    /**
     * Commits a single message on the queue and, once the commit succeeded, lowers the in-flight
     * counter by one. The elapsed time is recorded on the ack timer.
     *
     * @param message the message to commit
     * @throws RuntimeException wrapping any exception raised while committing
     */
    public void ack(final QueueMessage message) {

        final Timer.Context ackTiming = ackTimer.time();

        try {
            queue.commitMessage(message);

            // the message is no longer in flight
            inFlight.decrementAndGet();
        }
        catch ( final Exception commitFailure ) {
            throw new RuntimeException("Unable to ack messages", commitFailure);
        }
        finally {
            ackTiming.stop();
        }
    }

    /**
     * Commits a batch of messages on the queue and, once the commit succeeded, lowers the in-flight
     * counter by the number of messages committed. The elapsed time is recorded on the ack timer.
     *
     * @param messages the messages to commit
     * @throws RuntimeException wrapping any exception raised while committing
     */
    public void ack(final List<QueueMessage> messages) {

        final Timer.Context timer = this.ackTimer.time();

        try{
            queue.commitMessages(messages);

            //decrement our in-flight counter by the whole batch: the workers add the full size of
            //every dequeued list, so removing only one per batch makes the gauge drift upwards
            if (messages != null) {
                inFlight.addAndGet(-messages.size());
            }

        }catch(Exception e){
            throw new RuntimeException("Unable to ack messages", e);
        }finally {
            timer.stop();
        }
    }


    /**
     * Processes one batch of queue messages: builds the index operations for every message, sends
     * them to the index producer as a single combined {@link IndexOperationMessage}, and returns
     * the messages that may now be acknowledged.
     *
     * Per-message outcomes:
     * <ul>
     *   <li>body is not an {@link AsyncEvent} (or is null): nothing is indexed, but the message is
     *       still returned so that it gets acknowledged;</li>
     *   <li>the handler throws synchronously: the failure is logged and the message is left out of
     *       the returned list, so it is not acknowledged by the caller;</li>
     *   <li>otherwise: each emitted index operation is paired with the message.</li>
     * </ul>
     *
     * This call blocks until the whole pipeline has completed. Errors raised inside the
     * observables are logged and then propagate out of the blocking call.
     *
     * @param messages the batch taken from the queue; must not be null
     * @return the messages to ack, or null when the pipeline emitted nothing
     */
    private List<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
        if (logger.isDebugEnabled()) {
            logger.debug("handleMessages with {} message", messages.size());
        }

        final int bufferSize = messages.size();
        Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
            AsyncEvent event = null;
            try{
                event = (AsyncEvent) message.getBody();
            }catch (ClassCastException cce){
                logger.error("Failed to deserialize message body",cce);
            }

            if (event == null) {
                logger.error("AsyncEvent type or event is null!");
                //keep the queue message present so the unusable message is still ack'd
                return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),System.currentTimeMillis()));
            }

            final AsyncEvent thisEvent = event;
            if(logger.isDebugEnabled()) {
                logger.debug("Processing {} event", event);
            }

            try {
                Observable<IndexOperationMessage> indexoperationObservable;
                //merge each operation to a master observable;
                if (event instanceof EdgeDeleteEvent) {
                    indexoperationObservable = handleEdgeDelete(message);
                } else if (event instanceof EdgeIndexEvent) {
                    indexoperationObservable = handleEdgeIndex(message);
                } else if (event instanceof EntityDeleteEvent) {
                    indexoperationObservable = handleEntityDelete(message);
                } else if (event instanceof EntityIndexEvent) {
                    indexoperationObservable = handleEntityIndexUpdate(message);
                } else if (event instanceof InitializeApplicationIndexEvent) {
                    //does not return observable
                    handleInitializeApplicationIndex(event,message);
                    indexoperationObservable = Observable.just(new IndexOperationMessage());
                } else {
                    throw new Exception("Unknown EventType");//TODO: print json instead
                }

                //return type that can be indexed and ack'd later
                return indexoperationObservable
                    .map(indexOperationMessage ->
                            new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage),thisEvent.getCreationTime())
                    );
            } catch (Exception e) {
                //the throwable has to be the LAST argument, otherwise SLF4J treats it as a plain
                //parameter and the stack trace is lost
                logger.error("Failed to index message: {}", message.getMessageId(), e);
                //absent queue message => filtered out below => not ack'd
                return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()));
            }
        });

        //filter for success, send to the index(optional), ack
        return masterObservable
            //take the max
            .buffer(bufferSize)
            //map them to index results and return them
            .flatMap(indexEventResults -> {
                //fold every produced operation into one message for the producer
                final IndexOperationMessage combined = new IndexOperationMessage();
                for (final IndexEventResult indexEventResult : indexEventResults) {
                    if (indexEventResult.getIndexOperationMessage().isPresent()) {
                        combined.ingest(indexEventResult.getIndexOperationMessage().get());
                    }
                }

                //ack after successful completion of the operation.
                return indexProducer.put(combined)
                    //change observable type
                    .flatMap(indexOperationMessage -> Observable.from(indexEventResults))
                        //remove unsuccessful
                    .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
                    //measure
                    .doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()))
                    //return the queue messages to ack
                    .map(result -> result.getQueueMessage().get())
                    .toList();

            })
            .doOnError(t -> logger.error("Failed to process queuemessages",t))
            .toBlocking().lastOrDefault(null);
    }


    /**
     * Queues an {@link InitializeApplicationIndexEvent} carrying a replicated copy of the index
     * location strategy resolved for the given application scope.
     */
    @Override
    public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
        final IndexLocationStrategy resolved =
            indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope );
        final ReplicatedIndexLocationStrategy replicated = new ReplicatedIndexLocationStrategy( resolved );

        final InitializeApplicationIndexEvent initEvent = new InitializeApplicationIndexEvent( replicated );
        offer( initEvent );
    }


    /**
     * Queues an {@link EntityIndexEvent} for the given entity with an updated-after value of 0.
     */
    @Override
    public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
                                       final Entity entity) {

        // only the id travels through the queue, not the entity itself
        final EntityIdScope idScope = new EntityIdScope(applicationScope, entity.getId());

        offer(new EntityIndexEvent(idScope, 0));
    }


    /**
     * Builds the index operations for an {@link EntityIndexEvent} message.
     *
     * @param message queue message whose body must be an {@link EntityIndexEvent}
     * @return the observable produced by {@code eventBuilder.buildEntityIndex}
     * @throws NullPointerException if the message or its body is null
     * @throws IllegalArgumentException if the body is not an {@link EntityIndexEvent}
     */
    public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) {

        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );

        final AsyncEvent event = ( AsyncEvent ) message.getBody();

        //check the BODY here (the message itself was verified above); the template form also
        //avoids evaluating String.format on every call
        Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEntityIndexUpdate" );
        Preconditions.checkArgument( event instanceof EntityIndexEvent,
            "Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass() );

        final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;


        //process the entity immediately
        //only process the same version, otherwise ignore
        final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
        final Id entityId = entityIdScope.getId();
        final long updatedAfter = entityIndexEvent.getUpdatedAfter();

        final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);

        return eventBuilder.buildEntityIndex( entityIndexOperation );
    }


    /**
     * Queues an {@link EdgeIndexEvent} for the given entity id and newly written edge.
     */
    @Override
    public void queueNewEdge(final ApplicationScope applicationScope,
                             final Entity entity,
                             final Edge newEdge) {

        offer( new EdgeIndexEvent( applicationScope, entity.getId(), newEdge ) );
    }

    /**
     * Builds the index operations for an {@link EdgeIndexEvent} message: loads the entity named in
     * the event and maps it through {@code eventBuilder.buildNewEdge}.
     *
     * @param message queue message whose body must be an {@link EdgeIndexEvent}
     * @return the index operations for the new edge
     * @throws NullPointerException if the message or its body is null
     * @throws IllegalArgumentException if the body is not an {@link EdgeIndexEvent}
     */
    public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) {

        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );

        final AsyncEvent event = (AsyncEvent) message.getBody();

        //check the BODY here (the message itself was verified above); the template form also
        //avoids evaluating String.format on every call
        Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEdgeIndex" );
        Preconditions.checkArgument( event instanceof EdgeIndexEvent,
            "Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass() );

        final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;

        final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
        final Edge edge = edgeIndexEvent.getEdge();

        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );

        return ecm.load( edgeIndexEvent.getEntityId() )
                  .flatMap( entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge ) );
    }

    /**
     * Queues an {@link EdgeDeleteEvent} for the given edge in the given application scope.
     */
    @Override
    public void queueDeleteEdge(final ApplicationScope applicationScope,
                                final Edge edge) {

        final EdgeDeleteEvent deleteEvent = new EdgeDeleteEvent( applicationScope, edge );
        offer( deleteEvent );
    }

    /**
     * Builds the index operations for an {@link EdgeDeleteEvent} message.
     *
     * @param message queue message whose body must be an {@link EdgeDeleteEvent}
     * @return the observable produced by {@code eventBuilder.buildDeleteEdge}
     * @throws NullPointerException if the message or its body is null
     * @throws IllegalArgumentException if the body is not an {@link EdgeDeleteEvent}
     */
    public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) {

        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );

        final AsyncEvent event = (AsyncEvent) message.getBody();

        //check the BODY here (the message itself was verified above); the template form also
        //avoids evaluating String.format on every call
        Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEdgeDelete" );
        Preconditions.checkArgument( event instanceof EdgeDeleteEvent,
            "Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass() );

        final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;

        final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
        final Edge edge = edgeDeleteEvent.getEdge();

        if (logger.isDebugEnabled()) {
            logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
        }

        return eventBuilder.buildDeleteEdge( applicationScope, edge );
    }


    /**
     * Queues an {@link EntityDeleteEvent} for the given entity id in the given application scope.
     */
    @Override
    public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {

        final EntityIdScope idScope = new EntityIdScope( applicationScope, entityId );
        final EntityDeleteEvent deleteEvent = new EntityDeleteEvent( idScope );

        offer( deleteEvent );
    }

    /**
     * Reports the depth of the underlying queue, exactly as returned by the queue manager.
     */
    @Override
    public long getQueueDepth() {
        final long depth = queue.getQueueDepth();
        return depth;
    }

    /**
     * Builds the index operations for an {@link EntityDeleteEvent} message.
     *
     * The compaction observable from the delete results is drained first (its items are collected
     * and then discarded); only after it completes is the index observable subscribed.
     *
     * @param message queue message whose body must be an {@link EntityDeleteEvent}
     * @return the index observable of the delete results, chained after the compaction observable
     * @throws NullPointerException if the message or its body is null
     * @throws IllegalArgumentException if the body is not an {@link EntityDeleteEvent}
     */
    public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) {

        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");

        final AsyncEvent event = (AsyncEvent) message.getBody();

        //check the BODY here (the message itself was verified above); the template form also
        //avoids evaluating String.format on every call
        Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEntityDelete" );
        Preconditions.checkArgument( event instanceof EntityDeleteEvent,
            "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() );


        final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
        final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
        final Id entityId = entityDeleteEvent.getEntityIdScope().getId();

        if (logger.isDebugEnabled()) {
            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
        }

        final EventBuilderImpl.EntityDeleteResults
            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );


        final Observable<IndexOperationMessage> merged = entityDeleteResults
            .getEntitiesCompacted()
            //wait for the compaction stream to finish before touching the index
            .collect(() -> new ArrayList<>(), (list, item) -> list.add(item))
            .flatMap(collected -> entityDeleteResults.getIndexObservable()) ;
        return merged;
    }


    /**
     * Creates the entity index for the location strategy carried by an
     * {@link InitializeApplicationIndexEvent} and initializes it synchronously.
     *
     * @param event the event; must be an {@link InitializeApplicationIndexEvent}
     * @param message the queue message the event came from; only checked for null
     * @throws NullPointerException if the message or the event is null
     * @throws IllegalArgumentException if the event is not an {@link InitializeApplicationIndexEvent}
     */
    public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
        //fail with a descriptive message instead of a bare NPE from event.getClass() below
        Preconditions.checkNotNull(event, "Event cannot be null for handleInitializeApplicationIndex");
        Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent,
            "Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass());

        final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
            ( InitializeApplicationIndexEvent ) event;

        final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
        final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
        index.initialize();
    }

    /**
     * Starts as many workers as {@code indexProcessorFig.getWorkerCount()} reports.
     */
    public void start() {
        int remaining = indexProcessorFig.getWorkerCount();

        while (remaining > 0) {
            startWorker();
            remaining--;
        }
    }


    /**
     * Stops the workers by unsubscribing every tracked subscription, then forgets them so that
     * repeated start/stop cycles do not accumulate dead subscriptions.
     */
    public void stop() {
        synchronized (mutex) {
            //stop consuming
            for (final Subscription subscription : subscriptions) {
                subscription.unsubscribe();
            }

            //nothing in the list is live any more
            subscriptions.clear();
        }
    }


    /**
     * Starts one background consumer on a new thread and records its subscription.
     *
     * The consumer loops until it is unsubscribed: it takes a batch from the queue, counts it as
     * in flight, and emits it downstream, where the batch is processed by
     * {@link #handleMessages(List)} and the successfully processed messages are acknowledged.
     * Dequeue failures are logged, counted on the error counter, and followed by a sleep of
     * {@code indexProcessorFig.getFailureRetryTime()} milliseconds. An interrupt during that sleep
     * ends the worker with the interrupt flag restored.
     */
    private void startWorker() {
        synchronized (mutex) {

            Observable<List<QueueMessage>> consumer =
                    Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
                        @Override
                        public void call(final Subscriber<? super List<QueueMessage>> subscriber) {

                            //name our thread so it's easy to see
                            Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet());

                            //keep polling until stop() unsubscribes this worker
                            while (!subscriber.isUnsubscribed()) {

                                //how many messages this pass has added to the in-flight counter
                                int counted = 0;

                                try {
                                    final List<QueueMessage> drainList =
                                            take().toList().toBlocking().lastOrDefault(null);

                                    //count BEFORE emitting: processing and ack happen synchronously
                                    //inside onNext, and the error path below must only undo what
                                    //was really added
                                    if (drainList != null) {
                                        counted = drainList.size();
                                        inFlight.addAndGet(counted);
                                    }

                                    //emit our list in its entirety to hand off downstream
                                    subscriber.onNext(drainList);
                                } catch (Throwable t) {
                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();

                                    logger.error("Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t);

                                    if (counted > 0) {
                                        inFlight.addAndGet(-counted);
                                    }

                                    indexErrorCounter.inc();

                                    try {
                                        Thread.sleep(sleepTime);
                                    } catch (InterruptedException ie) {
                                        //an interrupt is a request to stop: keep the flag set and
                                        //leave the loop instead of retrying
                                        Thread.currentThread().interrupt();
                                        logger.warn("Queue consumer interrupted while backing off, stopping worker");
                                        return;
                                    }
                                }
                            }
                        }
                    })
                            //this won't block our read loop, just reads and proceeds
                            .map(messages ->
                            {
                                if (messages == null || messages.size() == 0) {
                                    return null;
                                }

                                try {

                                    List<QueueMessage> messagesToAck = handleMessages(messages);

                                    if (messagesToAck == null || messagesToAck.size() == 0) {
                                        return messagesToAck;
                                    }
                                    //ack each message, but only if we didn't error.
                                    ack(messagesToAck);
                                    return messagesToAck;
                                } catch (Exception e) {
                                    //placeholder added so the message id is actually part of the log line
                                    logger.error("failed to ack messages to sqs, first message id: {}",
                                        messages.get(0).getMessageId(), e);
                                    return null;
                                    //do not rethrow so we can process all of them
                                }
                            });

            //start in the background

            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();

            subscriptions.add(subscription);
        }
    }

    /**
     * Queues an {@link EntityIndexEvent} for the given id, carrying {@code updatedSince} as the
     * event's updated-after value.
     */
    public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
        // wrap in an id scope to avoid serialization issues
        final EntityIdScope idScope = new EntityIdScope( applicationScope, id );
        final EntityIndexEvent indexEvent = new EntityIndexEvent( idScope, updatedSince );

        offer( indexEvent );
    }

    /**
     * Queues, in a single batch, one {@link EntityIndexEvent} per edge scope, each targeting the
     * edge's target node.
     *
     * @param edges the edge scopes whose target nodes should be indexed
     * @param updatedSince passed to every event as its updated-after value
     */
    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {

        //typed by what the list really holds (it was a raw List created as ArrayList<EdgeScope>)
        final List<EntityIndexEvent> batch = new ArrayList<>( edges.size() );

        for ( final EdgeScope edgeScope : edges ) {
            //change to id scope to avoid serialization issues
            final EntityIdScope idScope =
                new EntityIdScope( edgeScope.getApplicationScope(), edgeScope.getEdge().getTargetNode() );
            batch.add( new EntityIndexEvent( idScope, updatedSince ) );
        }

        offerBatch( batch );
    }


    /**
     * Immutable holder pairing a queue message with the index operation built for it and the
     * creation time used for cycle-time measurement. Either optional may be absent.
     */
    public class IndexEventResult{

        private final long creationTime;
        private final Optional<QueueMessage> queueMessage;
        private final Optional<IndexOperationMessage> indexOperationMessage;


        /**
         * @param queueMessage the source message, or absent when it must not be acknowledged
         * @param indexOperationMessage the operation to index, or absent when there is none
         * @param creationTime timestamp in milliseconds associated with the event
         */
        public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){
            this.creationTime = creationTime;
            this.queueMessage = queueMessage;
            this.indexOperationMessage = indexOperationMessage;
        }


        /** @return the creation time given at construction */
        public long getCreationTime() {
            return this.creationTime;
        }

        /** @return the queue message, possibly absent */
        public Optional<QueueMessage> getQueueMessage() {
            return this.queueMessage;
        }

        /** @return the index operation, possibly absent */
        public Optional<IndexOperationMessage> getIndexOperationMessage() {
            return this.indexOperationMessage;
        }
    }
}
