| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.commons.functor.aggregator; |
| |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| /** |
| * An aggregator which automatically resets the aggregated data at regular |
| * intervals and sends a notification when it is about to do so, so listeners |
| * can decide to gather the information before it is being reset (and log it |
| * etc). This allows for smaller memory footprints for instance in the case of |
| * List-based aggregators, as regularly the list is emptied. Also it allows for |
| * the call to <code>evaluate</code> to represent an "aggregated" value over a |
| * certain period of time. Note that you can still have a regular aggregator |
| * extending this class by specifying an interval less than or equal to zero. |
| * The regular flush/reset will be triggered from a timer which will always be |
| * started as a daemon thread (so it will stop when there are no more non-daemon |
| * threads in the JVM); this class allows 2 types of timers: |
| * <ul> |
| * <li>(default) per instance <code>Timer</code> -- each instance of this class |
| * will create a new <code>Timer</code> and this <code>Timer</code> will have a |
| * single <code>TimerTask</code> scheduled, which is the one that resets this |
| * <code>Aggregator</code> regularly and sends notifications. This way, when the |
| * <code>Aggregator</code> instance is destroyed, the <code>Timer</code> goes as |
| * well.</li> |
| * <li>shared <code>Timer</code> instance -- this class will create a static |
| * instance of <code>Timer</code> which can be shared by other instances of this |
| * class. While this is a bit more effective from a memory and thread management |
| * point of view, it has the downside that if the <code>TimerTask</code>'s are |
| * not managed properly this can create memory leaks. So if you decide to take |
| * this route make sure when you are finished with this instance, to always stop |
| * the timer at the end.</li> |
| * </ul> |
| * <p> |
| * <b>Synchronization</b>: This class provides a thread safe framework so when |
| * {@link #doAdd(Object)}, {@link #reset()} and {@link #evaluate()} is called, |
| * access is synchronized via a read-write lock. {@link #evaluate()} is |
| * considered a read operation and {@link #doAdd(Object)} and {@link #reset()} |
| * are considered write operations. |
| * </p> |
| * |
| * @param <T> |
| * type of data to aggregate |
| */ |
| public abstract class AbstractTimedAggregator<T> implements Aggregator<T> { |
| /** |
| * Stores a list to all objects which are listening for time events |
| * generated by this object. If there is no timer programmed (e.g. |
| * {@link #interval} is set to 0) this list will be <code>null</code>. Under |
| * the cover, this will use a <code>CopyOnWriteArrayList</code> since there |
| * aren't too many updates expected to this list. |
| * |
| * @see #interval |
| * @see #timer |
| * @see TimedAggregatorListener |
| */ |
| private List<TimedAggregatorListener<T>> timerListeners; |
| |
| /** |
| * As per {@link #timer} javadoc, if the interval specified is zero or less |
| * there will be no <code>Timer</code> created/assigned to this instance. |
| * This constant is defined to make it easier to read code which creates |
| * instances of this class and doesn't assign them a timer. |
| */ |
| public static final long NO_TIMER = 0L; |
| |
| /** |
| * Name of the shared timer which will run all the TimerTasks resulted from |
| * creating instances of this class which are set to used the shared timer. |
| * This is useful when looking at thread dumps. For instances which use |
| * their own timer task, the name will be |
| * <code>TIMER_NAME + hashCode()</code>. |
| */ |
| public static final String TIMER_NAME = "TimedSummarizerMainTimer"; |
| |
| /** |
| * The main shared timer which will execute all the <code>TimerTasks</code> |
| * resulted from instances of this class which chose to use the shared |
| * timer. Note that this <code>Timer</code> is started as a daemon thread so |
| * it will stop when there are no more non-daemon threads. |
| * |
| * @see #timer |
| */ |
| private static final Timer MAIN_TIMER = new Timer(TIMER_NAME, true); |
| |
| /** |
| * The timer instance for this instance. Can point to {@link #MAIN_TIMER} if |
| * shared timer was chosen in constructor or a newly created instance of |
| * <code>Timer</code> which is private to this instance only. |
| * |
| * @see #MAIN_TIMER |
| */ |
| private Timer timer; |
| |
| /** |
| * Interval in milliseconds we flush the result of the "summary". This will |
| * be used to set up our <code>TimerTask</code> and schedule it with the |
| * <code>Timer</code>. Every time the timer kicks in after this interval, it |
| * will call {@link #timer()}. If this is set to a value of zero or less, no |
| * timer will be created. |
| */ |
| private long interval; |
| |
| /** |
| * This is the task that is created when a new instance of this class is |
| * created. Once created this task will be scheduled with the {@link #timer} |
| * . Calling {@link #stop()} cancels this task and also will set it to null |
| * (so it can be recycled by the garbage collection), otherwise, until that |
| * point this will store a reference to a valid <code>TimerTask</code> |
| * instance. |
| */ |
| private TimerTask task; |
| |
| /** |
| * Lock used internally to synchronize access to {@link #add(Object)}, |
| * {@link #reset()} and {@link #evaluate()}. Locks for writing when |
| * {@link #add(Object)} and {@link #reset()} is called and for reading when |
| * {@link #evaluate()} is called. |
| * |
| * @see #add(Object) |
| * @see #evaluate() |
| * @see #reset() |
| */ |
| private ReadWriteLock dataLock; |
| |
| /** |
| * Default constructor -- creates an instance of this aggregator with no |
| * <code>Timer</code>. Equivalent to |
| * <code>AbstractTimedAggregator(NO_TIMER)</code>. |
| * |
| * @see #AbstractTimedAggregator(long) |
| */ |
| public AbstractTimedAggregator() { |
| this(NO_TIMER); |
| } |
| |
| /** |
| * Creates an aggregator which has a timer at the specified interval |
| * (miliseconds) and uses its own timer rather than the shared |
| * {@link #MAIN_TIMER}. Equivalent to |
| * <code>AbstractTimedAggregator(interval,false)</code>. |
| * |
| * @param interval |
| * interval in miliseconds to set the timer for. |
| * @see #interval |
| * @see #timer |
| * @see #AbstractTimedAggregator(long, boolean) |
| */ |
| public AbstractTimedAggregator(long interval) { |
| this(interval, false); |
| } |
| |
| /** |
| * Creates an aggregator which has a timer at the specified interval and |
| * also allows control over using the {@link #MAIN_TIMER shared timer} or |
| * its own per-instance timer. |
| * |
| * @param interval |
| * interval in miliseconds to set the timer for. |
| * @param useSharedTimer |
| * if set to <code>true</code>, {@link #timer} will be set to |
| * {@link #TIMER_NAME}, otherwise a new instance of |
| * <code>Timer</code> will be created. |
| */ |
| public AbstractTimedAggregator(long interval, boolean useSharedTimer) { |
| if (interval <= NO_TIMER) { |
| // not using timer |
| this.interval = NO_TIMER; |
| this.timer = null; |
| this.task = null; |
| this.timerListeners = null; |
| } else { |
| // we have been requested to use timers |
| this.interval = interval; |
| this.timerListeners = new CopyOnWriteArrayList<TimedAggregatorListener<T>>(); |
| if (useSharedTimer) { |
| this.timer = MAIN_TIMER; |
| } else { |
| this.timer = new Timer(TIMER_NAME + hashCode(), true); |
| } |
| // having set up the timer, create the task |
| this.task = new TimerTask() { |
| @Override |
| public void run() { |
| timer(); |
| } |
| }; |
| this.timer.scheduleAtFixedRate(this.task, this.interval, this.interval); |
| } |
| this.dataLock = new ReentrantReadWriteLock(); |
| } |
| |
| /** |
| * Getter for {@link #interval}. |
| * |
| * @return Current value of {@link #interval}. |
| */ |
| public final long getInterval() { |
| return interval; |
| } |
| |
| /** |
| * Adds the data to this aggregator. This function first locks |
| * {@link #dataLock} for writing then calls {@link #doAdd(Object)}, which |
| * allows subclasses to perform the actual adding to the aggregator and then |
| * at the end it unlocks {@link #dataLock}. |
| * |
| * @param data |
| * Data to be added to the aggregator. |
| * @see #doAdd(Object) |
| * @see #dataLock |
| */ |
| public final void add(T data) { |
| dataLock.writeLock().lock(); |
| try { |
| doAdd(data); |
| } finally { |
| dataLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Function provided to allow subclasses to perform the actual adding of the |
| * data to the aggregator. This function is wrapped by {@link #add(Object)} |
| * so that access to any internal data series (implemented by subclasses) |
| * via {@link #add(Object)} or {@link #evaluate()} or {@link #reset()} is |
| * prohibited during this call, as a <b>write</b> lock is acquired prior to |
| * this function call to ensure this function is the only one which has |
| * access to the data. |
| * |
| * @param data |
| * Data to be aggregated |
| * @see #add(Object) |
| */ |
| protected abstract void doAdd(T data); |
| |
| /** |
| * Aggregates all the data this object has been "fed" via calls to |
| * {@link #add(Object)}. Note that this object delegates the call to |
| * {@link #doEvaluate()} after it secured read-only access to |
| * {@link #dataLock} -- so any data series access can be safely read |
| * (however, subclasses should NOT try to modify any data series they might |
| * implement at this point!). The lock is released after |
| * {@link #doEvaluate()} returns. |
| * |
| * @return result of aggregating the data, as returned by |
| * {@link #doEvaluate()} |
| * @see #doEvaluate() |
| */ |
| public final T evaluate() { |
| dataLock.readLock().lock(); |
| try { |
| return doEvaluate(); |
| } finally { |
| dataLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Allows subclasses to perform the actual evaluation of the aggregated |
| * result in a thread-safe manner. When this function is called, |
| * <b>write</b> access to data (via {@link #add(Object)} and |
| * {@link #reset()}) is prohibited until this function finishes. However, |
| * please note that other read access (via calls to the same |
| * {@link #evaluate()}) is possible. |
| * |
| * @return Result of evaluating the aggregated data |
| */ |
| protected abstract T doEvaluate(); |
| |
| /** |
| * Resets this aggregator.This function first locks {@link #dataLock} for |
| * writing then calls {@link #doReset()}, which allows subclasses to perform |
| * the actual resetting of the aggregator and then at the end it unlocks |
| * {@link #dataLock}. |
| * |
| * @see #doReset() |
| */ |
| public final void reset() { |
| dataLock.writeLock().lock(); |
| try { |
| doReset(); |
| } finally { |
| dataLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Function provided to allow subclasses to perform the actual reset of the |
| * aggregator. This function is wrapped by {@link #reset()} so that access |
| * to data (via {@link #add(Object)} or {@link #evaluate()} or |
| * {@link #reset()}) is prohibited during this call, as a <b>write</b> lock |
| * is acquired prior to this function call to ensure this function is the |
| * only one which has access to the data. |
| */ |
| protected abstract void doReset(); |
| |
| /** |
| * Retrieves the size of the currently-stored data series. This function |
| * first locks {@link #dataLock} for reading then calls |
| * {@link #retrieveDataSize()}, which allows subclasses to compute the data |
| * series size and then at the end it unlocks {@link #dataLock}. |
| * |
| * @return Size of the current data series, which will be aggregated at the |
| * next call to {@link #evaluate()} |
| */ |
| public final int getDataSize() { |
| dataLock.readLock().lock(); |
| try { |
| return retrieveDataSize(); |
| } finally { |
| dataLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Function provided to allow subclasses to retrieve the actual size of the |
| * data series. This function is wrapped by {@link #getDataSize()} so that |
| * access to data (via {@link #add(Object)} or {@link #reset()}) is |
| * prohibited during this call, as a <b>read</b> lock is acquired prior to |
| * this function call. (However, calls to {@link #evaluate()} are allowed as |
| * that locks for reading too.) |
| * |
| * @return Size of the current data series. Zero means no data stored. |
| */ |
| protected abstract int retrieveDataSize(); |
| |
| /** |
| * Retrieves <b>an unmodifiable copy</b> of the {@link #timerListeners timer |
| * listeners}. Used for testing. |
| * |
| * @return <code>Collections.unmodifiableList(timerListeners)</code> if |
| * {@link #timerListeners} is <b>not</b> <code>null</code>, or |
| * <code>null</code> otherwise. |
| */ |
| final List<TimedAggregatorListener<T>> getTimerListeners() { |
| if (timerListeners == null) { |
| return null; |
| } |
| return Collections.unmodifiableList(timerListeners); |
| } |
| |
| /** |
| * If this <code>Aggregator</code> has been started with timer support, it |
| * will add the given listener, so it receives |
| * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object) |
| * timer events}. If no timer support has been configured for this |
| * Aggregator, this call has no effect. |
| * |
| * @param listener |
| * Listener to be added to received timer events from this |
| * aggregator. |
| * @see #timerListeners |
| */ |
| public final void addTimerListener(TimedAggregatorListener<T> listener) { |
| if (timerListeners == null) { |
| return; |
| } |
| timerListeners.add(listener); |
| } |
| |
| /** |
| * Removes a listener from the timer listeners list if previously added. If |
| * this Aggregator has been configured with no timer support, this call will |
| * always return <code>false</code>. |
| * |
| * @param listener |
| * Listener to be removed from the list. NullPointerException |
| * thrown if this is null. |
| * @return <code>true</code> if this Aggregator has timer support and the |
| * listener passed in was previously added (via |
| * {@link #addTimerListener(TimedAggregatorListener)}) or false if |
| * either the Aggregator has no timer support or it has timer |
| * support but the listener was never registered with this |
| * Aggregator. |
| * @see #timerListeners |
| */ |
| public final boolean removeTimerListener(TimedAggregatorListener<T> listener) { |
| if (timerListeners == null) { |
| return false; |
| } |
| return timerListeners.remove(listener); |
| } |
| |
| /** |
| * Computes the current aggregated value (by calling {@link #evaluate()}, |
| * resets this aggregator then notifies all listeners. Go through all the |
| * {@link #timerListeners} and sends |
| * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object) |
| * notification messages} to each of them. Does nothing if |
| * {@link #timerListeners} is <code>null</code>. Please note that |
| * {@link #evaluate()} is called only once at the beginning of this |
| * function, and only if there are listeners configured, then this value is |
| * passed to every notification. This is in order to ensure all listeners |
| * receive the same value -- the value of the evaluation prior to resetting |
| * it. |
| */ |
| private void timer() { |
| if (timerListeners != null) { |
| // if we have listeners, notify them |
| T aggregated = evaluate(); // NOTE: shouldn't evaluate() and reset() |
| // be done atomically here? |
| reset(); |
| for (TimedAggregatorListener<T> i : timerListeners) { |
| i.onTimer(this, aggregated); |
| } |
| } else { |
| reset(); |
| } |
| } |
| |
| /** |
| * Checks whether this instance has a timer associated with it or not. If |
| * there is a timer for this Aggregator, then the {@link #task} member |
| * should be set to a non-null value. |
| * |
| * @return <code>true</code> if {@link #task} is not null, |
| * <code>false</code> otherwise (in which case there is no timer). |
| */ |
| public final boolean isTimerEnabled() { |
| return (task != null); |
| } |
| |
| /** |
| * Checks whether this instance uses its own timer or {@link #MAIN_TIMER the |
| * shared timer} for scheduling {@link #task the timer task}. |
| * |
| * @return <code>true</code> if <code>timer == MAIN_TIMER</code> or |
| * <code>false</code> otherwise. |
| */ |
| public final boolean isSharedTimer() { |
| return (timer == MAIN_TIMER); |
| } |
| |
| /** |
| * Cancels the current timer task (if set) -- which means from there on the |
| * data will not be reset anymore. Also, if {@link #timer} is not set to |
| * {@link #MAIN_TIMER the shared timer} then it will be cancelled as well |
| * Also releases all the listeners from the {@link #timerListeners list}. |
| */ |
| public final void stop() { |
| // cancel the task first |
| if (task != null) { |
| task.cancel(); |
| task = null; |
| timer.purge(); // remove the reference to this task |
| } |
| // then the timer if needed |
| if (timer != null && timer != MAIN_TIMER) { |
| timer.cancel(); |
| } |
| timer = null; |
| // finally remove the elements from the listeners list |
| if (timerListeners != null) { |
| timerListeners.clear(); |
| } |
| } |
| |
| @Override |
| protected final void finalize() throws Throwable { |
| // if we're going in the garbage, make sure we clean up |
| stop(); |
| } |
| |
| @Override |
| public String toString() { |
| return AbstractTimedAggregator.class.getName(); |
| } |
| } |