// blob: b3abe0e7f235276f12e137fbde5d158e24054fd9 [file] [log] [blame]
package backtype.storm.utils;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
/**
 * Clock facade that can be switched between the real system clock and a
 * manually advanced, simulated clock.
 *
 * <p>While simulation is active, {@link #currentTimeMillis()} returns the
 * simulated time and {@link #sleepUntil(long)} waits until the simulated time
 * has been advanced (via {@link #advanceTime(long)}) to the requested target.
 * Otherwise both delegate to the real system clock.
 *
 * <p>All state is static and shared by every thread in the JVM.
 */
public class Time {
    public static Logger LOG = Logger.getLogger(Time.class);

    /** True while the simulated clock is in effect. */
    private static final AtomicBoolean simulating = new AtomicBoolean(false);

    //TODO: should probably use weak references here or something
    /**
     * Target wake-up time of every thread currently blocked in a simulated
     * {@link #sleepUntil(long)}; {@code null} while not simulating. Volatile so
     * that a reference swapped by start/stop is visible to all threads.
     */
    private static volatile Map<Thread, AtomicLong> threadSleepTimes;

    /** Guards structural access to, and replacement of, {@link #threadSleepTimes}. */
    private static final Object sleepTimesLock = new Object();

    /** Current simulated time in milliseconds; set by {@link #startSimulating()}. */
    private static volatile AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing?

    /**
     * Switches to the simulated clock, resetting the simulated time to 0 and
     * discarding any previously recorded sleepers.
     */
    public static void startSimulating() {
        synchronized (sleepTimesLock) {
            // Initialise the state BEFORE publishing the flag so that no thread
            // can observe simulating == true with a null clock or sleeper map.
            simulatedCurrTimeMs = new AtomicLong(0);
            threadSleepTimes = new ConcurrentHashMap<Thread, AtomicLong>();
            simulating.set(true);
        }
    }

    /**
     * Switches back to the real system clock and drops the sleeper map.
     */
    public static void stopSimulating() {
        // Taken under the lock so that the map is never nulled out in the middle
        // of a put/get/remove performed by another thread.
        synchronized (sleepTimesLock) {
            simulating.set(false);
            threadSleepTimes = null;
        }
    }

    /**
     * @return {@code true} if the simulated clock is currently in effect
     */
    public static boolean isSimulating() {
        return simulating.get();
    }

    /**
     * Blocks the calling thread until the (real or simulated) clock reaches
     * {@code targetTimeMs}. Returns immediately if that time has already passed.
     *
     * <p>In simulation mode the thread is recorded as waiting for the duration
     * of the call and polls the simulated clock every 10 ms of real time.
     *
     * @param targetTimeMs absolute time, in milliseconds, to sleep until
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void sleepUntil(long targetTimeMs) throws InterruptedException {
        if (simulating.get()) {
            final Thread self = Thread.currentThread();
            try {
                synchronized (sleepTimesLock) {
                    Map<Thread, AtomicLong> sleepers = threadSleepTimes;
                    // null means simulation was stopped after the check above
                    if (sleepers != null) {
                        sleepers.put(self, new AtomicLong(targetTimeMs));
                    }
                }
                while (simulatedCurrTimeMs.get() < targetTimeMs) {
                    Thread.sleep(10);
                }
            } finally {
                // Always deregister, including when Thread.sleep is interrupted;
                // otherwise the entry leaks and isThreadWaiting keeps reporting
                // this thread as waiting.
                synchronized (sleepTimesLock) {
                    Map<Thread, AtomicLong> sleepers = threadSleepTimes;
                    if (sleepers != null) {
                        sleepers.remove(self);
                    }
                }
            }
        } else {
            long sleepTime = targetTimeMs - currentTimeMillis();
            if (sleepTime > 0) {
                Thread.sleep(sleepTime);
            }
        }
    }

    /**
     * Blocks the calling thread for {@code ms} milliseconds of (real or
     * simulated) time.
     *
     * @param ms duration to sleep, in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void sleep(long ms) throws InterruptedException {
        sleepUntil(currentTimeMillis() + ms);
    }

    /**
     * @return the simulated time while simulating, otherwise
     *         {@link System#currentTimeMillis()}
     */
    public static long currentTimeMillis() {
        if (simulating.get()) {
            return simulatedCurrTimeMs.get();
        } else {
            return System.currentTimeMillis();
        }
    }

    /**
     * Moves the simulated clock forward by {@code ms} milliseconds.
     *
     * @param ms amount to add to the simulated time, in milliseconds
     * @throws IllegalStateException if simulation mode is not active
     */
    public static void advanceTime(long ms) {
        if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
        // Single atomic read-modify-write: concurrent advances must not be lost.
        simulatedCurrTimeMs.addAndGet(ms);
    }

    /**
     * Reports whether {@code t} is currently blocked in a simulated
     * {@link #sleepUntil(long)} whose target time has not yet been reached.
     *
     * @param t the thread to inspect
     * @return {@code true} if {@code t} is registered as sleeping and its target
     *         time is still in the simulated future
     * @throws IllegalStateException if simulation mode is not active
     */
    public static boolean isThreadWaiting(Thread t) {
        if (!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
        AtomicLong time;
        synchronized (sleepTimesLock) {
            Map<Thread, AtomicLong> sleepers = threadSleepTimes;
            // null means simulation was stopped after the check above
            time = (sleepers == null) ? null : sleepers.get(t);
        }
        return time != null && currentTimeMillis() < time.longValue();
    }
}