blob: 7eddf439a1adc19b34cbc6a6bea95b64843de735 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal;
import java.lang.ref.WeakReference;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Instances of this class are like {@link Timer}, but are associated with a DistributedSystem,
* which can be
* cancelled as a group with {@link #cancelTimers(DistributedSystem)}.
*
* @see Timer
* @see TimerTask
*
*/
public class SystemTimer {
private static final Logger logger = LogService.getLogger();
private static final boolean isIBM =
"IBM Corporation".equals(System.getProperty("java.vm.vendor"));
/**
* the underlying {@link Timer}
*/
private final Timer timer;
/**
* True if this timer has been cancelled
*/
private volatile boolean cancelled = false;
/**
* the DistributedSystem to which this timer belongs
*/
private final DistributedSystem distributedSystem;
@Override
public String toString() {
return "SystemTimer["
+ "system = " + distributedSystem
+ "]";
}
/**
* Map of all of the timers in the system
*/
@MakeNotStatic
private static final HashMap<DistributedSystem, Set<WeakReference<SystemTimer>>> distributedSystemTimers =
new HashMap<>();
/**
* Add the given timer is in the given DistributedSystem. Used only by constructors.
*
* @param system DistributedSystem to add the timer to
* @param systemTimer timer to add
*/
private static void addTimer(DistributedSystem system, SystemTimer systemTimer) {
Set<WeakReference<SystemTimer>> timers;
synchronized (distributedSystemTimers) {
timers = distributedSystemTimers.get(system);
if (timers == null) {
timers = new HashSet<>();
distributedSystemTimers.put(system, timers);
}
}
WeakReference<SystemTimer> wr = new WeakReference<>(systemTimer);
synchronized (timers) {
timers.add(wr);
}
}
/**
* Return the current number of DistributedSystems with timers
*/
public static int distributedSystemCount() {
synchronized (distributedSystemTimers) {
return distributedSystemTimers.size();
}
}
/**
* time that the last sweep was done
*
* @see #sweepAllTimers
*/
@MakeNotStatic
private static long lastSweepAllTime = 0;
/**
* Interval, in milliseconds, to sweep all timers, measured from when the last sweep finished
*
* @see #sweepAllTimers
*/
private static final long SWEEP_ALL_INTERVAL = 2 * 60 * 1000; // 2 minutes
/**
* Manually garbage collect {@link #distributedSystemTimers}, if it hasn't happened in a while.
*
* @see #lastSweepAllTime
*/
private static void sweepAllTimers() {
if (System.currentTimeMillis() < lastSweepAllTime + SWEEP_ALL_INTERVAL) {
// Too soon.
return;
}
final boolean isDebugEnabled = logger.isTraceEnabled();
synchronized (distributedSystemTimers) {
Iterator<Map.Entry<DistributedSystem, Set<WeakReference<SystemTimer>>>> allSystemsIterator =
distributedSystemTimers.entrySet().iterator();
while (allSystemsIterator.hasNext()) {
Map.Entry<DistributedSystem, Set<WeakReference<SystemTimer>>> entry =
allSystemsIterator.next();
Set<WeakReference<SystemTimer>> timers = entry.getValue();
synchronized (timers) {
Iterator<WeakReference<SystemTimer>> timersIterator = timers.iterator();
while (timersIterator.hasNext()) {
WeakReference<SystemTimer> wr = timersIterator.next();
SystemTimer st = wr.get();
if (st == null || st.isCancelled()) {
timersIterator.remove();
}
}
if (timers.size() == 0) {
allSystemsIterator.remove();
}
}
}
}
// Collect time at END of sweep. It means an extra call to the system
// timer, but makes this potentially less active.
lastSweepAllTime = System.currentTimeMillis();
}
/**
* Remove given timer.
*
* @param timerToRemove timer to remove
*
* @see #cancel()
*/
private static void removeTimer(SystemTimer timerToRemove) {
synchronized (distributedSystemTimers) {
// Get the timers for the distributed system
Set<WeakReference<SystemTimer>> timers =
distributedSystemTimers.get(timerToRemove.distributedSystem);
if (timers == null) {
return; // already gone
}
synchronized (timers) {
Iterator<WeakReference<SystemTimer>> timersIterator = timers.iterator();
while (timersIterator.hasNext()) {
WeakReference<SystemTimer> ref = timersIterator.next();
SystemTimer timer = ref.get();
if (timer == null) {
timersIterator.remove();
} else if (timer == timerToRemove) {
timersIterator.remove();
break;
} else if (timer.isCancelled()) {
timersIterator.remove();
}
}
if (timers.size() == 0) {
distributedSystemTimers.remove(timerToRemove.distributedSystem); // last reference
}
}
}
sweepAllTimers(); // Occasionally check global list
}
/**
* Cancel all outstanding timers
*
* @param system the DistributedSystem whose timers should be cancelled
*/
public static void cancelTimers(DistributedSystem system) {
Set<WeakReference<SystemTimer>> timers;
synchronized (distributedSystemTimers) {
timers = distributedSystemTimers.get(system);
if (timers == null) {
return; // already cancelled
}
// Remove before releasing synchronization, so any fresh timer ends up
// in a new set with same key
distributedSystemTimers.remove(system);
} // synchronized
// cancel all of the timers
synchronized (timers) {
for (WeakReference<SystemTimer> wr : timers) {
SystemTimer st = wr.get();
// it.remove(); Not necessary, we're emptying the list...
if (st != null) {
st.cancelled = true; // for safety :-)
st.timer.cancel(); // st.cancel() would just search for it again
}
}
}
}
public int timerPurge() {
// Fix 39585, IBM's java.util.timer's purge() has stack overflow issue
if (isIBM) {
return 0;
}
return this.timer.purge();
}
/**
* @see Timer#Timer(boolean)
* @param distributedSystem the DistributedSystem to which this timer belongs
*/
public SystemTimer(DistributedSystem distributedSystem) {
this.timer = new Timer(true);
this.distributedSystem = distributedSystem;
addTimer(distributedSystem, this);
}
private void checkCancelled() throws IllegalStateException {
if (this.cancelled) {
throw new IllegalStateException("This timer has been cancelled.");
}
}
/**
* @see Timer#schedule(TimerTask, long)
*/
public void schedule(SystemTimerTask task, long delay) {
checkCancelled();
timer.schedule(task, delay);
}
/**
* @see Timer#schedule(TimerTask, Date)
*/
public void schedule(SystemTimerTask task, Date time) {
checkCancelled();
timer.schedule(task, time);
}
/**
* @see Timer#scheduleAtFixedRate(TimerTask, long, long)
*/
public void scheduleAtFixedRate(SystemTimerTask task, long delay, long period) {
checkCancelled();
timer.scheduleAtFixedRate(task, delay, period);
}
/**
* @see Timer#schedule(TimerTask, long, long)
*/
public void schedule(SystemTimerTask task, long delay, long period) {
checkCancelled();
timer.schedule(task, delay, period);
}
/**
* @see Timer#cancel()
*/
public void cancel() {
this.cancelled = true;
timer.cancel();
removeTimer(this);
}
/**
* has this timer been cancelled?
*/
public boolean isCancelled() {
return cancelled;
}
/**
* Cover class to track behavior of scheduled tasks
*
* @see TimerTask
*/
public abstract static class SystemTimerTask extends TimerTask {
protected static final Logger logger = LogService.getLogger();
private volatile boolean cancelled;
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean cancel() {
cancelled = true;
return super.cancel();
}
/**
* This is your executed action
*/
public abstract void run2();
/**
* Does debug logging, catches critical errors, then delegates to {@link #run2()}
*/
@Override
public void run() {
try {
this.run2();
} catch (CancelException ignore) {
// ignore: TimerThreads can fire during or near cache closure
} catch (Throwable t) {
logger.warn(String.format("Timer task <%s> encountered exception", this), t);
// Don't rethrow, it will just get eaten and kill the timer
}
}
}
}