/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flex.messaging.util;

import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * This class provides a means of managing TimeoutCapable objects. It leverages
 * facilities in the the Java concurrency package to provide a common utility
 * for scheduling timeout Futures and managing the underlying worker thread pools.
 */
public class TimeoutManager {
    private static final String LOG_CATEGORY = LogCategories.TIMEOUT;

    private ScheduledThreadPoolExecutor timeoutService;

    /**
     * Default constructor calls parameterized constructor will a null factory argument.
     */
    public TimeoutManager() {
        this(null);
    }

    /**
     * Constructs a new TimeoutManager using the passed in factory for thread creation.
     * and a single thread of the TimeoutManager.
     *
     * @param tf ThreadFactory
     */
    public TimeoutManager(ThreadFactory tf) {
        this(tf, 1);
    }

    /**
     * Constructs a new TimeoutManager using the passe din factory for thread creation
     * and the passed in number of threads for TimeoutManager to use.
     *
     * @param tf              The ThreadFactory to use.
     * @param numberOfThreads The number of threads for the ThreadFactory to use.
     */
    public TimeoutManager(ThreadFactory tf, int numberOfThreads) {
        if (tf == null) {
            tf = new MonitorThreadFactory();
        }
        if (numberOfThreads < 1)
            numberOfThreads = 1;
        timeoutService = new ScheduledThreadPoolExecutor(numberOfThreads, tf);
    }

    /**
     * Schedule a task to be executed in the future.
     *
     * @param t task to be executed at some future time
     * @return a Future object that enables access to the value(s) returned by the task
     */
    public Future scheduleTimeout(TimeoutCapable t) {
        Future future = null;
        if (t.getTimeoutPeriod() > 0) {
            Runnable timeoutTask = new TimeoutTask(t);
            future = timeoutService.schedule(timeoutTask, t.getTimeoutPeriod(), TimeUnit.MILLISECONDS);
            t.setTimeoutFuture(future);
            if (t instanceof TimeoutAbstractObject) {
                TimeoutAbstractObject timeoutAbstract = (TimeoutAbstractObject) t;
                timeoutAbstract.setTimeoutManager(this);
                timeoutAbstract.setTimeoutTask(timeoutTask);
            }
            if (Log.isDebug())
                Log.getLogger(LOG_CATEGORY).debug("TimeoutManager '" + System.identityHashCode(this) + "' has scheduled instance '" +
                        System.identityHashCode(t) + "' of type '" + t.getClass().getName() + "' to be timed out in " + t.getTimeoutPeriod() + " milliseconds. Task queue size: " + timeoutService.getQueue().size());
        }
        return future;
    }

    /**
     * Cancel the execution of a future task and remove all references to it.
     *
     * @param timeoutAbstract the task to be canceled
     * @return true if cancellation were successful
     */
    public boolean unscheduleTimeout(TimeoutAbstractObject timeoutAbstract) {
        Object toRemove = timeoutAbstract.getTimeoutFuture();
        /*
         * In more recent versions of the backport, they are requiring that we
         * pass in the Future returned by the schedule method.  This should always
         * implement Runnable even in 2.2 but I'm a little paranoid here so just
         * to be sure, if we get a future which is not a runnable we go back to the old
         * code which calls the remove on the instance we passed into the schedule method.
         */
        if (!(toRemove instanceof Runnable))
            toRemove = timeoutAbstract.getTimeoutTask();
        if (timeoutService.remove((Runnable) toRemove)) {
            if (Log.isDebug())
                Log.getLogger(LOG_CATEGORY).debug("TimeoutManager '" + System.identityHashCode(this) + "' has removed the timeout task for instance '" +
                        System.identityHashCode(timeoutAbstract) + "' of type '" + timeoutAbstract.getClass().getName() + "' that has requested its timeout be cancelled. Task queue size: " + timeoutService.getQueue().size());
        } else {
            Future timeoutFuture = timeoutAbstract.getTimeoutFuture();
            timeoutFuture.cancel(false); // Don't interrupt it if it's running.
            if (Log.isDebug())
                Log.getLogger(LOG_CATEGORY).debug("TimeoutManager '" + System.identityHashCode(this) + "' cancelling timeout task for instance '" +
                        System.identityHashCode(timeoutAbstract) + "' of type '" + timeoutAbstract.getClass().getName() + "' that has requested its timeout be cancelled. Task queue size: " + timeoutService.getQueue().size());
            if (timeoutFuture.isDone()) {
                timeoutService.purge(); // Force the service to give up refs to task immediately rather than hanging on to them.
                if (Log.isDebug())
                    Log.getLogger(LOG_CATEGORY).debug("TimeoutManager '" + System.identityHashCode(this) + "' purged queue of any cancelled or completed tasks. Task queue size: " + timeoutService.getQueue().size());
            }
        }

        // to aggressively clean up memory remove the reference from the unscheduled timeout to its 
        // time out object
        Object unscheduledTimeoutTask = timeoutAbstract.getTimeoutTask();
        if (unscheduledTimeoutTask != null && unscheduledTimeoutTask instanceof TimeoutTask)
            ((TimeoutTask) timeoutAbstract.getTimeoutTask()).clearTimeoutCapable();

        return true;
    }

    /**
     * Cancel all outstanding and any future tasks.
     */
    public void shutdown() {
        timeoutService.shutdownNow();
        // shutdownNow() returns List<Runnable> for all unexecuted tasks
        // but we ignore these because we're only queuing dependent tasks
        // for timed execution and they can be safely discarded when the 
        // parent object goes away and shuts down the TimeoutManager it is using.
        // Also, we may need to introduce an interface that encompasses 
        // using either a ScheduledThreadPoolExecutor or a CommonJ Timer;
        // in the case of a CommonJ Timer only a stop() method is provided 
        // which does not return handles to any queued, unexecuted tasks.
    }

    class MonitorThreadFactory implements ThreadFactory {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("TimeoutManager");
            return t;
        }
    }

    class TimeoutTask implements Runnable {
        private TimeoutCapable timeoutObject;


        /**
         * Removes the reference from this timeout task to the object that would
         * have been timed out.  This is useful for memory clean up when timeouts are unscheduled.
         */
        public void clearTimeoutCapable() {
            timeoutObject = null;
        }

        public TimeoutTask(TimeoutCapable timeoutObject) {
            this.timeoutObject = timeoutObject;
        }

        public void run() {
            // Because of the weird clearTimeCapable() in the middle of timeout call, we got NPE in the debug log level.
            // Now copy the reference to local varable
            TimeoutCapable timeoutObject = this.timeoutObject;
            long inactiveMillis = System.currentTimeMillis() - timeoutObject.getLastUse();
            if (inactiveMillis >= timeoutObject.getTimeoutPeriod()) {
                try {
                    timeoutObject.timeout();

                    if (Log.isDebug())
                        Log.getLogger(LOG_CATEGORY).debug("TimeoutManager '" + System.identityHashCode(TimeoutManager.this) + "' has run the timeout task for instance '" +
                                System.identityHashCode(timeoutObject) + "' of type '" + timeoutObject.getClass().getName() + "'. Task queue size: " + timeoutService.getQueue().size());
                } catch (Throwable t) {
                    if (Log.isError())
                        Log.getLogger(LOG_CATEGORY).error("TimeoutManager '" + System.identityHashCode(TimeoutManager.this) + "' encountered an error running the timeout task for instance '" +
                                System.identityHashCode(timeoutObject) + "' of type '" + timeoutObject.getClass().getName() + "'. Task queue size: " + timeoutService.getQueue().size(), t);
                }
            } else {
                // Reschedule timeout and store new Future for cancellation.
                timeoutObject.setTimeoutFuture(timeoutService.schedule(this, (timeoutObject.getTimeoutPeriod() - inactiveMillis), TimeUnit.MILLISECONDS));
                if (Log.isDebug())
                    Log.getLogger(LOG_CATEGORY).debug("TimeoutManager '" + System.identityHashCode(TimeoutManager.this) + "' has rescheduled a timeout for the active instance '" +
                            System.identityHashCode(timeoutObject) + "' of type '" + timeoutObject.getClass().getName() + "'. Task queue size: " + timeoutService.getQueue().size());
            }
        }
    }
}
