blob: 3bfc11bd3dfadb0370278b44647ed65873460c92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.functor.aggregator;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* An aggregator which automatically resets the aggregated data at regular
* intervals and sends a notification when it is about to do so, so listeners
* can decide to gather the information before it is being reset (and log it
* etc). This allows for smaller memory footprints for instance in the case of
* List-based aggregators, as regularly the list is emptied. Also it allows for
* the call to <code>evaluate</code> to represent an "aggregated" value over a
* certain period of time. Note that you can still have a regular aggregator
* extending this class by specifying an interval less than or equal to zero.
* The regular flush/reset will be triggered from a timer which will always be
* started as a daemon thread (so it will stop when there are no more non-daemon
* threads in the JVM); this class allows 2 types of timers:
* <ul>
* <li>(default) per instance <code>Timer</code> -- each instance of this class
* will create a new <code>Timer</code> and this <code>Timer</code> will have a
* single <code>TimerTask</code> scheduled, which is the one that resets this
* <code>Aggregator</code> regularly and sends notifications. This way, when the
* <code>Aggregator</code> instance is destroyed, the <code>Timer</code> goes as
* well.</li>
* <li>shared <code>Timer</code> instance -- this class will create a static
* instance of <code>Timer</code> which can be shared by other instances of this
* class. While this is a bit more effective from a memory and thread management
* point of view, it has the downside that if the <code>TimerTask</code>'s are
* not managed properly this can create memory leaks. So if you decide to take
* this route make sure when you are finished with this instance, to always stop
* the timer at the end.</li>
* </ul>
* <p>
* <b>Synchronization</b>: This class provides a thread safe framework so when
* {@link #doAdd(Object)}, {@link #reset()} and {@link #evaluate()} is called,
* access is synchronized via a read-write lock. {@link #evaluate()} is
* considered a read operation and {@link #doAdd(Object)} and {@link #reset()}
* are considered write operations.
* </p>
*
* @param <T>
* type of data to aggregate
*/
public abstract class AbstractTimedAggregator<T> implements Aggregator<T> {
/**
* Stores a list to all objects which are listening for time events
* generated by this object. If there is no timer programmed (e.g.
* {@link #interval} is set to 0) this list will be <code>null</code>. Under
* the cover, this will use a <code>CopyOnWriteArrayList</code> since there
* aren't too many updates expected to this list.
*
* @see #interval
* @see #timer
* @see TimedAggregatorListener
*/
private List<TimedAggregatorListener<T>> timerListeners;
/**
* As per {@link #timer} javadoc, if the interval specified is zero or less
* there will be no <code>Timer</code> created/assigned to this instance.
* This constant is defined to make it easier to read code which creates
* instances of this class and doesn't assign them a timer.
*/
public static final long NO_TIMER = 0L;
/**
* Name of the shared timer which will run all the TimerTasks resulted from
* creating instances of this class which are set to used the shared timer.
* This is useful when looking at thread dumps. For instances which use
* their own timer task, the name will be
* <code>TIMER_NAME + hashCode()</code>.
*/
public static final String TIMER_NAME = "TimedSummarizerMainTimer";
/**
* The main shared timer which will execute all the <code>TimerTasks</code>
* resulted from instances of this class which chose to use the shared
* timer. Note that this <code>Timer</code> is started as a daemon thread so
* it will stop when there are no more non-daemon threads.
*
* @see #timer
*/
private static final Timer MAIN_TIMER = new Timer(TIMER_NAME, true);
/**
* The timer instance for this instance. Can point to {@link #MAIN_TIMER} if
* shared timer was chosen in constructor or a newly created instance of
* <code>Timer</code> which is private to this instance only.
*
* @see #MAIN_TIMER
*/
private Timer timer;
/**
* Interval in milliseconds we flush the result of the "summary". This will
* be used to set up our <code>TimerTask</code> and schedule it with the
* <code>Timer</code>. Every time the timer kicks in after this interval, it
* will call {@link #timer()}. If this is set to a value of zero or less, no
* timer will be created.
*/
private long interval;
/**
* This is the task that is created when a new instance of this class is
* created. Once created this task will be scheduled with the {@link #timer}
* . Calling {@link #stop()} cancels this task and also will set it to null
* (so it can be recycled by the garbage collection), otherwise, until that
* point this will store a reference to a valid <code>TimerTask</code>
* instance.
*/
private TimerTask task;
/**
* Lock used internally to synchronize access to {@link #add(Object)},
* {@link #reset()} and {@link #evaluate()}. Locks for writing when
* {@link #add(Object)} and {@link #reset()} is called and for reading when
* {@link #evaluate()} is called.
*
* @see #add(Object)
* @see #evaluate()
* @see #reset()
*/
private ReadWriteLock dataLock;
/**
* Default constructor -- creates an instance of this aggregator with no
* <code>Timer</code>. Equivalent to
* <code>AbstractTimedAggregator(NO_TIMER)</code>.
*
* @see #AbstractTimedAggregator(long)
*/
public AbstractTimedAggregator() {
this(NO_TIMER);
}
/**
* Creates an aggregator which has a timer at the specified interval
* (miliseconds) and uses its own timer rather than the shared
* {@link #MAIN_TIMER}. Equivalent to
* <code>AbstractTimedAggregator(interval,false)</code>.
*
* @param interval
* interval in miliseconds to set the timer for.
* @see #interval
* @see #timer
* @see #AbstractTimedAggregator(long, boolean)
*/
public AbstractTimedAggregator(long interval) {
this(interval, false);
}
/**
* Creates an aggregator which has a timer at the specified interval and
* also allows control over using the {@link #MAIN_TIMER shared timer} or
* its own per-instance timer.
*
* @param interval
* interval in miliseconds to set the timer for.
* @param useSharedTimer
* if set to <code>true</code>, {@link #timer} will be set to
* {@link #TIMER_NAME}, otherwise a new instance of
* <code>Timer</code> will be created.
*/
public AbstractTimedAggregator(long interval, boolean useSharedTimer) {
if (interval <= NO_TIMER) {
// not using timer
this.interval = NO_TIMER;
this.timer = null;
this.task = null;
this.timerListeners = null;
} else {
// we have been requested to use timers
this.interval = interval;
this.timerListeners = new CopyOnWriteArrayList<TimedAggregatorListener<T>>();
if (useSharedTimer) {
this.timer = MAIN_TIMER;
} else {
this.timer = new Timer(TIMER_NAME + hashCode(), true);
}
// having set up the timer, create the task
this.task = new TimerTask() {
@Override
public void run() {
timer();
}
};
this.timer.scheduleAtFixedRate(this.task, this.interval, this.interval);
}
this.dataLock = new ReentrantReadWriteLock();
}
/**
* Getter for {@link #interval}.
*
* @return Current value of {@link #interval}.
*/
public final long getInterval() {
return interval;
}
/**
* Adds the data to this aggregator. This function first locks
* {@link #dataLock} for writing then calls {@link #doAdd(Object)}, which
* allows subclasses to perform the actual adding to the aggregator and then
* at the end it unlocks {@link #dataLock}.
*
* @param data
* Data to be added to the aggregator.
* @see #doAdd(Object)
* @see #dataLock
*/
public final void add(T data) {
dataLock.writeLock().lock();
try {
doAdd(data);
} finally {
dataLock.writeLock().unlock();
}
}
/**
* Function provided to allow subclasses to perform the actual adding of the
* data to the aggregator. This function is wrapped by {@link #add(Object)}
* so that access to any internal data series (implemented by subclasses)
* via {@link #add(Object)} or {@link #evaluate()} or {@link #reset()} is
* prohibited during this call, as a <b>write</b> lock is acquired prior to
* this function call to ensure this function is the only one which has
* access to the data.
*
* @param data
* Data to be aggregated
* @see #add(Object)
*/
protected abstract void doAdd(T data);
/**
* Aggregates all the data this object has been "fed" via calls to
* {@link #add(Object)}. Note that this object delegates the call to
* {@link #doEvaluate()} after it secured read-only access to
* {@link #dataLock} -- so any data series access can be safely read
* (however, subclasses should NOT try to modify any data series they might
* implement at this point!). The lock is released after
* {@link #doEvaluate()} returns.
*
* @return result of aggregating the data, as returned by
* {@link #doEvaluate()}
* @see #doEvaluate()
*/
public final T evaluate() {
dataLock.readLock().lock();
try {
return doEvaluate();
} finally {
dataLock.readLock().unlock();
}
}
/**
* Allows subclasses to perform the actual evaluation of the aggregated
* result in a thread-safe manner. When this function is called,
* <b>write</b> access to data (via {@link #add(Object)} and
* {@link #reset()}) is prohibited until this function finishes. However,
* please note that other read access (via calls to the same
* {@link #evaluate()}) is possible.
*
* @return Result of evaluating the aggregated data
*/
protected abstract T doEvaluate();
/**
* Resets this aggregator.This function first locks {@link #dataLock} for
* writing then calls {@link #doReset()}, which allows subclasses to perform
* the actual resetting of the aggregator and then at the end it unlocks
* {@link #dataLock}.
*
* @see #doReset()
*/
public final void reset() {
dataLock.writeLock().lock();
try {
doReset();
} finally {
dataLock.writeLock().unlock();
}
}
/**
* Function provided to allow subclasses to perform the actual reset of the
* aggregator. This function is wrapped by {@link #reset()} so that access
* to data (via {@link #add(Object)} or {@link #evaluate()} or
* {@link #reset()}) is prohibited during this call, as a <b>write</b> lock
* is acquired prior to this function call to ensure this function is the
* only one which has access to the data.
*/
protected abstract void doReset();
/**
* Retrieves the size of the currently-stored data series. This function
* first locks {@link #dataLock} for reading then calls
* {@link #retrieveDataSize()}, which allows subclasses to compute the data
* series size and then at the end it unlocks {@link #dataLock}.
*
* @return Size of the current data series, which will be aggregated at the
* next call to {@link #evaluate()}
*/
public final int getDataSize() {
dataLock.readLock().lock();
try {
return retrieveDataSize();
} finally {
dataLock.readLock().unlock();
}
}
/**
* Function provided to allow subclasses to retrieve the actual size of the
* data series. This function is wrapped by {@link #getDataSize()} so that
* access to data (via {@link #add(Object)} or {@link #reset()}) is
* prohibited during this call, as a <b>read</b> lock is acquired prior to
* this function call. (However, calls to {@link #evaluate()} are allowed as
* that locks for reading too.)
*
* @return Size of the current data series. Zero means no data stored.
*/
protected abstract int retrieveDataSize();
/**
* Retrieves <b>an unmodifiable copy</b> of the {@link #timerListeners timer
* listeners}. Used for testing.
*
* @return <code>Collections.unmodifiableList(timerListeners)</code> if
* {@link #timerListeners} is <b>not</b> <code>null</code>, or
* <code>null</code> otherwise.
*/
final List<TimedAggregatorListener<T>> getTimerListeners() {
if (timerListeners == null) {
return null;
}
return Collections.unmodifiableList(timerListeners);
}
/**
* If this <code>Aggregator</code> has been started with timer support, it
* will add the given listener, so it receives
* {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
* timer events}. If no timer support has been configured for this
* Aggregator, this call has no effect.
*
* @param listener
* Listener to be added to received timer events from this
* aggregator.
* @see #timerListeners
*/
public final void addTimerListener(TimedAggregatorListener<T> listener) {
if (timerListeners == null) {
return;
}
timerListeners.add(listener);
}
/**
* Removes a listener from the timer listeners list if previously added. If
* this Aggregator has been configured with no timer support, this call will
* always return <code>false</code>.
*
* @param listener
* Listener to be removed from the list. NullPointerException
* thrown if this is null.
* @return <code>true</code> if this Aggregator has timer support and the
* listener passed in was previously added (via
* {@link #addTimerListener(TimedAggregatorListener)}) or false if
* either the Aggregator has no timer support or it has timer
* support but the listener was never registered with this
* Aggregator.
* @see #timerListeners
*/
public final boolean removeTimerListener(TimedAggregatorListener<T> listener) {
if (timerListeners == null) {
return false;
}
return timerListeners.remove(listener);
}
/**
* Computes the current aggregated value (by calling {@link #evaluate()},
* resets this aggregator then notifies all listeners. Go through all the
* {@link #timerListeners} and sends
* {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
* notification messages} to each of them. Does nothing if
* {@link #timerListeners} is <code>null</code>. Please note that
* {@link #evaluate()} is called only once at the beginning of this
* function, and only if there are listeners configured, then this value is
* passed to every notification. This is in order to ensure all listeners
* receive the same value -- the value of the evaluation prior to resetting
* it.
*/
private void timer() {
if (timerListeners != null) {
// if we have listeners, notify them
T aggregated = evaluate(); // NOTE: shouldn't evaluate() and reset()
// be done atomically here?
reset();
for (TimedAggregatorListener<T> i : timerListeners) {
i.onTimer(this, aggregated);
}
} else {
reset();
}
}
/**
* Checks whether this instance has a timer associated with it or not. If
* there is a timer for this Aggregator, then the {@link #task} member
* should be set to a non-null value.
*
* @return <code>true</code> if {@link #task} is not null,
* <code>false</code> otherwise (in which case there is no timer).
*/
public final boolean isTimerEnabled() {
return (task != null);
}
/**
* Checks whether this instance uses its own timer or {@link #MAIN_TIMER the
* shared timer} for scheduling {@link #task the timer task}.
*
* @return <code>true</code> if <code>timer == MAIN_TIMER</code> or
* <code>false</code> otherwise.
*/
public final boolean isSharedTimer() {
return (timer == MAIN_TIMER);
}
/**
* Cancels the current timer task (if set) -- which means from there on the
* data will not be reset anymore. Also, if {@link #timer} is not set to
* {@link #MAIN_TIMER the shared timer} then it will be cancelled as well
* Also releases all the listeners from the {@link #timerListeners list}.
*/
public final void stop() {
// cancel the task first
if (task != null) {
task.cancel();
task = null;
timer.purge(); // remove the reference to this task
}
// then the timer if needed
if (timer != null && timer != MAIN_TIMER) {
timer.cancel();
}
timer = null;
// finally remove the elements from the listeners list
if (timerListeners != null) {
timerListeners.clear();
}
}
@Override
protected final void finalize() throws Throwable {
// if we're going in the garbage, make sure we clean up
stop();
}
@Override
public String toString() {
return AbstractTimedAggregator.class.getName();
}
}