| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm; |
| |
| import java.nio.channels.ClosedByInterruptException; |
| import java.util.Comparator; |
| import java.util.Random; |
| import java.util.concurrent.PriorityBlockingQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.storm.utils.Time; |
| import org.apache.storm.utils.Utils; |
| |
| /** |
| * The timer defined in this file is very similar to java.util.Timer, except it integrates with Storm's time simulation capabilities. This |
| * lets us test code that does asynchronous work on the timer thread. |
| */ |
| |
| public class StormTimer implements AutoCloseable { |
| |
| //task to run |
| private StormTimerTask task = new StormTimerTask(); |
| |
| /** |
| * Makes a Timer in the form of a StormTimerTask Object. |
| * |
| * @param name name of the timer |
| * @param onKill function to call when timer is killed unexpectedly |
| */ |
| public StormTimer(String name, Thread.UncaughtExceptionHandler onKill) { |
| if (onKill == null) { |
| throw new RuntimeException("onKill func is null!"); |
| } |
| if (name == null) { |
| this.task.setName("timer"); |
| } else { |
| this.task.setName(name); |
| } |
| this.task.setOnKillFunc(onKill); |
| this.task.setActive(true); |
| |
| this.task.setDaemon(true); |
| this.task.setPriority(Thread.MAX_PRIORITY); |
| this.task.start(); |
| } |
| |
| /** |
| * Schedule a function to be executed in the timer. |
| * |
| * @param delaySecs the number of seconds to delay before running the function |
| * @param func the function to run |
| * @param checkActive whether to check is the timer is active |
| * @param jitterMs add jitter to the run |
| */ |
| public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) { |
| scheduleMs(Time.secsToMillisLong(delaySecs), func, checkActive, jitterMs); |
| } |
| |
| public void schedule(int delaySecs, Runnable func) { |
| schedule(delaySecs, func, true, 0); |
| } |
| |
| /** |
| * Same as schedule with millisecond resolution. |
| * |
| * @param delayMs the number of milliseconds to delay before running the function |
| * @param func the function to run |
| * @param checkActive whether to check is the timer is active |
| * @param jitterMs add jitter to the run |
| */ |
| public void scheduleMs(long delayMs, Runnable func, boolean checkActive, int jitterMs) { |
| if (func == null) { |
| throw new RuntimeException("function to schedule is null!"); |
| } |
| if (checkActive) { |
| checkActive(); |
| } |
| String id = Utils.uuid(); |
| long endTimeMs = Time.currentTimeMillis() + delayMs; |
| if (jitterMs > 0) { |
| endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs; |
| } |
| task.add(new QueueEntry(endTimeMs, func, id)); |
| } |
| |
| public void scheduleMs(long delayMs, Runnable func) { |
| scheduleMs(delayMs, func, true, 0); |
| } |
| |
| /** |
| * Schedule a function to run recurrently. |
| * |
| * @param delaySecs the number of seconds to delay before running the function |
| * @param recurSecs the time between each invocation |
| * @param func the function to run |
| */ |
| public void scheduleRecurring(int delaySecs, final int recurSecs, final Runnable func) { |
| schedule(delaySecs, new Runnable() { |
| @Override |
| public void run() { |
| func.run(); |
| // This avoids a race condition with cancel-timer. |
| schedule(recurSecs, this, false, 0); |
| } |
| }); |
| } |
| |
| /** |
| * Schedule a function to run recurrently. |
| * |
| * @param delayMs the number of millis to delay before running the function |
| * @param recurMs the time between each invocation |
| * @param func the function to run |
| */ |
| public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) { |
| scheduleMs(delayMs, new Runnable() { |
| @Override |
| public void run() { |
| func.run(); |
| // This avoids a race condition with cancel-timer. |
| scheduleMs(recurMs, this, true, 0); |
| } |
| }); |
| } |
| |
| /** |
| * Schedule a function to run recurrently with jitter. |
| * |
| * @param delaySecs the number of seconds to delay before running the function |
| * @param recurSecs the time between each invocation |
| * @param jitterMs jitter added to the run |
| * @param func the function to run |
| */ |
| public void scheduleRecurringWithJitter(int delaySecs, final int recurSecs, final int jitterMs, final Runnable func) { |
| schedule(delaySecs, new Runnable() { |
| @Override |
| public void run() { |
| func.run(); |
| // This avoids a race condition with cancel-timer. |
| schedule(recurSecs, this, false, jitterMs); |
| } |
| }); |
| } |
| |
| /** |
| * check if timer is active. |
| */ |
| private void checkActive() { |
| if (!this.task.isActive()) { |
| throw new IllegalStateException("Timer is not active"); |
| } |
| } |
| |
| /** |
| * cancel timer. |
| */ |
| |
| @Override |
| public void close() throws InterruptedException { |
| if (this.task.isActive()) { |
| this.task.setActive(false); |
| this.task.interrupt(); |
| this.task.join(); |
| } |
| } |
| |
| /** |
| * is timer waiting. Used in timer simulation. |
| */ |
| public boolean isTimerWaiting() { |
| return Time.isThreadWaiting(task); |
| } |
| |
| public static class QueueEntry { |
| public final Long endTimeMs; |
| public final Runnable func; |
| public final String id; |
| |
| public QueueEntry(Long endTimeMs, Runnable func, String id) { |
| this.endTimeMs = endTimeMs; |
| this.func = func; |
| this.id = id; |
| } |
| } |
| |
| public static class StormTimerTask extends Thread { |
| |
| //initialCapacity set to 11 since its the default inital capacity of PriorityBlockingQueue |
| private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(11, new Comparator<QueueEntry>() { |
| @Override |
| public int compare(QueueEntry o1, QueueEntry o2) { |
| return o1.endTimeMs.intValue() - o2.endTimeMs.intValue(); |
| } |
| }); |
| |
| // boolean to indicate whether timer is active |
| private AtomicBoolean active = new AtomicBoolean(false); |
| |
| // function to call when timer is killed |
| private Thread.UncaughtExceptionHandler onKill; |
| |
| //random number generator |
| private Random random = new Random(); |
| |
| @Override |
| public void run() { |
| while (this.active.get()) { |
| QueueEntry queueEntry = null; |
| try { |
| queueEntry = this.queue.peek(); |
| if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) { |
| // It is imperative to not run the function |
| // inside the timer lock. Otherwise, it is |
| // possible to deadlock if the fn deals with |
| // other locks, like the submit lock. |
| this.queue.remove(queueEntry); |
| queueEntry.func.run(); |
| } else if (queueEntry != null) { |
| // If any events are scheduled, sleep until |
| // event generation. If any recurring events |
| // are scheduled then we will always go |
| // through this branch, sleeping only the |
| // exact necessary amount of time. We give |
| // an upper bound, e.g. 1000 millis, to the |
| // sleeping time, to limit the response time |
| // for detecting any new event within 1 secs. |
| Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis()))); |
| } else { |
| // Otherwise poll to see if any new event |
| // was scheduled. This is, in essence, the |
| // response time for detecting any new event |
| // schedulings when there are no scheduled |
| // events. |
| Time.sleep(1000); |
| } |
| if (Thread.interrupted()) { |
| this.active.set(false); |
| } |
| } catch (Throwable e) { |
| if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) |
| && !(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e))) { |
| // need to set active false before calling onKill() - current implementation does not return. |
| this.setActive(false); |
| this.onKill.uncaughtException(this, e); |
| } |
| } |
| } |
| } |
| |
| public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) { |
| this.onKill = onKill; |
| } |
| |
| public boolean isActive() { |
| return this.active.get(); |
| } |
| |
| public void setActive(boolean flag) { |
| this.active.set(flag); |
| } |
| |
| public void add(QueueEntry queueEntry) { |
| this.queue.add(queueEntry); |
| } |
| } |
| } |