/*
 * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific
 * language governing permissions and limitations under the
 * License. See accompanying LICENSE file.
 */
package org.apache.s4.core;

import java.util.Collection;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import net.jcip.annotations.ThreadSafe;

import org.apache.s4.base.Event;
import org.apache.s4.core.gen.OverloadDispatcher;
import org.apache.s4.core.gen.OverloadDispatcherGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;

/**
 * <p>
 * Base class for implementing processing in S4. All instances are organized as follows:
 * <ul>
 * <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
 * application graph.
 * <li>PE prototypes manage the creation and destruction of PE instances.
 * <li>All PE instances are clones of a PE prototype.
 * <li>PE instances are associated with a unique key.
 * <li>PE instances do the actual work by processing any number of input events of various types and emit output events
 * of various types.
 * <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method. See
 * {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
 * <ul>
 * <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be dispatched to
 * this method.
 * <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
 * dispatched to this method when certain conditions are met. See {@link #setTrigger(Class, int, long, TimeUnit)}.
 * </ul>
 * <li>
 * A PE implementation must not create threads. A periodic task can be implemented by overloading the {@link #onTime()}
 * method. See {@link #setTimerInterval(long, TimeUnit)}
 * <li>If a reference in the PE prototype is shared by the PE instances, the object must be thread safe.
 * <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
 * <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there may be
 * several event processing methods that can safely run concurrently. To enable concurrency, annotate the implementation
 * of {@code ProcessingElement} with {@link ThreadSafe}.
 * <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()} method.
 * <li>PE class fields are cloned from the prototype. References are also copied, which means that if the prototype
 * creates a collection object, all instances will share the same collection object, which is usually <em>NOT</em>
 * what the programmer intended. The application developer is responsible for initializing objects in the
 * {@link #onCreate()} method. For example, if each instance requires its own
 * <tt>Map</tt> object, the PE should implement the following:
 *         <pre>
 *         {@code
 *         public class MyPE extends ProcessingElement {
 * 
 *           private Map<String, Integer> wordCount;
 * 
 *           ...
 * 
 *           protected void onCreate() {
 *             wordCount = new HashMap<String, Integer>();
 *             logger.trace("Created a map for instance PE with id {}", getId());
 *           }
 *         }
 *         }
 *         </pre>
 * 
 * 
 * </ul>
 * 
 * 
 * 
 * 
 */
public abstract class ProcessingElement implements Cloneable {

    private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
    /* Cache key under which the single instance of a singleton PE is stored. */
    private static final String SINGLETON = "singleton";

    /* The application that owns this PE. Assigned through setApp(). */
    protected App app;

    /*
     * This map holds all the instances, keyed by PE instance id. We make it package private to prevent concrete
     * classes from updating the collection.
     */
    Cache<String, ProcessingElement> peInstances;

    /* Triggers keyed by event type. This map is initialized in the prototype and copied to instances. */
    Map<Class<? extends Event>, Trigger> triggers;

    /* PE instance id. Empty for the prototype; set to the cache key when an instance is created. */
    String id = "";

    /* Private fields. */
    /* The prototype itself; clones keep this reference, so instances point back to their prototype. */
    private ProcessingElement pePrototype;
    /* Set once a trigger is registered; lets event handling skip trigger checks otherwise. */
    private boolean haveTriggers = false;
    /* Period of the onTime() timer, in milliseconds. */
    private long timerIntervalInMilliseconds = 0;
    /* Timer driving onTime(); null until a timer interval is configured. */
    private Timer timer;
    /* True for the prototype, switched to false on every clone created as a PE instance. */
    private boolean isPrototype = true;
    /* True when the concrete class was detected as annotated with ThreadSafe. */
    private boolean isThreadSafe = false;
    /* NOTE(review): not read or written anywhere in the visible code. */
    private boolean isFirst = true;
    /* True when a single shared instance handles all keys. */
    private boolean isSingleton = false;

    /* Generated dispatcher that routes events to the matching onEvent()/onTrigger() overload. */
    private transient OverloadDispatcher overloadDispatcher;

    /**
     * Builds a PE prototype: generates the overload dispatcher for the concrete class, creates an unbounded instance
     * cache whose loader clones the prototype on demand, and creates an empty trigger map.
     */
    protected ProcessingElement() {
        /*
         * Only the PE prototype runs the constructor. Instances are clones, so their pePrototype field keeps pointing
         * to the prototype.
         */
        this.pePrototype = this;

        final Class<?> dispatcherClass = new OverloadDispatcherGenerator(this.getClass()).generate();
        try {
            overloadDispatcher = (OverloadDispatcher) dispatcherClass.newInstance();
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }

        /* Missing keys are resolved by cloning this prototype. */
        final CacheLoader<String, ProcessingElement> instanceLoader = new CacheLoader<String, ProcessingElement>() {
            @Override
            public ProcessingElement load(String key) throws Exception {
                return createPE(key);
            }
        };
        peInstances = CacheBuilder.newBuilder().build(instanceLoader);

        triggers = new MapMaker().makeMap();
    }

    /**
     * Create a PE prototype and attach it to an application. By default, PE instances will never expire. Use
     * {@link #setPECache(int, long, TimeUnit)} to configure expiration and cache size.
     * 
     * @param app
     *            the app that contains this PE
     */
    public ProcessingElement(App app) {
        this();
        setApp(app);
    }

    /**
     * Periodic callback invoked by the PE timer for every PE instance held in the cache. The default implementation
     * does nothing; override this method to implement a periodic process.
     * <p>
     * The timer is configured on the prototype with {@link #setTimerInterval(long, TimeUnit)}. Unless the
     * {@link ProcessingElement} subclass is annotated with {@link ThreadSafe}, the framework intends this call to be
     * serialized with the {@code onEvent()} and {@code onTrigger()} methods of the same instance.
     */
    protected void onTime() {
    }

    /**
     * This method is called after a PE instance is created. Use it to initialize fields that are PE instance specific.
     * PE instances are created using {@link #clone()}, which copies field values (including references) from the
     * prototype.
     * 
     * <p>
     * <b>Objects referenced by fields initialized in the class constructor are shared by all PE instances.</b>
     * </p>
     */
    abstract protected void onCreate();

    /**
     * This method is called before a PE instance is removed from the instance cache through {@link #close()}. Use it
     * to close resources and clean up.
     */
    abstract protected void onRemove();

    /**
     * Returns the application this PE belongs to. PE objects must be associated with one and only one {@code App}
     * object.
     * 
     * @return the app, or null if none has been assigned yet
     */
    public App getApp() {
        return app;
    }

    /**
     * Associates this PE with its application and registers it as a PE prototype of that application. The
     * application can only be assigned once.
     * 
     * @param app
     *            the application that contains this PE, must not be null
     * @throws IllegalStateException
     *             if an application was already assigned
     * @throws NullPointerException
     *             if app is null
     */
    public void setApp(App app) {
        if (this.app != null) {
            throw new IllegalStateException("Application was already assigned to this processing element");
        }
        /* Fail with an explicit message instead of an anonymous NPE on the registration call below. */
        Preconditions.checkNotNull(app, "A processing element requires a non-null application.");

        this.app = app;
        app.addPEPrototype(this, null);
    }

    /**
     * Returns the approximate number of PE instances currently held in the instance cache.
     * 
     * @return the approximate number of PE instances.
     */
    public long getNumPEInstances() {

        return peInstances.size();
    }

    /**
     * Returns the map view of the instance cache, keyed by PE instance id. Package private on purpose.
     */
    Map<String, ProcessingElement> getPEInstances() {
        return peInstances.asMap();
    }

    /**
     * Set PE expiration and cache size.
     * <p>
     * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
     * creation, or last access.
     * <p>
     * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
     * <p>
     * When this method is called the instance cache is replaced by a new, empty one, so existing PE instances are no
     * longer reachable through this prototype. {@link #onRemove()} is not invoked on them.
     * 
     * 
     * @param maximumSize
     *            the approximate maximum number of PEs in the cache.
     * @param duration
     *            the PE duration
     * @param timeUnit
     *            the time unit
     * @return the PE prototype
     * @throws IllegalArgumentException
     *             if this object is not the PE prototype
     */
    public ProcessingElement setPECache(int maximumSize, long duration, TimeUnit timeUnit) {

        Preconditions.checkArgument(isPrototype,
                "This method can only be used on the PE prototype. PE cache not configured.");

        peInstances = CacheBuilder.newBuilder().expireAfterAccess(duration, timeUnit).maximumSize(maximumSize)
                .build(new CacheLoader<String, ProcessingElement>() {
                    @Override
                    public ProcessingElement load(String key) throws Exception {
                        return createPE(key);
                    }
                });

        return this;
    }

    /**
     * This trigger is fired when the following conditions occur:
     * 
     * <ul>
     * <li>An event of eventType arrived to the PE instance
     * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
     * interval.
     * </ul>
     * 
     * <p>
     * When the trigger fires, the method <tt>onTrigger(EventType event)</tt> is called. Where <tt>EventType</tt>
     * matches the argument eventType.
     * 
     * @param eventType
     *            the type of event on which this trigger will fire.
     * @param numEvents
     *            number of events since last trigger activation. (Set to one to trigger on every input event.) Either
     *            numEvents or interval must be greater than zero.
     * @param interval
     *            time interval, expressed in timeUnit. Set to zero if no time interval needed.
     * @param timeUnit
     *            the TimeUnit for the argument interval. Can set to null if no time interval needed.
     * @return the PE prototype
     * @throws IllegalArgumentException
     *             if this object is not the PE prototype, if neither numEvents nor interval is positive, or if
     *             interval is positive and timeUnit is null
     * @throws NullPointerException
     *             if eventType is null
     */
    public ProcessingElement setTrigger(Class<? extends Event> eventType, int numEvents, long interval,
            TimeUnit timeUnit) {

        Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
        Preconditions.checkNotNull(eventType, "Need eventType to set trigger.");
        Preconditions.checkArgument(numEvents > 0 || interval > 0,
                "To set trigger numEvent OR interval must be greater than zero.");
        Preconditions.checkArgument(timeUnit != null || interval < 1,
                "To set trigger timeUnit is needed when interval is greater than zero.");

        /* Skip trigger checking overhead if there are no triggers. */
        haveTriggers = true;

        /*
         * Triggers work in milliseconds: convert FROM the caller's unit TO milliseconds. A null unit is only accepted
         * together with a non-positive interval, which is passed through unchanged.
         */
        long intervalInMilliseconds = interval;
        if (timeUnit != null) {
            intervalInMilliseconds = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
        }

        triggers.put(eventType, new Trigger(numEvents, intervalInMilliseconds));

        return this;
    }

    /**
     * Tells whether this PE is configured as a singleton, i.e. a single instance serves every key.
     * 
     * @return true if this PE is a singleton
     */
    public boolean isSingleton() {
        return isSingleton;
    }

    /**
     * Marks this PE as a singleton (or not). For a singleton, a single PE instance is eagerly created when the
     * prototype is initialized and serves all keys.
     * <p>
     * Has no effect, apart from a logged warning, when called on a PE instance instead of the prototype.
     * 
     * @param isSingleton
     *            true to make this PE a singleton
     * @return this object
     */
    public ProcessingElement setSingleton(boolean isSingleton) {

        if (isPrototype) {
            this.isSingleton = isSingleton;
        } else {
            logger.warn("This method can only be used on the PE prototype.");
        }

        return this;
    }

    /**
     * The period of the periodic task controlled by the embedded timer. The value is stored in milliseconds and
     * converted to the requested unit.
     * 
     * @param timeUnit
     *            the timeUnit of the returned value.
     * @return the timer interval.
     */
    public long getTimerInterval(TimeUnit timeUnit) {
        return timeUnit.convert(timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
    }

    /**
     * Set a timer that calls {@link #onTime()}. Any previously configured timer is cancelled.
     * 
     * If {@code interval==0} the timer is disabled.
     * 
     * @param interval
     *            in timeUnit, must not be negative. Positive intervals shorter than one millisecond are rounded up to
     *            one millisecond, the smallest period the timer supports.
     * @param timeUnit
     *            the timeUnit of interval
     * @return the PE prototype
     * @throws IllegalArgumentException
     *             if this object is not the PE prototype or if interval is negative
     */
    public ProcessingElement setTimerInterval(long interval, TimeUnit timeUnit) {

        /* Validate before touching any state so that a rejected call leaves the PE unchanged. */
        Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Timer not set.");
        Preconditions.checkArgument(interval >= 0, "The timer interval must not be negative.");

        if (timer != null) {
            timer.cancel();
            /* A cancelled Timer rejects new tasks, drop it so that nobody tries to schedule on it. */
            timer = null;
        }

        if (interval == 0) {
            timerIntervalInMilliseconds = 0;
            return this;
        }

        /* Timer.schedule() rejects a non-positive period, so never let the conversion truncate down to zero. */
        timerIntervalInMilliseconds = Math.max(1L, TimeUnit.MILLISECONDS.convert(interval, timeUnit));
        timer = new Timer();
        return this;
    }

    /**
     * Set to true if the concrete PE class has the {@link ThreadSafe} annotation. The default is false (no annotation).
     * In general, application developers don't need to worry about thread safety in the concrete PEs. In some cases the
     * PE needs to be thread safe to avoid deadlocks. For example, if the application graph has cycles and the queues
     * are allowed to block, then some critical PEs with multiple incoming streams need to be made thread safe to avoid
     * locking the entire PE instance.
     * 
     * @return true if the PE implementation is considered thread safe.
     */
    public boolean isThreadSafe() {
        return isThreadSafe;
    }

    /**
     * Processes an input event: dispatches it to the matching {@code onEvent()} overload and, when a trigger for the
     * event is due, to the matching {@code onTrigger()} overload.
     * <p>
     * Unless the PE is flagged as thread safe, processing holds the monitor of this PE instance so that events are
     * handled one at a time.
     * 
     * @param event
     *            the event to process
     */
    protected void handleInputEvent(Event event) {

        if (isThreadSafe) {
            /* The implementation takes care of concurrency itself: no locking at all. */
            dispatchToHandlers(event);
            return;
        }

        synchronized (this) {
            dispatchToHandlers(event);
        }
    }

    /* Runs the onEvent() dispatch followed by the onTrigger() dispatch when the trigger for the event is due. */
    private void dispatchToHandlers(Event event) {

        /* Dispatch onEvent() method. */
        overloadDispatcher.dispatchEvent(this, event);

        /* Dispatch onTrigger() method. */
        if (haveTriggers && isTrigger(event)) {
            overloadDispatcher.dispatchTrigger(this, event);
        }
    }

    /**
     * Checks whether a trigger is due for this event, starting the lookup at the runtime class of the event.
     */
    private boolean isTrigger(Event event) {
        return isTrigger(event, event.getClass());
    }

    /**
     * Looks up the trigger for an event by walking up the class hierarchy from {@code triggerClass}. The first trigger
     * found is checked and updated. When the walk leaves the {@link Event} hierarchy without finding one, an inactive
     * trigger is stored under the runtime class of the event as a marker. An inactive trigger never triggers.
     * 
     * @param event
     *            the event being processed
     * @param triggerClass
     *            the class at which the lookup starts
     * @return true if trigger is reached, false if trigger is not ready yet or if trigger is inactive
     * 
     */
    private boolean isTrigger(Event event, Class<?> triggerClass) {
        for (Class<?> candidate = triggerClass;; candidate = candidate.getSuperclass()) {

            final Trigger registered = triggers.get(candidate);
            if (registered != null) {
                /* Check if it is time to activate the trigger for this event type. */
                return registered.checkAndUpdate();
            }

            if (!Event.class.isAssignableFrom(candidate)) {
                /* Left the Event hierarchy: remember that this event type has no trigger. */
                triggers.put(event.getClass(), new Trigger());
                return false;
            }
            /* Otherwise keep exploring the hierarchy with the superclass. */
        }
    }

    /**
     * Lets this PE clean up through {@link #onRemove()} and then evicts the entry stored under {@code id} from the
     * instance cache. Does nothing when {@code id} is null.
     */
    private void removeInstanceForKeyInternal(String id) {

        if (id != null) {
            /* Cleanup comes first, eviction second. */
            onRemove();
            peInstances.invalidate(id);
        }
    }

    /**
     * Stops the timer, if any, and evicts all PE instances from the instance cache.
     * <p>
     * NOTE(review): instances are invalidated without {@link #onRemove()} being invoked on them here; confirm whether
     * that is intended.
     */
    protected void removeAll() {

        /* Close resources in prototype. */
        if (timer != null) {
            timer.cancel();
            logger.info("Timer stopped.");
        }

        /* Remove all the instances. */
        peInstances.invalidateAll();
    }

    /**
     * Removes this PE from the instance cache, using its own id as the key, after giving it the chance to clean up in
     * {@link #onRemove()}.
     */
    protected void close() {
        removeInstanceForKeyInternal(id);
    }

    /**
     * Creates a PE instance for a key by cloning this prototype. The clone is flagged as a non-prototype, receives
     * the key as its id and gets its own copy of every trigger before {@link #onCreate()} is invoked on it.
     * 
     * @param id
     *            the key of the new PE instance
     * @return the initialized PE instance
     */
    private ProcessingElement createPE(String id) {
        ProcessingElement pe = (ProcessingElement) this.clone();
        pe.isPrototype = false;
        pe.id = id;

        /*
         * Triggers carry mutable state (event count, last event time). Copying only the map would make all instances
         * share and concurrently update the same Trigger objects, so each trigger is duplicated as well.
         */
        Map<Class<? extends Event>, Trigger> instanceTriggers = Maps.newHashMap();
        for (Map.Entry<Class<? extends Event>, Trigger> entry : triggers.entrySet()) {
            Trigger template = entry.getValue();
            Trigger copy = template.isActive() ? pe.new Trigger(template.intervalInEvents,
                    template.intervalInMilliseconds) : pe.new Trigger();
            instanceTriggers.put(entry.getKey(), copy);
        }
        pe.triggers = instanceTriggers;

        pe.onCreate();
        logger.trace("Num PE instances: {}.", getNumPEInstances());
        return pe;
    }

    /*
     * This method is called by App just before the application starts. It detects the ThreadSafe annotation, eagerly
     * creates the singleton instance if this PE is a singleton, and starts the timer if one is configured.
     */
    void initPEPrototypeInternal() {

        /*
         * Check if this PE is annotated as thread safe. This must happen before any instance is created: instances
         * are clones and only see the value the flag had when they were cloned.
         */
        if (getClass().isAnnotationPresent(ThreadSafe.class)) {

            // TODO: this doesn't seem to be working. isannotationpresent always returns false.
            // NOTE(review): presumably the annotation is not retained at runtime - verify its retention policy.

            isThreadSafe = true;
            logger.trace("Annotated with @ThreadSafe");
        }

        /* Eagerly create singleton PE. */
        if (isSingleton) {
            try {
                ProcessingElement singleton = peInstances.get(SINGLETON);
                logger.trace("Created singleton [{}].", singleton);
            } catch (ExecutionException e) {
                logger.error("Problem when trying to create a PE instance.", e);
            }
        }

        /* Start timer. Timer.schedule() rejects a non-positive period, so only schedule with a valid one. */
        if (timer != null && timerIntervalInMilliseconds > 0) {
            timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
            logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new Object[] {
                    this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
        }

    }

    /**
     * Returns the PE instance for a key, creating it when it is not in the cache yet. For a singleton PE the key is
     * ignored and the single instance is returned. The returned instances are all in the same JVM. Do not use it to
     * access remote objects.
     * 
     * @param id
     *            the key of the requested PE instance
     * @return the PE instance, or null if the instance could not be created
     */
    public ProcessingElement getInstanceForKey(String id) {

        final String lookupKey;
        if (isSingleton) {
            logger.trace(
                    "Requested a PE instance with key [{}]. The instance is a singleton and will ignore the key. The key should be set to null when requesting a singleton.",
                    id);
            lookupKey = SINGLETON;
        } else {
            lookupKey = id;
        }

        /* The cache loader creates the instance when there is none for the key. */
        try {
            return peInstances.get(lookupKey);
        } catch (ExecutionException e) {
            logger.error("Problem when trying to create a PE instance.", e);
            return null;
        }
    }

    /**
     * Get all the local instances, as a view over the values of the instance cache. See notes in
     * {@link #getInstanceForKey(String)}.
     * 
     * @return the PE instances currently in the cache
     */
    public Collection<ProcessingElement> getInstances() {
        final Map<String, ProcessingElement> instancesById = peInstances.asMap();
        return instancesById.values();
    }

    /**
     * Placeholder for a lookup of a remote PE instance. TODO: not implemented for cluster configuration yet, use it
     * only in single node configuration, for testing apps. For now it logs a warning and looks up the local instance
     * cache with the id of this PE, without creating anything.
     * 
     * @return pe instance stored under this PE's id. Null if it doesn't exist.
     */
    public ProcessingElement getRemoteInstancesForKey() {
        logger.warn("The getRemoteInstancesForKey() method is not implemented. Use "
                + "it to test your app in single node configuration only. Should work "
                + "transparently for remote objects once it is implemented.");

        final Map<String, ProcessingElement> localInstances = peInstances.asMap();
        return localInstances.get(id);
    }

    /**
     * This method returns an immutable map that contains all the PE instances for this prototype. PE instances may be
     * located anywhere in the cluster. Be aware that this could be an expensive operation. TODO: not implemented for
     * cluster configuration yet, use it only in single node configuration, for testing apps. For now a warning is
     * logged and an immutable copy of the local instance cache is returned.
     * 
     * @return an immutable snapshot of the local PE instances, keyed by id
     */
    public Map<String, ProcessingElement> getRemoteInstances() {

        logger.warn("The getRemoteInstances() method is not implemented. Use "
                + "it to test your app in single node configuration only. Should work "
                + "transparently for remote objects once it is implemented.");

        /*
         * Placeholder: a plain copy of the local instances. A custom map capable of working efficiently on an S4
         * cluster still has to be implemented.
         */
        final Map<String, ProcessingElement> localInstances = peInstances.asMap();
        return ImmutableMap.copyOf(localInstances);
    }

    /**
     * Unique ID for a PE instance: the key under which the instance was created. The prototype keeps the default
     * empty id.
     * 
     * @return the id
     */
    public String getId() {
        return id;
    }

    /**
     * The {@code ProcessingElement} prototype for this object. For the prototype this is the prototype itself.
     * 
     * @return the corresponding {@code ProcessingElement} prototype for this instance.
     */
    public ProcessingElement getPrototype() {
        return pePrototype;
    }

    /**
     * Returns a field-by-field copy of this object as produced by {@link Object#clone()}. The checked
     * {@link CloneNotSupportedException} is rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    protected Object clone() {
        try {
            return super.clone();
        } catch (CloneNotSupportedException notCloneable) {
            throw new RuntimeException(notCloneable);
        }
    }

    /**
     * Helper method to be used by PE implementation classes. Sends an event to all the target streams, in array order.
     * 
     * @param event
     *            the event to send
     * @param streamArray
     *            the streams that receive the event
     */
    protected <T extends Event> void emit(T event, Stream<T>[] streamArray) {

        for (Stream<T> target : streamArray) {
            target.put(event);
        }
    }

    /**
     * Timer task that calls {@link #onTime()} on every PE instance currently in the cache. A failure in one instance
     * is logged and does not prevent the remaining instances from being called.
     */
    private class OnTimeTask extends TimerTask {

        @Override
        public void run() {

            for (ProcessingElement peInstance : getPEInstances().values()) {

                try {
                    if (peInstance.isThreadSafe) {
                        peInstance.onTime();
                    } else {
                        /*
                         * Lock the PE instance itself, the same monitor event handling uses, so that onTime() never
                         * runs concurrently with the event processing of that instance.
                         */
                        synchronized (peInstance) {
                            peInstance.onTime();
                        }
                    }
                } catch (Exception e) {
                    logger.error("Caught exception in timer when calling PE instance [{}] with id [{}].", peInstance,
                            peInstance.id);
                    logger.error("Timer error.", e);
                }
            }
        }
    }

    /**
     * State of a trigger for one event type. An active trigger fires when enough events have been counted since it
     * last fired, or when the time elapsed since the previous event exceeds the configured interval. A criterion
     * configured as zero (or less) is disabled. Inactive triggers never fire.
     */
    class Trigger {
        final long intervalInMilliseconds;
        final int intervalInEvents;
        /* Time of the previous event in epoch milliseconds, zero before the first event. */
        long lastTime;
        /* Number of events seen since the trigger last fired. */
        int eventCount;
        // inactive triggers never trigger anything, they are used as markers
        boolean active = true;

        /** Creates an inactive marker trigger. */
        Trigger() {
            this.intervalInEvents = 0;
            this.intervalInMilliseconds = 0;
            this.active = false;
        }

        /**
         * Creates an active trigger.
         * 
         * @param intervalInEvents
         *            number of events after which the trigger fires, zero or less to ignore the event count
         * @param intervalInMilliseconds
         *            time since the previous event after which the trigger fires, zero or less to ignore time
         */
        Trigger(int intervalInEvents, long intervalInMilliseconds) {
            this.intervalInEvents = intervalInEvents;
            this.intervalInMilliseconds = intervalInMilliseconds;
        }

        /**
         * Records one event and tells whether the trigger fires for it. Firing resets the event count.
         * 
         * @return true if the trigger fires, false if it is not due yet or if it is inactive
         */
        boolean checkAndUpdate() {
            if (!active) {
                return false;
            }

            /* Read the clock once so that elapsed time and the stored timestamp are consistent. */
            long now = System.currentTimeMillis();
            long timeLapse = now - lastTime;
            lastTime = now;
            eventCount++;

            /* A criterion set to zero is disabled: it must not make the trigger fire on every event. */
            boolean timeReached = intervalInMilliseconds > 0 && timeLapse > intervalInMilliseconds;
            boolean countReached = intervalInEvents > 0 && eventCount >= intervalInEvents;

            if (timeReached || countReached) {
                eventCount = 0;
                return true;
            }
            return false;
        }

        boolean isActive() {
            return active;
        }
    }
}
