/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.usergrid.services.notifications;


import java.util.*;

import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.services.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.usergrid.mq.Message;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Injector;

import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import static org.apache.usergrid.utils.InflectionUtils.pluralize;

public class NotificationsService extends AbstractCollectionService {


    private MetricsFactory metricsService;
    private Meter postMeter;
    private Timer postTimer;

    private static final int PAGE = 100;
    private static final Logger logger = LoggerFactory.getLogger(NotificationsService.class);
    //need a mocking framework, this is to substitute for no mocking

    static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";

    static {
        Message.MESSAGE_PROPERTIES.put(
                MESSAGE_PROPERTY_DEVICE_UUID, UUID.class);
    }

//not really a queue manager at all
    private ApplicationQueueManager notificationQueueManager;
    private long gracePeriod;
    private ServiceManagerFactory smf;
    private EntityManagerFactory emf;
    private QueueManagerFactory queueManagerFactory;

    public NotificationsService() {
        if (logger.isTraceEnabled()) {
            logger.trace("/notifications");
        }
    }

    @Override
    public void init( ServiceInfo info ) {
        super.init(info);
        smf = getApplicationContext().getBean(ServiceManagerFactory.class);
        emf = getApplicationContext().getBean(EntityManagerFactory.class);

        Properties props = (Properties)getApplicationContext().getBean("properties");
        metricsService = getApplicationContext().getBean(Injector.class).getInstance(MetricsFactory.class);
        postMeter = metricsService.getMeter(NotificationsService.class, "collection.post_requests");
        postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests");
        JobScheduler jobScheduler = new JobScheduler(sm,em);
        String name = ApplicationQueueManagerImpl.getQueueNames( props );
        QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCAL);
        queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
        QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
        notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
        gracePeriod = JobScheduler.SCHEDULER_GRACE_PERIOD;
    }

    public ApplicationQueueManager getQueueManager(){
        return notificationQueueManager;
    }


    @Override
    public ServiceContext getContext(ServiceAction action,
            ServiceRequest request, ServiceResults previousResults,
            ServicePayload payload) throws Exception {

        ServiceContext context = super.getContext(action, request, previousResults, payload);

        if (action == ServiceAction.POST) {
            context.setQuery(null); // we don't use this, and it must be null to
                                    // force the correct execution path
        }
        return context;
    }

    @Override
    public ServiceResults postCollection(ServiceContext context) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("NotificationService: start request.");
        }
        Timer.Context timer = postTimer.time();
        postMeter.mark();
        try {

            validate(null, context.getPayload());

            // perform some input validates on useGraph payload property vs. ql= path query
            final List<ServiceParameter> parameters = context.getRequest().getOriginalParameters();
            for (ServiceParameter parameter : parameters){
                if( parameter instanceof ServiceParameter.QueryParameter && context.getProperties().get("useGraph").equals(true)){
                    throw new IllegalArgumentException("Query ql parameter cannot be used with useGraph:true property value");
                }
            }

            Notification.PathTokens pathTokens = getPathTokens(parameters);

            // set defaults
            context.getProperties().put("filters", context.getProperties().getOrDefault("filters", new HashMap<>()));
            context.getProperties().put("useGraph", context.getProperties().getOrDefault("useGraph", false));
            context.getProperties().put("saveReceipts", context.getProperties().getOrDefault("saveReceipts", true));
            context.getProperties().put("processingFinished", 0L); // defaulting processing finished to 0
            context.getProperties().put("deviceProcessedCount", 0); // defaulting processing finished to 0
            context.getProperties().put("state", Notification.State.CREATED);
            context.getProperties().put("pathQuery", pathTokens);
            context.setOwner(sm.getApplication());
            ServiceResults results = super.postCollection(context);
            Notification notification = (Notification) results.getEntity();

            // update Notification properties
            if (notification.getStarted() == null || notification.getStarted() == 0) {
                long now = System.currentTimeMillis();
                notification.setStarted(System.currentTimeMillis());
                Map<String, Object> properties = new HashMap<String, Object>(2);
                properties.put("started", notification.getStarted());
                properties.put("state", notification.getState());
                notification.addProperties(properties);
                if (logger.isTraceEnabled()) {
                    logger.trace("ApplicationQueueMessage: notification {} properties updated in duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
                }
            }

            long now = System.currentTimeMillis();
            notificationQueueManager.queueNotification(notification, null);
            if (logger.isTraceEnabled()) {
                logger.trace("NotificationService: notification {} post queue duration {} ms ", notification.getUuid(), System.currentTimeMillis() - now);
            }
            // future: somehow return 202?
            return results;
        }catch (Exception e){
            logger.error(e.getMessage());
            throw e;
        }finally {
            timer.stop();
        }
    }

    private Notification.PathTokens getPathTokens(List<ServiceParameter> parameters){

        Notification.PathTokens pathTokens = new Notification.PathTokens();
        pathTokens.setApplicationRef((SimpleEntityRef)em.getApplicationRef());

        // first parameter is always collection name, start parsing after that
        for (int i = 0; i < parameters.size() - 1; i += 2 ) {
            String collection = pluralize(parameters.get(i).getName());
            Identifier identifier = null;
            String ql = null;
            ServiceParameter sp = parameters.get(i + 1);

            // if the next param is a query, add a token with the query
            if(sp.isQuery()){
                ql = sp.getQuery().getQl().get();
                pathTokens.getPathTokens().add(new Notification.PathToken( collection, ql));
            }else{
                // if the next param is "notifications", it's the end let identifier be null
                if(sp.isName() && !sp.getName().equalsIgnoreCase("notifications") || sp.isId()){
                    identifier = sp.getIdentifier();
                }
                pathTokens.getPathTokens().add(new Notification.PathToken( collection, identifier));
            }


        }
        return pathTokens;
    }

    @Override
    public ServiceResults postItemsByQuery(ServiceContext context, Query query) throws Exception {
        return postCollection(context);
    }

    @Override
    public Entity updateEntity(ServiceRequest request, EntityRef ref,
            ServicePayload payload) throws Exception {

        validate(ref, payload);

        Notification notification = em.get(ref, Notification.class);

        if ("restart".equals(payload.getProperty("restart"))) { // for emergency
                                                                // use only
            payload.getProperties().clear();
            payload.setProperty("restart", "");
            payload.setProperty("errorMessage", "");
            payload.setProperty("deliver", System.currentTimeMillis() + gracePeriod);

            // once finished, immutable
        } else if (notification.getFinished() != null) {
            throw new ForbiddenServiceOperationException(request,
                    "Notification immutable once sent.");

            // once started, only cancel is allowed
        } else if (notification.getStarted() != null) {
            if (payload.getProperty("canceled") != Boolean.TRUE) {
                throw new ForbiddenServiceOperationException(request,
                        "Notification has started. You may only set canceled.");
            }
            payload.getProperties().clear();
            payload.setProperty("canceled", Boolean.TRUE);
        }

        Entity response = super.updateEntity(request, ref, payload);

        Long deliver = (Long) payload.getProperty("deliver");
        if (deliver != null) {
            if (!deliver.equals(notification.getDeliver())) {
                notificationQueueManager.queueNotification((Notification) response, null);
            }
        }
        return response;
    }

    @Override
    protected boolean isDeleteAllowed(ServiceContext context, Entity entity) {
        Notification notification = (Notification) entity;
        Notification.State state = notification.getState();
        return !(state.equals(Notification.State.CREATED) || state.equals(Notification.State.STARTED) ||
            state.equals(Notification.State.SCHEDULED));
    }

    // validate payloads
    private void validate(EntityRef ref, ServicePayload servicePayload)
            throws Exception {
        Object obj_payloads = servicePayload.getProperty("payloads");
        if (obj_payloads == null && ref == null) {
            throw new RequiredPropertyNotFoundException("notification",
                    "payloads");
        }
        if (obj_payloads != null) {
            if (!(obj_payloads instanceof Map)) {
                throw new IllegalArgumentException(
                        "payloads must be a JSON Map");
            }
            final Map<String, Object> payloads = (Map<String, Object>) obj_payloads;
            final Map<Object, Notifier> notifierMap = getNotifierMap(payloads);
            Observable t = Observable.from(payloads.entrySet()).subscribeOn(Schedulers.io()).map(new Func1<Map.Entry<String, Object>, Object>() {
                @Override
                public Object call(Map.Entry<String, Object> entry) {
                    String notifierId = entry.getKey();
                    Notifier notifier = notifierMap.get(notifierId);
                    if (notifier == null) {
                        throw new IllegalArgumentException("notifier \""
                                + notifierId + "\" not found");
                    }
                    ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em);
                    Object payload = entry.getValue();
                    try {
                        return providerAdapter.translatePayload(payload); // validate
                        // specifically to
                        // provider
                    } catch (Exception e) {
                        return e;
                    }
                }
            });
            Object e = t.toBlocking().lastOrDefault(null);
            if(e instanceof Throwable){
                throw new Exception((Exception)e);
            }

        }
    }



    public String getJobQueueName(EntityRef notification) {
        return "notifications/" + notification.getUuid();
    }



    /* adds a single device for delivery - used only by tests */
    public void addDevice(EntityRef notification, EntityRef device) throws Exception {

        String jobQueueName = getJobQueueName(notification);
        Message message = new Message();
        message.setObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID,
                device.getUuid());
        sm.getQueueManager().postToQueue(jobQueueName, message);
    }

    public Notification getSourceNotification(EntityRef receipt)
            throws Exception {
        Receipt r = em.get(receipt.getUuid(), Receipt.class);
        return em.get(r.getNotificationUUID(), Notification.class);
    }


    /* create a map of Notifier UUIDs and/or names to Notifiers */
    protected Map<Object, Notifier> getNotifierMap(Map payloads)
            throws Exception {
        Map<Object, Notifier> notifiers = new HashMap<Object, Notifier>(
                payloads.size() * 2);
        for (Object id : payloads.keySet()) {
            Identifier identifier = Identifier.from(id);
            Notifier notifier;
            if (identifier.isUUID()) {
                notifier = em.get(identifier.getUUID(), Notifier.class);
            } else {
                EntityRef ref = em.getAlias("notifier", identifier.getName());
                notifier = em.get(ref, Notifier.class);
            }
            if (notifier != null) {
                notifiers.put(notifier.getUuid(), notifier);
                notifiers.put(notifier.getUuid().toString(), notifier);
                if (notifier.getName() != null) {
                    notifiers.put(notifier.getName(), notifier);
                }
            }
        }
        return notifiers;
    }


    /**
     * attempts to test the providers connections - throws an Exception on
     * failure
     */
    public void testConnection(Notifier notifier) throws Exception {
        ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
        if (providerAdapter != null) {
            providerAdapter.testConnection();
        }
    }


    public ServiceManagerFactory getServiceManagerFactory(){
        return this.smf;
    }
    public EntityManagerFactory getEntityManagerFactory(){
        return this.emf;
    }


    public MetricsFactory getMetricsFactory() {
        return metricsService;
    }
}
