blob: a68384b12c0bc33302d5354615f04ab4aa670d19 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm;
import java.nio.channels.ClosedByInterruptException;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
/**
 * The timer defined in this file is very similar to java.util.Timer, except it integrates with Storm's time simulation capabilities. This
 * lets us test code that does asynchronous work on the timer thread.
 *
 * <p>All scheduled functions run sequentially on a single daemon thread ({@link StormTimerTask}) that is started by the constructor and
 * stopped by {@link #close()}.
 */
public class StormTimer implements AutoCloseable {
    // Thread that owns the queue of pending entries and runs them when they become due.
    private final StormTimerTask task = new StormTimerTask();

    /**
     * Makes a Timer in the form of a StormTimerTask Object and starts its thread as a max-priority daemon.
     *
     * @param name   name of the timer thread; "timer" is used when null
     * @param onKill function to call when timer is killed unexpectedly
     * @throws RuntimeException if onKill is null
     */
    public StormTimer(String name, Thread.UncaughtExceptionHandler onKill) {
        if (onKill == null) {
            throw new RuntimeException("onKill func is null!");
        }
        if (name == null) {
            this.task.setName("timer");
        } else {
            this.task.setName(name);
        }
        this.task.setOnKillFunc(onKill);
        // The run loop exits as soon as the flag is false, so it must be set before start().
        this.task.setActive(true);
        this.task.setDaemon(true);
        this.task.setPriority(Thread.MAX_PRIORITY);
        this.task.start();
    }

    /**
     * Schedule a function to be executed in the timer.
     *
     * @param delaySecs   the number of seconds to delay before running the function
     * @param func        the function to run
     * @param checkActive whether to check if the timer is active
     * @param jitterMs    add jitter to the run
     */
    public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
        scheduleMs(Time.secsToMillisLong(delaySecs), func, checkActive, jitterMs);
    }

    /**
     * Schedule a function to be executed in the timer, checking that the timer is active and adding no jitter.
     *
     * @param delaySecs the number of seconds to delay before running the function
     * @param func      the function to run
     */
    public void schedule(int delaySecs, Runnable func) {
        schedule(delaySecs, func, true, 0);
    }

    /**
     * Same as schedule with millisecond resolution.
     *
     * @param delayMs     the number of milliseconds to delay before running the function
     * @param func        the function to run
     * @param checkActive whether to check if the timer is active
     * @param jitterMs    add jitter to the run; when positive, a random value in [0, jitterMs) milliseconds is added to the delay
     * @throws RuntimeException      if func is null
     * @throws IllegalStateException if checkActive is true and the timer is not active
     */
    public void scheduleMs(long delayMs, Runnable func, boolean checkActive, int jitterMs) {
        if (func == null) {
            throw new RuntimeException("function to schedule is null!");
        }
        if (checkActive) {
            checkActive();
        }
        String id = Utils.uuid();
        long endTimeMs = Time.currentTimeMillis() + delayMs;
        if (jitterMs > 0) {
            endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
        }
        task.add(new QueueEntry(endTimeMs, func, id));
    }

    /**
     * Same as schedule with millisecond resolution, checking that the timer is active and adding no jitter.
     *
     * @param delayMs the number of milliseconds to delay before running the function
     * @param func    the function to run
     */
    public void scheduleMs(long delayMs, Runnable func) {
        scheduleMs(delayMs, func, true, 0);
    }

    /**
     * Schedule a function to run recurrently.
     *
     * @param delaySecs the number of seconds to delay before running the function
     * @param recurSecs the time between each invocation
     * @param func      the function to run
     */
    public void scheduleRecurring(int delaySecs, final int recurSecs, final Runnable func) {
        schedule(delaySecs, new Runnable() {
            @Override
            public void run() {
                func.run();
                // Re-schedule without the active check. This avoids a race condition with cancel-timer.
                schedule(recurSecs, this, false, 0);
            }
        });
    }

    /**
     * Schedule a function to run recurrently.
     *
     * @param delayMs the number of millis to delay before running the function
     * @param recurMs the time between each invocation
     * @param func    the function to run
     */
    public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
        scheduleMs(delayMs, new Runnable() {
            @Override
            public void run() {
                func.run();
                // Re-schedule without the active check. This avoids a race condition with cancel-timer:
                // close() may deactivate the timer while func is running, and an IllegalStateException
                // thrown from here would be handed to onKill by the timer loop as an unexpected failure.
                scheduleMs(recurMs, this, false, 0);
            }
        });
    }

    /**
     * Schedule a function to run recurrently with jitter.
     *
     * @param delaySecs the number of seconds to delay before running the function
     * @param recurSecs the time between each invocation
     * @param jitterMs  jitter added to each recurring run (the first run is scheduled without jitter)
     * @param func      the function to run
     */
    public void scheduleRecurringWithJitter(int delaySecs, final int recurSecs, final int jitterMs, final Runnable func) {
        schedule(delaySecs, new Runnable() {
            @Override
            public void run() {
                func.run();
                // Re-schedule without the active check. This avoids a race condition with cancel-timer.
                schedule(recurSecs, this, false, jitterMs);
            }
        });
    }

    /**
     * check if timer is active.
     *
     * @throws IllegalStateException if the timer is not active
     */
    private void checkActive() {
        if (!this.task.isActive()) {
            throw new IllegalStateException("Timer is not active");
        }
    }

    /**
     * cancel timer. If the timer is active it is marked inactive, its thread is interrupted and this call waits for the thread to
     * finish; otherwise this is a no-op.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the timer thread to finish
     */
    @Override
    public void close() throws InterruptedException {
        if (this.task.isActive()) {
            this.task.setActive(false);
            this.task.interrupt();
            this.task.join();
        }
    }

    /**
     * is timer waiting. Used in timer simulation.
     *
     * @return the result of {@code Time.isThreadWaiting} for the timer thread
     */
    public boolean isTimerWaiting() {
        return Time.isThreadWaiting(task);
    }

    /**
     * A function scheduled on the timer together with the time at which it becomes due.
     */
    public static class QueueEntry {
        // time, in milliseconds as reported by Time.currentTimeMillis(), at or after which func should run
        public final Long endTimeMs;
        // the function to run
        public final Runnable func;
        // identifier assigned to the entry when it was scheduled
        public final String id;

        public QueueEntry(Long endTimeMs, Runnable func, String id) {
            this.endTimeMs = endTimeMs;
            this.func = func;
            this.id = id;
        }
    }

    /**
     * The timer thread. It repeatedly looks at the entry with the smallest end time and runs it once it is due.
     */
    public static class StormTimerTask extends Thread {
        //initialCapacity set to 11 since its the default initial capacity of PriorityBlockingQueue
        private final PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(11, new Comparator<QueueEntry>() {
            @Override
            public int compare(QueueEntry o1, QueueEntry o2) {
                // Compare the full long values. Truncating to int and subtracting mis-orders entries
                // whenever the millisecond timestamps do not fit in an int or the difference overflows.
                return Long.compare(o1.endTimeMs, o2.endTimeMs);
            }
        });
        // boolean to indicate whether timer is active
        private final AtomicBoolean active = new AtomicBoolean(false);
        // function to call when timer is killed
        private Thread.UncaughtExceptionHandler onKill;
        //random number generator
        private final Random random = new Random();

        /**
         * Runs due entries until the timer is deactivated. Any throwable other than an interrupt-related one deactivates the timer and
         * is passed to the onKill handler.
         */
        @Override
        public void run() {
            while (this.active.get()) {
                QueueEntry queueEntry = null;
                try {
                    queueEntry = this.queue.peek();
                    if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
                        // It is imperative to not run the function
                        // inside the timer lock. Otherwise, it is
                        // possible to deadlock if the fn deals with
                        // other locks, like the submit lock.
                        this.queue.remove(queueEntry);
                        queueEntry.func.run();
                    } else if (queueEntry != null) {
                        // If any events are scheduled, sleep until
                        // event generation. If any recurring events
                        // are scheduled then we will always go
                        // through this branch, sleeping only the
                        // exact necessary amount of time. We give
                        // an upper bound, e.g. 1000 millis, to the
                        // sleeping time, to limit the response time
                        // for detecting any new event within 1 secs.
                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
                    } else {
                        // Otherwise poll to see if any new event
                        // was scheduled. This is, in essence, the
                        // response time for detecting any new event
                        // schedulings when there are no scheduled
                        // events.
                        Time.sleep(1000);
                    }
                    // An interrupt that arrived while not sleeping only sets the flag; treat it as a request to stop.
                    if (Thread.interrupted()) {
                        this.active.set(false);
                    }
                } catch (Throwable e) {
                    // Interrupt-related exceptions are ignored here; the loop condition decides whether to continue.
                    if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))
                        && !(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e))) {
                        // need to set active false before calling onKill() - current implementation does not return.
                        this.setActive(false);
                        this.onKill.uncaughtException(this, e);
                    }
                }
            }
        }

        /**
         * Sets the handler invoked when a scheduled function fails with a non-interrupt throwable.
         *
         * @param onKill the handler to call
         */
        public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) {
            this.onKill = onKill;
        }

        /**
         * Returns whether the timer is active.
         *
         * @return the current value of the active flag
         */
        public boolean isActive() {
            return this.active.get();
        }

        /**
         * Sets the active flag; the run loop exits once it observes false.
         *
         * @param flag the new value
         */
        public void setActive(boolean flag) {
            this.active.set(flag);
        }

        /**
         * Adds an entry to the queue of pending functions.
         *
         * @param queueEntry the entry to add
         */
        public void add(QueueEntry queueEntry) {
            this.queue.add(queueEntry);
        }
    }
}