[REEF-1527] Refactor RuntimeClock and implement graceful shutdown correctly.
* Refactor RuntimeClock for readability and add more Javadoc comments;
* Fix the bug in RuntimeClock.close() that clears all scheduled events without
processing them;
* Clean up and refactor code for Time and derived classes, add Javadocs;
* Improve logging in RuntimeClock and around;
* Cosmetic refactoring of RuntimeClock unit tests (new unit tests will be in a
separate pull request)
* Avoid loops over entire schedule in .close() and .isIdle() methods to find
client alarms.
JIRA:
[REEF-1527](https://issues.apache.org/jira/browse/REEF-1527) close
Pull Request:
This closes #1096
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index c0d1ef5..b0c8d41 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -618,7 +618,7 @@
LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
driverRestartCompleted.getCompletedTime());
try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
- driverRestartCompleted.getCompletedTime().getTimeStamp())) {
+ driverRestartCompleted.getCompletedTime().getTimestamp())) {
if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
@@ -639,7 +639,7 @@
@Override
public void onNext(final StopTime time) {
LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
- try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
+ try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimestamp())) {
for (final ActiveContext context : contexts.values()) {
context.close();
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
old mode 100755
new mode 100644
index dfe693f..3ad5fe4
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -76,6 +76,7 @@
LoggingSetup.setupCommonsLogging();
}
+ /** Config parameter to turn on network IO profiling in Wake. */
private final boolean isWakeProfilingEnabled;
/** REEF version - we need it simply to write it to the log. */
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
index 3fd6ffc..416a4f6 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
@@ -151,26 +151,26 @@
public static AvroRuntimeStart toAvroRuntimeStart(final RuntimeStart runtimeStart) {
return AvroRuntimeStart.newBuilder()
- .setTimestamp(runtimeStart.getTimeStamp())
+ .setTimestamp(runtimeStart.getTimestamp())
.build();
}
public static AvroStartTime toAvroStartTime(final StartTime startTime) {
return AvroStartTime.newBuilder()
- .setTimestamp(startTime.getTimeStamp())
+ .setTimestamp(startTime.getTimestamp())
.build();
}
public static AvroStopTime toAvroStopTime(final StopTime stopTime) {
return AvroStopTime.newBuilder()
- .setTimestamp(stopTime.getTimeStamp())
+ .setTimestamp(stopTime.getTimestamp())
.build();
}
public static AvroRuntimeStop toAvroRuntimeStop(final RuntimeStop runtimeStop) {
return AvroRuntimeStop.newBuilder()
.setException(convertThrowableToString(runtimeStop.getException()))
- .setTimestamp(runtimeStop.getTimeStamp())
+ .setTimestamp(runtimeStop.getTimestamp())
.build();
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
index 44e90b5..0150962 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
@@ -18,53 +18,63 @@
*/
package org.apache.reef.wake.time;
+import java.util.Date;
+
/**
- * Time object.
+ * An abstract object that has a timestamp.
+ * That allows us to compare and order objects by the time.
*/
public abstract class Time implements Comparable<Time> {
private final long timestamp;
+ /**
+ * Initialize the internal timestamp. Timestamp remains constant
+ * for the entire lifecycle of the object.
+ * @param timestamp timestamp in milliseconds since the beginning
+ * of the epoch (01/01/1970).
+ */
public Time(final long timestamp) {
this.timestamp = timestamp;
}
+ /**
+ * Get timestamp in milliseconds since the beginning of the epoch (01/01/1970).
+ * @return Object's timestamp in milliseconds since the start of the epoch.
+ */
+ public final long getTimestamp() {
+ return this.timestamp;
+ }
+
+ /**
+ * Get timestamp in milliseconds since the beginning of the epoch (01/01/1970).
+ * @return Object's timestamp in milliseconds since the start of the epoch.
+ * @deprecated [REEF-1532] Prefer using getTimestamp() instead.
+ * Remove after release 0.16.
+ */
public final long getTimeStamp() {
return this.timestamp;
}
@Override
- public final String toString() {
- return this.getClass().getName() + "[" + this.timestamp + "]";
+ public String toString() {
+ return this.getClass().getName()
+ + ":[" + this.timestamp + '|' + new Date(this.timestamp) + ']';
}
@Override
- public final int compareTo(final Time o) {
- if (this.timestamp < o.timestamp) {
- return -1;
- }
- if (this.timestamp > o.timestamp) {
- return 1;
- }
- if (this.hashCode() < o.hashCode()) {
- return -1;
- }
- if (this.hashCode() > o.hashCode()) {
- return 1;
- }
- return 0;
+ public int compareTo(final Time other) {
+ final int cmp = Long.compare(this.timestamp, other.timestamp);
+ return cmp != 0 ? cmp : Integer.compare(this.hashCode(), other.hashCode());
}
@Override
- public final boolean equals(final Object o) {
- if (o instanceof Time) {
- return compareTo((Time) o) == 0;
- }
- return false;
+ public boolean equals(final Object other) {
+ return other instanceof Time && compareTo((Time) other) == 0;
}
@Override
- public final int hashCode() {
+ public int hashCode() {
return super.hashCode();
}
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java
index fb278a4..ac1ed6d 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java
@@ -22,9 +22,10 @@
import org.apache.reef.wake.time.Time;
/**
- * Represents a timer event.
+ * An alarm is a timer event to be invoked at a given time.
+ * Contains a (future) timestamp and the event handler to invoke.
*/
-public abstract class Alarm extends Time {
+public abstract class Alarm extends Time implements Runnable {
private final EventHandler<Alarm> handler;
@@ -33,8 +34,11 @@
this.handler = handler;
}
- public final void handle() {
+ /**
+ * Invoke the event handler and pass a reference to self as a parameter.
+ */
+ @Override
+ public final void run() {
this.handler.onNext(this);
}
-
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
index 72d558e..1ae2ed8 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
@@ -18,36 +18,101 @@
*/
package org.apache.reef.wake.time.runtime;
+import org.apache.reef.wake.time.Time;
+
import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * Logical timer.
+ * Logical timer that is only bound to the timestamps of the events tracked against it.
+ * In such setting, all events occur immediately, i.e. isReady() always return true,
+ * and the duration of the delay to the next event is always 0.
+ * Current time for this timer is always the timestamp of the tracked event that is
+ * the most distant in the future.
*/
public final class LogicalTimer implements Timer {
- private long current = 0;
+ /**
+ * Current time in milliseconds since the beginning of the epoch (01/01/1970),
+ * according to the timer. For this implementation, always keep the largest seen
+ * timestamp (i.e. track the event that is the most distant into the future).
+ */
+ private final AtomicLong current = new AtomicLong(0);
+ /**
+ * Instances of the timer should only be created automatically by Tang.
+ */
@Inject
- LogicalTimer() {
+ private LogicalTimer() {
}
+ /**
+ * Get current time in milliseconds since the beginning of the epoch (01/01/1970).
+ * This timer implementation always returns the timestamp of the most distant
+ * future event ever checked against this timer in getDuration() or isReady() methods.
+ * Return 0 if there were no calls yet to getDuration() or isReady().
+ * @return Timestamp of the latest event (in milliseconds since the start of the epoch).
+ */
@Override
public long getCurrent() {
- return this.current;
+ return this.current.get();
}
+ /**
+ * Get the number of milliseconds between current time as tracked by the Timer implementation
+ * and a given event. This implementation always returns 0 and updates current timer's time
+ * to the timestamp of the most distant future event.
+ * @param time Timestamp in milliseconds.
+ * @return Always returns 0.
+ * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+ * Remove after release 0.16.
+ */
@Override
public long getDuration(final long time) {
- isReady(time);
+ this.isReady(time);
return 0;
}
+ /**
+ * Get the number of milliseconds between current time as tracked by the Timer implementation
+ * and a given event. This implementation always returns 0 and updates current timer's time
+ * to the timestamp of the most distant future event.
+ * @param time Timestamp object that wraps time in milliseconds.
+ * @return Always returns 0.
+ */
@Override
- public boolean isReady(final long time) {
- if (this.current < time) {
- this.current = time;
- }
- return true;
+ public long getDuration(final Time time) {
+ return this.getDuration(time.getTimestamp());
}
+ /**
+ * Check if the event with a given timestamp has occurred, according to the timer.
+ * This implementation always returns true and updates current timer's time to the timestamp
+ * of the most distant future event.
+ * @param time Timestamp in milliseconds.
+ * @return Always returns true.
+ * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+ * Remove after release 0.16.
+ */
+ @Override
+ public boolean isReady(final long time) {
+ while (true) {
+ final long thisTs = this.current.get();
+ if (thisTs >= time || this.current.compareAndSet(thisTs, time)) {
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Check if the event with a given timestamp has occurred, according to the timer.
+ * This implementation always returns true and updates current timer's time to the timestamp
+ * of the most distant future event.
+ * @param time Timestamp object that wraps time in milliseconds.
+ * @return Always returns true.
+ */
+ @Override
+ public boolean isReady(final Time time) {
+ return this.isReady(time.getTimestamp());
+ }
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java
index 09ab817..402644a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java
@@ -18,29 +18,78 @@
*/
package org.apache.reef.wake.time.runtime;
+import org.apache.reef.wake.time.Time;
+
import javax.inject.Inject;
/**
- * A system-time based Timer.
+ * Implementation of the Timer that uses the system clock.
*/
public final class RealTimer implements Timer {
+ /**
+ * Instances of the timer should only be created automatically by Tang.
+ */
@Inject
- public RealTimer() {
+ private RealTimer() {
}
+ /**
+ * Get current time in milliseconds since the beginning of the epoch (01/01/1970).
+ * @return Current system time in milliseconds since the start of the epoch.
+ */
@Override
public long getCurrent() {
return System.currentTimeMillis();
}
+ /**
+ * Get the number of milliseconds from current system time to the given event.
+ * Can return a negative number if the event is already in the past.
+ * @param time Timestamp in milliseconds.
+ * @return Difference in milliseconds between the given timestamp and current system time.
+ * The result is a negative number if the timestamp is in the past.
+ * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+ * Remove after release 0.16.
+ */
@Override
public long getDuration(final long time) {
return time - getCurrent();
}
+ /**
+ * Get the number of milliseconds from current system time to the given event.
+ * Can return a negative number if the event is already in the past.
+ * @param time Timestamp object that wraps time in milliseconds.
+ * @return Difference in milliseconds between the given timestamp and current system time.
+ * The result is a negative number if the timestamp is in the past.
+ */
+ @Override
+ public long getDuration(final Time time) {
+ return time.getTimestamp() - getCurrent();
+ }
+
+ /**
+ * Check if the event with a given timestamp has occurred. Return true if the timestamp
+ * equals or less than the current system time, and false if it is still in the future.
+ * @param time Timestamp in milliseconds.
+ * @return False if the given timestamp is still in the future.
+ * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+ * Remove after release 0.16.
+ */
@Override
public boolean isReady(final long time) {
return getDuration(time) <= 0;
}
+
+ /**
+ * Check if the event with a given timestamp has occurred. Return true if the timestamp
+ * equals or less than the current system time, and false if it is still in the future.
+ * @param time Timestamp object that wraps time in milliseconds.
+ * @return False if the given timestamp is still in the future.
+ */
+ @Override
+ public boolean isReady(final Time time) {
+ return getDuration(time) <= 0;
+ }
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index a0ab566..43a25ac 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -44,13 +44,23 @@
*/
public final class RuntimeClock implements Clock {
- private static final Logger LOG = Logger.getLogger(Clock.class.toString());
+ private static final Logger LOG = Logger.getLogger(RuntimeClock.class.getName());
+ private static final String CLASS_NAME = RuntimeClock.class.getCanonicalName();
+ /**
+ * Injectable source of current time information.
+ * Usually an instance of RealTimer that wraps the system clock.
+ */
private final Timer timer;
- private final TreeSet<Time> schedule;
+ /**
+ * An ordered set of timed objects, in ascending order of their timestamps.
+ * It also serves as the main synchronization monitor for the class.
+ */
+ private final TreeSet<Time> schedule = new TreeSet<>();
- private final PubSubEventHandler<Time> handlers;
+ /** Event handlers - populated with the injectable parameters provided to the RuntimeClock constructor. */
+ private final PubSubEventHandler<Time> handlers = new PubSubEventHandler<>();
private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler;
private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler;
@@ -58,123 +68,171 @@
private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler;
private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler;
- private Throwable stoppedOnException;
- private boolean closed = false;
+ /**
+ * Timestamp of the last client alarm in the schedule.
+ * We use it to schedule a graceful shutdown event immediately after all client alarms.
+ */
+ private long lastClientAlarm = 0;
+
+ /**
+ * Number of client alarms in the schedule.
+ * We need it to determine whether event loop is idle (i.e. has no client alarms scheduled)
+ */
+ private int numClientAlarms = 0;
+
+ /** Set to true when the clock is closed. */
+ private boolean isClosed = false;
+
+ /** Exception that caused the clock to stop. */
+ private Throwable exceptionCausedStop = null;
@Inject
- RuntimeClock(final Timer timer,
- @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler,
- @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler,
- @Parameter(Clock.RuntimeStartHandler.class)
- final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler,
- @Parameter(Clock.RuntimeStopHandler.class)
- final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler,
- @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) {
- this.timer = timer;
- this.schedule = new TreeSet<>();
- this.handlers = new PubSubEventHandler<>();
+ private RuntimeClock(
+ final Timer timer,
+ @Parameter(Clock.StartHandler.class)
+ final InjectionFuture<Set<EventHandler<StartTime>>> startHandler,
+ @Parameter(Clock.StopHandler.class)
+ final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler,
+ @Parameter(Clock.RuntimeStartHandler.class)
+ final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler,
+ @Parameter(Clock.RuntimeStopHandler.class)
+ final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler,
+ @Parameter(Clock.IdleHandler.class)
+ final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) {
+ this.timer = timer;
this.startHandler = startHandler;
this.stopHandler = stopHandler;
this.runtimeStartHandler = runtimeStartHandler;
this.runtimeStopHandler = runtimeStopHandler;
this.idleHandler = idleHandler;
- this.stoppedOnException = null;
-
LOG.log(Level.FINE, "RuntimeClock instantiated.");
}
+ /**
+ * Schedule a new Alarm event in `offset` milliseconds into the future,
+ * and supply an event handler to be called at that time.
+ * @param offset Number of milliseconds into the future relative to current time.
+ * @param handler Event handler to be invoked.
+ * @throws IllegalStateException if the clock is already closed.
+ */
@Override
public void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
+
+ final Time alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler);
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "Schedule alarm: {0}", alarm);
+ }
+
synchronized (this.schedule) {
- if (this.closed) {
+
+ if (this.isClosed) {
throw new IllegalStateException("Scheduling alarm on a closed clock");
}
- this.schedule.add(new ClientAlarm(this.timer.getCurrent() + offset, handler));
+ if (alarm.getTimestamp() > this.lastClientAlarm) {
+ this.lastClientAlarm = alarm.getTimestamp();
+ }
+
+ assert this.numClientAlarms >= 0;
+ ++this.numClientAlarms;
+
+ this.schedule.add(alarm);
this.schedule.notifyAll();
}
}
- public void registerEventHandler(final Class<? extends Time> clazz, final EventHandler<Time> handler) {
- this.handlers.subscribe(clazz, handler);
- }
-
- public void scheduleRuntimeAlarm(final int offset, final EventHandler<Alarm> handler) {
- synchronized (this.schedule) {
- this.schedule.add(new RuntimeAlarm(this.timer.getCurrent() + offset, handler));
- this.schedule.notifyAll();
- }
- }
-
+ /**
+ * Stop the clock. Remove all other events from the schedule and fire StopTimer
+ * event immediately. It is recommended to use close() method for graceful shutdown
+ * instead of stop().
+ */
@Override
public void stop() {
this.stop(null);
}
+ /**
+ * Stop the clock on exception.
+ * Remove all other events from the schedule and fire StopTimer event immediately.
+ * @param exception Exception that is the cause for the stop. Can be null.
+ */
@Override
- public void stop(final Throwable stopOnException) {
- LOG.entering(RuntimeClock.class.getCanonicalName(), "stop");
- synchronized (this.schedule) {
- this.schedule.clear();
- this.schedule.add(new StopTime(timer.getCurrent()));
- this.schedule.notifyAll();
- this.closed = true;
- if (this.stoppedOnException == null) {
- this.stoppedOnException = stopOnException;
- }
- }
- LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop");
- }
+ public void stop(final Throwable exception) {
- @Override
- public void close() {
- LOG.entering(RuntimeClock.class.getCanonicalName(), "close");
+ LOG.entering(CLASS_NAME, "stop");
+
synchronized (this.schedule) {
- if (this.closed) {
- LOG.log(Level.INFO, "Clock is already closed");
+
+ if (this.isClosed) {
+ LOG.log(Level.FINEST, "Clock has already been closed");
return;
}
+
+ this.isClosed = true;
+ this.exceptionCausedStop = exception;
+
+ final Time stopEvent = new StopTime(this.timer.getCurrent());
+ LOG.log(Level.FINE, "Stop scheduled immediately: {0}", stopEvent);
+
+ this.numClientAlarms = 0;
+ assert this.numClientAlarms >= 0;
+
this.schedule.clear();
- this.schedule.add(new StopTime(findAcceptableStopTime()));
+ this.schedule.add(stopEvent);
this.schedule.notifyAll();
- this.closed = true;
- LOG.log(Level.INFO, "Clock.close()");
}
- LOG.exiting(RuntimeClock.class.getCanonicalName(), "close");
+
+ LOG.exiting(CLASS_NAME, "stop");
}
/**
- * Finds an acceptable stop time, which is the
- * a time beyond that of any client alarm.
- *
- * @return an acceptable stop time
+ * Wait for all client alarms to finish executing and gracefully shutdown the clock.
*/
- private long findAcceptableStopTime() {
- long time = timer.getCurrent();
- for (final Time t : this.schedule) {
- if (t instanceof ClientAlarm) {
- assert time <= t.getTimeStamp();
- time = t.getTimeStamp();
+ @Override
+ public void close() {
+
+ LOG.entering(CLASS_NAME, "close");
+
+ synchronized (this.schedule) {
+
+ if (this.isClosed) {
+ LOG.log(Level.FINEST, "Clock has already been closed");
+ return;
}
+
+ this.isClosed = true;
+
+ final Time stopEvent = new StopTime(Math.max(this.timer.getCurrent(), this.lastClientAlarm + 1));
+ LOG.log(Level.FINE, "Graceful shutdown scheduled: {0}", stopEvent);
+
+ this.schedule.add(stopEvent);
+ this.schedule.notifyAll();
}
- return time + 1;
+
+ LOG.exiting(CLASS_NAME, "close");
}
-
+ /**
+ * Check if there are no client alarms scheduled.
+ * @return True if there are no client alarms in the schedule, false otherwise.
+ */
@Override
public boolean isIdle() {
synchronized (this.schedule) {
- for (final Time t : this.schedule) {
- if (t instanceof ClientAlarm) {
- return false;
- }
- }
- return true;
+ assert this.numClientAlarms >= 0;
+ return this.numClientAlarms == 0;
}
}
+ /**
+ * Register event handlers for the given event class.
+ * @param eventClass Event type to handle. Must be derived from Time.
+ * @param handlers One or many event handlers that can process given event type.
+ * @param <T> Event type - must be derived from class Time. (i.e. contain a timestamp).
+ */
@SuppressWarnings("checkstyle:hiddenfield")
private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) {
for (final EventHandler<T> handler : handlers) {
@@ -184,25 +242,35 @@
/**
* Logs the currently running threads.
- *
- * @param level the level used for the log entry
+ * @param level Log level used to write the entry.
* @param prefix put before the comma-separated list of threads
*/
- private void logThreads(final Level level, final String prefix) {
- final StringBuilder sb = new StringBuilder(prefix);
- for (final Thread t : Thread.getAllStackTraces().keySet()) {
- sb.append(t.getName());
- sb.append(", ");
+ private static void logThreads(final Level level, final String prefix) {
+
+ if (LOG.isLoggable(level)) {
+
+ final StringBuilder sb = new StringBuilder(prefix);
+ for (final Thread t : Thread.getAllStackTraces().keySet()) {
+ sb.append(t.getName()).append(", ");
+ }
+
+ LOG.log(level, sb.toString());
}
- LOG.log(level, sb.toString());
}
+ /**
+ * Main event loop.
+ * Set up the event handlers, and go into event loop that polls the schedule and process events in it.
+ */
@Override
public void run() {
- LOG.entering(RuntimeClock.class.getCanonicalName(), "run");
+
+ LOG.entering(CLASS_NAME, "run");
try {
+
LOG.log(Level.FINE, "Subscribe event handlers");
+
subscribe(StartTime.class, this.startHandler.get());
subscribe(StopTime.class, this.stopHandler.get());
subscribe(RuntimeStart.class, this.runtimeStartHandler.get());
@@ -213,66 +281,82 @@
this.handlers.onNext(new RuntimeStart(this.timer.getCurrent()));
LOG.log(Level.FINE, "Initiate start time");
- final StartTime start = new StartTime(this.timer.getCurrent());
- this.handlers.onNext(start);
+ this.handlers.onNext(new StartTime(this.timer.getCurrent()));
while (true) {
- LOG.log(Level.FINEST, "Entering clock main loop iteration.");
+
+ LOG.log(Level.FINEST, "Enter clock main loop.");
+
try {
+
if (this.isIdle()) {
// Handle an idle clock event, without locking this.schedule
this.handlers.onNext(new IdleClock(timer.getCurrent()));
}
- Time time = null;
+ final Time event;
synchronized (this.schedule) {
+
while (this.schedule.isEmpty()) {
this.schedule.wait();
}
assert this.schedule.first() != null;
- // Wait until the first scheduled time is ready
- for (long duration = this.timer.getDuration(this.schedule.first().getTimeStamp());
- duration > 0;
- duration = this.timer.getDuration(this.schedule.first().getTimeStamp())) {
- // note: while I'm waiting, another alarm could be scheduled with a shorter duration
- // so the next time I go around the loop I need to revise my duration
- this.schedule.wait(duration);
+ // Wait until the first scheduled time is ready.
+ // NOTE: while waiting, another alarm could be scheduled with a shorter duration
+ // so the next time I go around the loop I need to revise my duration.
+ while (true) {
+ final long waitDuration = this.timer.getDuration(this.schedule.first());
+ if (waitDuration <= 0) {
+ break;
+ }
+ this.schedule.wait(waitDuration);
}
+
// Remove the event from the schedule and process it:
- time = this.schedule.pollFirst();
- assert time != null;
+ event = this.schedule.pollFirst();
}
- if (time instanceof Alarm) {
- final Alarm alarm = (Alarm) time;
- alarm.handle();
+ LOG.log(Level.FINER, "Process event: {0}", event);
+ assert event != null;
+
+ if (event instanceof Alarm) {
+
+ if (event instanceof ClientAlarm) {
+ --this.numClientAlarms;
+ assert this.numClientAlarms >= 0;
+ }
+
+ ((Alarm) event).run();
+
} else {
- this.handlers.onNext(time);
- if (time instanceof StopTime) {
+
+ this.handlers.onNext(event);
+
+ if (event instanceof StopTime) {
break; // we're done.
}
}
+
} catch (final InterruptedException expected) {
- // waiting interrupted - return to loop
+ LOG.log(Level.FINEST, "Wait interrupted; continue event loop.");
}
}
- if (this.stoppedOnException == null) {
- this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
- } else {
- this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.stoppedOnException));
- }
+
+ this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.exceptionCausedStop));
+
} catch (final Exception e) {
- e.printStackTrace();
+
+ LOG.log(Level.SEVERE, "Error in runtime clock", e);
this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));
+
} finally {
+
logThreads(Level.FINE, "Threads running after exiting the clock main loop: ");
LOG.log(Level.FINE, "Runtime clock exit");
}
- LOG.exiting(RuntimeClock.class.getCanonicalName(), "run");
+ LOG.exiting(CLASS_NAME, "run");
}
-
-
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java
index 9d97aae..1935938 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java
@@ -19,15 +19,59 @@
package org.apache.reef.wake.time.runtime;
import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.time.Time;
/**
* An interface for Timer.
+ * Default implementation uses actual system time.
*/
@DefaultImplementation(RealTimer.class)
public interface Timer {
+
+ /**
+ * Get current time in milliseconds since the beginning of the epoch (01/01/1970).
+ * Note that this time may not necessarily match the actual system time - e.g. in unit tests.
+ * @return Current system time in milliseconds since the start of the epoch.
+ */
long getCurrent();
+ /**
+ * Get the number of milliseconds between current time as tracked by the Timer implementation
+ * and the given event. Can return a negative number if the event is already in the past.
+ * @param time Timestamp in milliseconds.
+ * @return Difference in milliseconds between the given timestamp and the time tracked by the timer.
+ * The result is a negative number if the timestamp is in the past (according to the timer's time).
+ * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+ * Remove after release 0.16.
+ */
long getDuration(final long time);
+ /**
+ * Get the number of milliseconds between current time as tracked by the Timer implementation
+ * and the given event. Can return a negative number if the event is already in the past.
+ * @param time Timestamp object that wraps time in milliseconds.
+ * @return Difference in milliseconds between the given timestamp and the time tracked by the timer.
+ * The result is a negative number if the timestamp is in the past (according to the timer's time).
+ */
+ long getDuration(final Time time);
+
+ /**
+ * Check if the event with a given timestamp has occurred, according to the timer.
+ * Return true if the timestamp is equal or less than the timer's time, and false if
+ * it is still in the (timer's) future.
+ * @param time Timestamp in milliseconds.
+ * @return False if the given timestamp is still in the timer's time future.
+ * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+ * Remove after release 0.16.
+ */
boolean isReady(final long time);
+
+ /**
+ * Check if the event with a given timestamp has occurred, according to the timer.
+ * Return true if the timestamp is equal or less than the timer's time, and false if
+ * it is still in the (timer's) future.
+ * @param time Timestamp object that wraps time in milliseconds.
+ * @return False if the given timestamp is still in the timer's time future.
+ */
+ boolean isReady(final Time time);
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java
index 81c4802..3671cf3 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java
@@ -23,6 +23,7 @@
/**
* An event for client-created alarm.
+ * Contains a timestamp and the event handler to invoke at that time.
*/
public final class ClientAlarm extends Alarm {
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
index c444205..afb6866 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
@@ -18,15 +18,16 @@
*/
package org.apache.reef.wake.test.time;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.LoggingUtils;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.time.Time;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.runtime.LogicalTimer;
+import org.apache.reef.wake.time.runtime.RealTimer;
import org.apache.reef.wake.time.runtime.RuntimeClock;
import org.apache.reef.wake.time.runtime.Timer;
import org.junit.Assert;
@@ -45,57 +46,51 @@
*/
public class ClockTest {
- private static RuntimeClock buildClock() throws Exception {
- final JavaConfigurationBuilder builder = Tang.Factory.getTang()
- .newConfigurationBuilder();
+ private static final Tang TANG = Tang.Factory.getTang();
- final Injector injector = Tang.Factory.getTang()
- .newInjector(builder.build());
+ private static RuntimeClock buildClock(
+ final Class<? extends Timer> timerClass) throws InjectionException {
- return injector.getInstance(RuntimeClock.class);
- }
+ final Configuration clockConfig = TANG.newConfigurationBuilder()
+ .bind(Timer.class, timerClass)
+ .build();
- private static RuntimeClock buildLogicalClock() throws Exception {
- final JavaConfigurationBuilder builder = Tang.Factory.getTang()
- .newConfigurationBuilder();
-
- builder.bind(Timer.class, LogicalTimer.class);
-
- final Injector injector = Tang.Factory.getTang()
- .newInjector(builder.build());
- return injector.getInstance(RuntimeClock.class);
+ return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class);
}
@Test
public void testClock() throws Exception {
- LoggingUtils.setLoggingLevel(Level.FINE);
+
+ LoggingUtils.setLoggingLevel(Level.FINEST);
final int minEvents = 40;
final CountDownLatch eventCountLatch = new CountDownLatch(minEvents);
- final RuntimeClock clock = buildClock();
- new Thread(clock).start();
- final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, eventCountLatch);
+ try (final RuntimeClock clock = buildClock(RealTimer.class)) {
- try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) {
- stage.onNext(null);
- Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
- } finally {
- clock.close();
+ new Thread(clock).start();
+
+ final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, eventCountLatch);
+
+ try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) {
+ stage.onNext(null);
+ Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
+ }
}
}
@Test
public void testAlarmRegistrationRaceConditions() throws Exception {
- LoggingUtils.setLoggingLevel(Level.FINE);
- final RuntimeClock clock = buildClock();
- new Thread(clock).start();
+ LoggingUtils.setLoggingLevel(Level.FINEST);
- final EventRecorder earlierAlarmRecorder = new EventRecorder();
- final EventRecorder laterAlarmRecorder = new EventRecorder();
+ try (final RuntimeClock clock = buildClock(RealTimer.class)) {
- try {
+ new Thread(clock).start();
+
+ final EventRecorder earlierAlarmRecorder = new EventRecorder();
+ final EventRecorder laterAlarmRecorder = new EventRecorder();
+
// Schedule an Alarm that's far in the future
clock.scheduleAlarm(5000, laterAlarmRecorder);
Thread.sleep(1000);
@@ -117,72 +112,77 @@
// The later Alarm should have fired, since 6000 > 5000 ms have passed:
Assert.assertEquals(1, laterAlarmRecorder.getEventCount());
- } finally {
- clock.close();
}
}
@Test
public void testMultipleCloseCalls() throws Exception {
- LoggingUtils.setLoggingLevel(Level.FINE);
+
+ LoggingUtils.setLoggingLevel(Level.FINEST);
final int numThreads = 3;
final CountDownLatch eventCountLatch = new CountDownLatch(numThreads);
- final RuntimeClock clock = buildClock();
- new Thread(clock).start();
- final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm value) {
- clock.close();
- eventCountLatch.countDown();
- }
- }, numThreads);
+ try (final RuntimeClock clock = buildClock(RealTimer.class)) {
- try {
- for (int i = 0; i < numThreads; ++i) {
- stage.onNext(null);
+ final EventHandler<Alarm> handler = new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm value) {
+ clock.close();
+ eventCountLatch.countDown();
+ }
+ };
+
+ new Thread(clock).start();
+
+ try (final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, numThreads)) {
+
+ for (int i = 0; i < numThreads; ++i) {
+ stage.onNext(null);
+ }
+
+ Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
}
- Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
- } finally {
- stage.close();
- clock.close();
}
}
@Test
public void testSimultaneousAlarms() throws Exception {
- LoggingUtils.setLoggingLevel(Level.FINE);
+
+ LoggingUtils.setLoggingLevel(Level.FINEST);
final int expectedEvent = 2;
final CountDownLatch eventCountLatch = new CountDownLatch(expectedEvent);
- final RuntimeClock clock = buildLogicalClock();
- new Thread(clock).start();
+ try (final RuntimeClock clock = buildClock(LogicalTimer.class)) {
- final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
- try {
+ new Thread(clock).start();
+
+ final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
clock.scheduleAlarm(500, alarmRecorder);
clock.scheduleAlarm(500, alarmRecorder);
+
eventCountLatch.await(10, TimeUnit.SECONDS);
+
Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount());
- } finally {
- clock.close();
}
}
@Test
public void testAlarmOrder() throws Exception {
- LoggingUtils.setLoggingLevel(Level.FINE);
+
+ LoggingUtils.setLoggingLevel(Level.FINEST);
final int numAlarms = 10;
final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
- final RuntimeClock clock = buildLogicalClock();
- new Thread(clock).start();
+ try (final RuntimeClock clock = buildClock(LogicalTimer.class)) {
- final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
- try {
+ new Thread(clock).start();
+
+ final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
final long[] expected = new long[numAlarms];
for (int i = 0; i < numAlarms; ++i) {
clock.scheduleAlarm(i * 100, alarmRecorder);
@@ -191,15 +191,13 @@
eventCountLatch.await(10, TimeUnit.SECONDS);
- final Long[] actualLong = new Long[numAlarms];
- alarmRecorder.getTimestamps().toArray(actualLong);
+ int i = 0;
final long[] actual = new long[numAlarms];
- for (int i = 0; i < numAlarms; ++i) {
- actual[i] = actualLong[i];
+ for (final long ts : alarmRecorder.getTimestamps()) {
+ actual[i++] = ts;
}
+
Assert.assertArrayEquals(expected, actual);
- } finally {
- clock.close();
}
}
@@ -234,7 +232,7 @@
@Override
public void onNext(final Alarm event) {
- timestamps.add(event.getTimeStamp());
+ timestamps.add(event.getTimestamp());
events.add(event);
if (eventCountLatch != null) {
eventCountLatch.countDown();
diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
index f60c19c..177dabb 100644
--- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
+++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
@@ -101,7 +101,7 @@
*/
public String getStartTime() {
if (startTime != null) {
- return convertTime(startTime.getTimeStamp());
+ return convertTime(startTime.getTimestamp());
}
return null;
}
@@ -113,7 +113,7 @@
*/
public String getStopTime() {
if (stopTime != null) {
- return convertTime(stopTime.getTimeStamp());
+ return convertTime(stopTime.getTimestamp());
}
return null;
}
diff --git a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java
index 0c78e8c..28ef9b8 100644
--- a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java
+++ b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java
@@ -129,7 +129,7 @@
final ReefEventStateManager reefEventStateManager =
this.injector.getInstance(ReefEventStateManager.class);
- Assert.assertEquals(reefEventStateManager.getStopTime(), convertTime(st.getTimeStamp()));
+ Assert.assertEquals(reefEventStateManager.getStopTime(), convertTime(st.getTimestamp()));
}
@Test
@@ -144,7 +144,7 @@
final ReefEventStateManager reefEventStateManager =
this.injector.getInstance(ReefEventStateManager.class);
- Assert.assertEquals(reefEventStateManager.getStartTime(), convertTime(st.getTimeStamp()));
+ Assert.assertEquals(reefEventStateManager.getStartTime(), convertTime(st.getTimestamp()));
}
private String convertTime(final long time) {