[REEF-1527] Refactor RuntimeClock and implement graceful shutdown correctly.

 * Refactor RuntimeClock for readability and add more Javadoc comments;
 * Fix the bug in RuntimeClock.close() that clears all scheduled events without
   processing them;
 * Clean up and refactor code for Time and derived classes, add Javadocs;
 * Improve logging in RuntimeClock and around;
 * Cosmetic refactoring of RuntimeClock unit tests (new unit tests will be in a
   separate pull request)
 * Avoid loops over entire schedule in .close() and .isIdle() methods to find
   client alarms.

JIRA:
  [REEF-1527](https://issues.apache.org/jira/browse/REEF-1527) close

Pull Request:
  This closes #1096
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index c0d1ef5..b0c8d41 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -618,7 +618,7 @@
       LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
           driverRestartCompleted.getCompletedTime());
       try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
-          driverRestartCompleted.getCompletedTime().getTimeStamp())) {
+          driverRestartCompleted.getCompletedTime().getTimestamp())) {
         if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
           LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
 
@@ -639,7 +639,7 @@
     @Override
     public void onNext(final StopTime time) {
       LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
-      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
+      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimestamp())) {
         for (final ActiveContext context : contexts.values()) {
           context.close();
         }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
old mode 100755
new mode 100644
index dfe693f..3ad5fe4
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -76,6 +76,7 @@
     LoggingSetup.setupCommonsLogging();
   }
 
+  /** Config parameter to turn on network IO profiling in Wake. */
   private final boolean isWakeProfilingEnabled;
 
   /** REEF version - we need it simply to write it to the log. */
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
index 3fd6ffc..416a4f6 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
@@ -151,26 +151,26 @@
 
   public static AvroRuntimeStart toAvroRuntimeStart(final RuntimeStart runtimeStart) {
     return AvroRuntimeStart.newBuilder()
-        .setTimestamp(runtimeStart.getTimeStamp())
+        .setTimestamp(runtimeStart.getTimestamp())
         .build();
   }
 
   public static AvroStartTime toAvroStartTime(final StartTime startTime) {
     return AvroStartTime.newBuilder()
-        .setTimestamp(startTime.getTimeStamp())
+        .setTimestamp(startTime.getTimestamp())
         .build();
   }
 
   public static AvroStopTime toAvroStopTime(final StopTime stopTime) {
     return AvroStopTime.newBuilder()
-        .setTimestamp(stopTime.getTimeStamp())
+        .setTimestamp(stopTime.getTimestamp())
         .build();
   }
 
   public static AvroRuntimeStop toAvroRuntimeStop(final RuntimeStop runtimeStop) {
     return AvroRuntimeStop.newBuilder()
         .setException(convertThrowableToString(runtimeStop.getException()))
-        .setTimestamp(runtimeStop.getTimeStamp())
+        .setTimestamp(runtimeStop.getTimestamp())
         .build();
   }
 
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
index 44e90b5..0150962 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java
@@ -18,53 +18,63 @@
  */
 package org.apache.reef.wake.time;
 
+import java.util.Date;
+
 /**
- * Time object.
+ * An abstract object that has a timestamp.
+ * That allows us to compare and order objects by the time.
  */
 public abstract class Time implements Comparable<Time> {
 
   private final long timestamp;
 
+  /**
+   * Initialize the internal timestamp. Timestamp remains constant
+   * for the entire lifecycle of the object.
+   * @param timestamp timestamp in milliseconds since the beginning
+   * of the epoch (01/01/1970).
+   */
   public Time(final long timestamp) {
     this.timestamp = timestamp;
   }
 
+  /**
+   * Get timestamp in milliseconds since the beginning of the epoch (01/01/1970).
+   * @return Object's timestamp in milliseconds since the start of the epoch.
+   */
+  public final long getTimestamp() {
+    return this.timestamp;
+  }
+
+  /**
+   * Get timestamp in milliseconds since the beginning of the epoch (01/01/1970).
+   * @return Object's timestamp in milliseconds since the start of the epoch.
+   * @deprecated [REEF-1532] Prefer using getTimestamp() instead.
+   * Remove after release 0.16.
+   */
   public final long getTimeStamp() {
     return this.timestamp;
   }
 
   @Override
-  public final String toString() {
-    return this.getClass().getName() + "[" + this.timestamp + "]";
+  public String toString() {
+    return this.getClass().getName()
+        + ":[" + this.timestamp + '|' + new Date(this.timestamp) + ']';
   }
 
   @Override
-  public final int compareTo(final Time o) {
-    if (this.timestamp < o.timestamp) {
-      return -1;
-    }
-    if (this.timestamp > o.timestamp) {
-      return 1;
-    }
-    if (this.hashCode() < o.hashCode()) {
-      return -1;
-    }
-    if (this.hashCode() > o.hashCode()) {
-      return 1;
-    }
-    return 0;
+  public int compareTo(final Time other) {
+    final int cmp = Long.compare(this.timestamp, other.timestamp);
+    return cmp != 0 ? cmp : Integer.compare(this.hashCode(), other.hashCode());
   }
 
   @Override
-  public final boolean equals(final Object o) {
-    if (o instanceof Time) {
-      return compareTo((Time) o) == 0;
-    }
-    return false;
+  public boolean equals(final Object other) {
+    return other instanceof Time && compareTo((Time) other) == 0;
   }
 
   @Override
-  public final int hashCode() {
+  public int hashCode() {
     return super.hashCode();
   }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java
index fb278a4..ac1ed6d 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java
@@ -22,9 +22,10 @@
 import org.apache.reef.wake.time.Time;
 
 /**
- * Represents a timer event.
+ * An alarm is a timer event to be invoked at a given time.
+ * Contains a (future) timestamp and the event handler to invoke.
  */
-public abstract class Alarm extends Time {
+public abstract class Alarm extends Time implements Runnable {
 
   private final EventHandler<Alarm> handler;
 
@@ -33,8 +34,11 @@
     this.handler = handler;
   }
 
-  public final void handle() {
+  /**
+   * Invoke the event handler and pass a reference to self as a parameter.
+   */
+  @Override
+  public final void run() {
     this.handler.onNext(this);
   }
-
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
index 72d558e..1ae2ed8 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java
@@ -18,36 +18,101 @@
  */
 package org.apache.reef.wake.time.runtime;
 
+import org.apache.reef.wake.time.Time;
+
 import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Logical timer.
+ * Logical timer that is only bound to the timestamps of the events tracked against it.
+ * In such setting, all events occur immediately, i.e. isReady() always return true,
+ * and the duration of the delay to the next event is always 0.
+ * Current time for this timer is always the timestamp of the tracked event that is
+ * the most distant in the future.
  */
 public final class LogicalTimer implements Timer {
 
-  private long current = 0;
+  /**
+   * Current time in milliseconds since the beginning of the epoch (01/01/1970),
+   * according to the timer. For this implementation, always keep the largest seen
+   * timestamp (i.e. track the event that is the most distant into the future).
+   */
+  private final AtomicLong current = new AtomicLong(0);
 
+  /**
+   * Instances of the timer should only be created automatically by Tang.
+   */
   @Inject
-  LogicalTimer() {
+  private LogicalTimer() {
   }
 
+  /**
+   * Get current time in milliseconds since the beginning of the epoch (01/01/1970).
+   * This timer implementation always returns the timestamp of the most distant
+   * future event ever checked against this timer in getDuration() or isReady() methods.
+   * Return 0 if there were no calls yet to getDuration() or isReady().
+   * @return Timestamp of the latest event (in milliseconds since the start of the epoch).
+   */
   @Override
   public long getCurrent() {
-    return this.current;
+    return this.current.get();
   }
 
+  /**
+   * Get the number of milliseconds between current time as tracked by the Timer implementation
+   * and a given event. This implementation always returns 0 and updates current timer's time
+   * to the timestamp of the most distant future event.
+   * @param time Timestamp in milliseconds.
+   * @return Always returns 0.
+   * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+   * Remove after release 0.16.
+   */
   @Override
   public long getDuration(final long time) {
-    isReady(time);
+    this.isReady(time);
     return 0;
   }
 
+  /**
+   * Get the number of milliseconds between current time as tracked by the Timer implementation
+   * and a given event. This implementation always returns 0 and updates current timer's time
+   * to the timestamp of the most distant future event.
+   * @param time Timestamp object that wraps time in milliseconds.
+   * @return Always returns 0.
+   */
   @Override
-  public boolean isReady(final long time) {
-    if (this.current < time) {
-      this.current = time;
-    }
-    return true;
+  public long getDuration(final Time time) {
+    return this.getDuration(time.getTimestamp());
   }
 
+  /**
+   * Check if the event with a given timestamp has occurred, according to the timer.
+   * This implementation always returns true and updates current timer's time to the timestamp
+   * of the most distant future event.
+   * @param time Timestamp in milliseconds.
+   * @return Always returns true.
+   * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+   * Remove after release 0.16.
+   */
+  @Override
+  public boolean isReady(final long time) {
+    while (true) {
+      final long thisTs = this.current.get();
+      if (thisTs >= time || this.current.compareAndSet(thisTs, time)) {
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Check if the event with a given timestamp has occurred, according to the timer.
+   * This implementation always returns true and updates current timer's time to the timestamp
+   * of the most distant future event.
+   * @param time Timestamp object that wraps time in milliseconds.
+   * @return Always returns true.
+   */
+  @Override
+  public boolean isReady(final Time time) {
+    return this.isReady(time.getTimestamp());
+  }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java
index 09ab817..402644a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java
@@ -18,29 +18,78 @@
  */
 package org.apache.reef.wake.time.runtime;
 
+import org.apache.reef.wake.time.Time;
+
 import javax.inject.Inject;
 
 /**
- * A system-time based Timer.
+ * Implementation of the Timer that uses the system clock.
  */
 public final class RealTimer implements Timer {
 
+  /**
+   * Instances of the timer should only be created automatically by Tang.
+   */
   @Inject
-  public RealTimer() {
+  private RealTimer() {
   }
 
+  /**
+   * Get current time in milliseconds since the beginning of the epoch (01/01/1970).
+   * @return Current system time in milliseconds since the start of the epoch.
+   */
   @Override
   public long getCurrent() {
     return System.currentTimeMillis();
   }
 
+  /**
+   * Get the number of milliseconds from current system time to the given event.
+   * Can return a negative number if the event is already in the past.
+   * @param time Timestamp in milliseconds.
+   * @return Difference in milliseconds between the given timestamp and current system time.
+   * The result is a negative number if the timestamp is in the past.
+   * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+   * Remove after release 0.16.
+   */
   @Override
   public long getDuration(final long time) {
     return time - getCurrent();
   }
 
+  /**
+   * Get the number of milliseconds from current system time to the given event.
+   * Can return a negative number if the event is already in the past.
+   * @param time Timestamp object that wraps time in milliseconds.
+   * @return Difference in milliseconds between the given timestamp and current system time.
+   * The result is a negative number if the timestamp is in the past.
+   */
+  @Override
+  public long getDuration(final Time time) {
+    return time.getTimestamp() - getCurrent();
+  }
+
+  /**
+   * Check if the event with a given timestamp has occurred. Return true if the timestamp
+   * equals or less than the current system time, and false if it is still in the future.
+   * @param time Timestamp in milliseconds.
+   * @return False if the given timestamp is still in the future.
+   * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+   * Remove after release 0.16.
+   */
   @Override
   public boolean isReady(final long time) {
     return getDuration(time) <= 0;
   }
+
+  /**
+   * Check if the event with a given timestamp has occurred. Return true if the timestamp
+   * equals or less than the current system time, and false if it is still in the future.
+   * @param time Timestamp object that wraps time in milliseconds.
+   * @return False if the given timestamp is still in the future.
+   */
+  @Override
+  public boolean isReady(final Time time) {
+    return getDuration(time) <= 0;
+  }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index a0ab566..43a25ac 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -44,13 +44,23 @@
  */
 public final class RuntimeClock implements Clock {
 
-  private static final Logger LOG = Logger.getLogger(Clock.class.toString());
+  private static final Logger LOG = Logger.getLogger(RuntimeClock.class.getName());
+  private static final String CLASS_NAME = RuntimeClock.class.getCanonicalName();
 
+  /**
+   * Injectable source of current time information.
+   * Usually an instance of RealTimer that wraps the system clock.
+   */
   private final Timer timer;
 
-  private final TreeSet<Time> schedule;
+  /**
+   * An ordered set of timed objects, in ascending order of their timestamps.
+   * It also serves as the main synchronization monitor for the class.
+   */
+  private final TreeSet<Time> schedule = new TreeSet<>();
 
-  private final PubSubEventHandler<Time> handlers;
+  /** Event handlers - populated with the injectable parameters provided to the RuntimeClock constructor. */
+  private final PubSubEventHandler<Time> handlers = new PubSubEventHandler<>();
 
   private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler;
   private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler;
@@ -58,123 +68,171 @@
   private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler;
   private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler;
 
-  private Throwable stoppedOnException;
-  private boolean closed = false;
+  /**
+   * Timestamp of the last client alarm in the schedule.
+   * We use it to schedule a graceful shutdown event immediately after all client alarms.
+   */
+  private long lastClientAlarm = 0;
+
+  /**
+   * Number of client alarms in the schedule.
+   * We need it to determine whether event loop is idle (i.e. has no client alarms scheduled)
+   */
+  private int numClientAlarms = 0;
+
+  /** Set to true when the clock is closed. */
+  private boolean isClosed = false;
+
+  /** Exception that caused the clock to stop. */
+  private Throwable exceptionCausedStop = null;
 
   @Inject
-  RuntimeClock(final Timer timer,
-               @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler,
-               @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler,
-               @Parameter(Clock.RuntimeStartHandler.class)
-               final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler,
-               @Parameter(Clock.RuntimeStopHandler.class)
-               final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler,
-               @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) {
-    this.timer = timer;
-    this.schedule = new TreeSet<>();
-    this.handlers = new PubSubEventHandler<>();
+  private RuntimeClock(
+      final Timer timer,
+      @Parameter(Clock.StartHandler.class)
+          final InjectionFuture<Set<EventHandler<StartTime>>> startHandler,
+      @Parameter(Clock.StopHandler.class)
+          final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler,
+      @Parameter(Clock.RuntimeStartHandler.class)
+          final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler,
+      @Parameter(Clock.RuntimeStopHandler.class)
+          final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler,
+      @Parameter(Clock.IdleHandler.class)
+          final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) {
 
+    this.timer = timer;
     this.startHandler = startHandler;
     this.stopHandler = stopHandler;
     this.runtimeStartHandler = runtimeStartHandler;
     this.runtimeStopHandler = runtimeStopHandler;
     this.idleHandler = idleHandler;
 
-    this.stoppedOnException = null;
-
     LOG.log(Level.FINE, "RuntimeClock instantiated.");
   }
 
+  /**
+   * Schedule a new Alarm event in `offset` milliseconds into the future,
+   * and supply an event handler to be called at that time.
+   * @param offset Number of milliseconds into the future relative to current time.
+   * @param handler Event handler to be invoked.
+   * @throws IllegalStateException if the clock is already closed.
+   */
   @Override
   public void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
+
+    final Time alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler);
+
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "Schedule alarm: {0}", alarm);
+    }
+
     synchronized (this.schedule) {
-      if (this.closed) {
+
+      if (this.isClosed) {
         throw new IllegalStateException("Scheduling alarm on a closed clock");
       }
 
-      this.schedule.add(new ClientAlarm(this.timer.getCurrent() + offset, handler));
+      if (alarm.getTimestamp() > this.lastClientAlarm) {
+        this.lastClientAlarm = alarm.getTimestamp();
+      }
+
+      assert this.numClientAlarms >= 0;
+      ++this.numClientAlarms;
+
+      this.schedule.add(alarm);
       this.schedule.notifyAll();
     }
   }
 
-  public void registerEventHandler(final Class<? extends Time> clazz, final EventHandler<Time> handler) {
-    this.handlers.subscribe(clazz, handler);
-  }
-
-  public void scheduleRuntimeAlarm(final int offset, final EventHandler<Alarm> handler) {
-    synchronized (this.schedule) {
-      this.schedule.add(new RuntimeAlarm(this.timer.getCurrent() + offset, handler));
-      this.schedule.notifyAll();
-    }
-  }
-
+  /**
+   * Stop the clock. Remove all other events from the schedule and fire StopTimer
+   * event immediately. It is recommended to use close() method for graceful shutdown
+   * instead of stop().
+   */
   @Override
   public void stop() {
     this.stop(null);
   }
 
+  /**
+   * Stop the clock on exception.
+   * Remove all other events from the schedule and fire StopTimer event immediately.
+   * @param exception Exception that is the cause for the stop. Can be null.
+   */
   @Override
-  public void stop(final Throwable stopOnException) {
-    LOG.entering(RuntimeClock.class.getCanonicalName(), "stop");
-    synchronized (this.schedule) {
-      this.schedule.clear();
-      this.schedule.add(new StopTime(timer.getCurrent()));
-      this.schedule.notifyAll();
-      this.closed = true;
-      if (this.stoppedOnException == null) {
-        this.stoppedOnException = stopOnException;
-      }
-    }
-    LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop");
-  }
+  public void stop(final Throwable exception) {
 
-  @Override
-  public void close() {
-    LOG.entering(RuntimeClock.class.getCanonicalName(), "close");
+    LOG.entering(CLASS_NAME, "stop");
+
     synchronized (this.schedule) {
-      if (this.closed) {
-        LOG.log(Level.INFO, "Clock is already closed");
+
+      if (this.isClosed) {
+        LOG.log(Level.FINEST, "Clock has already been closed");
         return;
       }
+
+      this.isClosed = true;
+      this.exceptionCausedStop = exception;
+
+      final Time stopEvent = new StopTime(this.timer.getCurrent());
+      LOG.log(Level.FINE, "Stop scheduled immediately: {0}", stopEvent);
+
+      this.numClientAlarms = 0;
+      assert this.numClientAlarms >= 0;
+
       this.schedule.clear();
-      this.schedule.add(new StopTime(findAcceptableStopTime()));
+      this.schedule.add(stopEvent);
       this.schedule.notifyAll();
-      this.closed = true;
-      LOG.log(Level.INFO, "Clock.close()");
     }
-    LOG.exiting(RuntimeClock.class.getCanonicalName(), "close");
+
+    LOG.exiting(CLASS_NAME, "stop");
   }
 
   /**
-   * Finds an acceptable stop time, which is the
-   * a time beyond that of any client alarm.
-   *
-   * @return an acceptable stop time
+   * Wait for all client alarms to finish executing and gracefully shutdown the clock.
    */
-  private long findAcceptableStopTime() {
-    long time = timer.getCurrent();
-    for (final Time t : this.schedule) {
-      if (t instanceof ClientAlarm) {
-        assert time <= t.getTimeStamp();
-        time = t.getTimeStamp();
+  @Override
+  public void close() {
+
+    LOG.entering(CLASS_NAME, "close");
+
+    synchronized (this.schedule) {
+
+      if (this.isClosed) {
+        LOG.log(Level.FINEST, "Clock has already been closed");
+        return;
       }
+
+      this.isClosed = true;
+
+      final Time stopEvent = new StopTime(Math.max(this.timer.getCurrent(), this.lastClientAlarm + 1));
+      LOG.log(Level.FINE, "Graceful shutdown scheduled: {0}", stopEvent);
+
+      this.schedule.add(stopEvent);
+      this.schedule.notifyAll();
     }
-    return time + 1;
+
+    LOG.exiting(CLASS_NAME, "close");
   }
 
-
+  /**
+   * Check if there are no client alarms scheduled.
+   * @return True if there are no client alarms in the schedule, false otherwise.
+   */
   @Override
   public boolean isIdle() {
     synchronized (this.schedule) {
-      for (final Time t : this.schedule) {
-        if (t instanceof ClientAlarm) {
-          return false;
-        }
-      }
-      return true;
+      assert this.numClientAlarms >= 0;
+      return this.numClientAlarms == 0;
     }
   }
 
+  /**
+   * Register event handlers for the given event class.
+   * @param eventClass Event type to handle. Must be derived from Time.
+   * @param handlers One or many event handlers that can process given event type.
+   * @param <T> Event type - must be derived from class Time. (i.e. contain a timestamp).
+   */
   @SuppressWarnings("checkstyle:hiddenfield")
   private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) {
     for (final EventHandler<T> handler : handlers) {
@@ -184,25 +242,35 @@
 
   /**
    * Logs the currently running threads.
-   *
-   * @param level  the level used for the log entry
+   * @param level Log level used to write the entry.
    * @param prefix put before the comma-separated list of threads
    */
-  private void logThreads(final Level level, final String prefix) {
-    final StringBuilder sb = new StringBuilder(prefix);
-    for (final Thread t : Thread.getAllStackTraces().keySet()) {
-      sb.append(t.getName());
-      sb.append(", ");
+  private static void logThreads(final Level level, final String prefix) {
+
+    if (LOG.isLoggable(level)) {
+
+      final StringBuilder sb = new StringBuilder(prefix);
+      for (final Thread t : Thread.getAllStackTraces().keySet()) {
+        sb.append(t.getName()).append(", ");
+      }
+
+      LOG.log(level, sb.toString());
     }
-    LOG.log(level, sb.toString());
   }
 
+  /**
+   * Main event loop.
+   * Set up the event handlers, and go into event loop that polls the schedule and process events in it.
+   */
   @Override
   public void run() {
-    LOG.entering(RuntimeClock.class.getCanonicalName(), "run");
+
+    LOG.entering(CLASS_NAME, "run");
 
     try {
+
       LOG.log(Level.FINE, "Subscribe event handlers");
+
       subscribe(StartTime.class, this.startHandler.get());
       subscribe(StopTime.class, this.stopHandler.get());
       subscribe(RuntimeStart.class, this.runtimeStartHandler.get());
@@ -213,66 +281,82 @@
       this.handlers.onNext(new RuntimeStart(this.timer.getCurrent()));
 
       LOG.log(Level.FINE, "Initiate start time");
-      final StartTime start = new StartTime(this.timer.getCurrent());
-      this.handlers.onNext(start);
+      this.handlers.onNext(new StartTime(this.timer.getCurrent()));
 
       while (true) {
-        LOG.log(Level.FINEST, "Entering clock main loop iteration.");
+
+        LOG.log(Level.FINEST, "Enter clock main loop.");
+
         try {
+
           if (this.isIdle()) {
             // Handle an idle clock event, without locking this.schedule
             this.handlers.onNext(new IdleClock(timer.getCurrent()));
           }
 
-          Time time = null;
+          final Time event;
           synchronized (this.schedule) {
+
             while (this.schedule.isEmpty()) {
               this.schedule.wait();
             }
 
             assert this.schedule.first() != null;
 
-            // Wait until the first scheduled time is ready
-            for (long duration = this.timer.getDuration(this.schedule.first().getTimeStamp());
-                 duration > 0;
-                 duration = this.timer.getDuration(this.schedule.first().getTimeStamp())) {
-              // note: while I'm waiting, another alarm could be scheduled with a shorter duration
-              // so the next time I go around the loop I need to revise my duration
-              this.schedule.wait(duration);
+            // Wait until the first scheduled time is ready.
+            // NOTE: while waiting, another alarm could be scheduled with a shorter duration
+            // so the next time I go around the loop I need to revise my duration.
+            while (true) {
+              final long waitDuration = this.timer.getDuration(this.schedule.first());
+              if (waitDuration <= 0) {
+                break;
+              }
+              this.schedule.wait(waitDuration);
             }
+
             // Remove the event from the schedule and process it:
-            time = this.schedule.pollFirst();
-            assert time != null;
+            event = this.schedule.pollFirst();
           }
 
-          if (time instanceof Alarm) {
-            final Alarm alarm = (Alarm) time;
-            alarm.handle();
+          LOG.log(Level.FINER, "Process event: {0}", event);
+          assert event != null;
+
+          if (event instanceof Alarm) {
+
+            if (event instanceof ClientAlarm) {
+              --this.numClientAlarms;
+              assert this.numClientAlarms >= 0;
+            }
+
+            ((Alarm) event).run();
+
           } else {
-            this.handlers.onNext(time);
-            if (time instanceof StopTime) {
+
+            this.handlers.onNext(event);
+
+            if (event instanceof StopTime) {
               break; // we're done.
             }
           }
+
         } catch (final InterruptedException expected) {
-          // waiting interrupted - return to loop
+          LOG.log(Level.FINEST, "Wait interrupted; continue event loop.");
         }
       }
-      if (this.stoppedOnException == null) {
-        this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
-      } else {
-        this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.stoppedOnException));
-      }
+
+      this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.exceptionCausedStop));
+
     } catch (final Exception e) {
-      e.printStackTrace();
+
+      LOG.log(Level.SEVERE, "Error in runtime clock", e);
       this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));
+
     } finally {
+
       logThreads(Level.FINE, "Threads running after exiting the clock main loop: ");
       LOG.log(Level.FINE, "Runtime clock exit");
     }
-    LOG.exiting(RuntimeClock.class.getCanonicalName(), "run");
 
+    LOG.exiting(CLASS_NAME, "run");
   }
-
-
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java
index 9d97aae..1935938 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java
@@ -19,15 +19,59 @@
 package org.apache.reef.wake.time.runtime;
 
 import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.time.Time;
 
 /**
  * An interface for Timer.
+ * Default implementation uses actual system time.
  */
 @DefaultImplementation(RealTimer.class)
 public interface Timer {
+
+  /**
+   * Get current time in milliseconds since the beginning of the epoch (01/01/1970).
+   * Note that this time may not necessarily match the actual system time - e.g. in unit tests.
+   * @return Current system time in milliseconds since the start of the epoch.
+   */
   long getCurrent();
 
+  /**
+   * Get the number of milliseconds between current time as tracked by the Timer implementation
+   * and the given event. Can return a negative number if the event is already in the past.
+   * @param time Timestamp in milliseconds.
+   * @return Difference in milliseconds between the given timestamp and the time tracked by the timer.
+   * The result is a negative number if the timestamp is in the past (according to the timer's time).
+   * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+   * Remove after release 0.16.
+   */
   long getDuration(final long time);
 
+  /**
+   * Get the number of milliseconds between current time as tracked by the Timer implementation
+   * and the given event. Can return a negative number if the event is already in the past.
+   * @param time Timestamp object that wraps time in milliseconds.
+   * @return Difference in milliseconds between the given timestamp and the time tracked by the timer.
+   * The result is a negative number if the timestamp is in the past (according to the timer's time).
+   */
+  long getDuration(final Time time);
+
+  /**
+   * Check if the event with a given timestamp has occurred, according to the timer.
+   * Return true if the timestamp is equal or less than the timer's time, and false if
+   * it is still in the (timer's) future.
+   * @param time Timestamp in milliseconds.
+   * @return False if the given timestamp is still in the timer's time future.
+   * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp.
+   * Remove after release 0.16.
+   */
   boolean isReady(final long time);
+
+  /**
+   * Check if the event with a given timestamp has occurred, according to the timer.
+   * Return true if the timestamp is equal or less than the timer's time, and false if
+   * it is still in the (timer's) future.
+   * @param time Timestamp object that wraps time in milliseconds.
+   * @return False if the given timestamp is still in the timer's time future.
+   */
+  boolean isReady(final Time time);
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java
index 81c4802..3671cf3 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java
@@ -23,6 +23,7 @@
 
 /**
  * An event for client-created alarm.
+ * Contains a timestamp and the event handler to invoke at that time.
  */
 public final class ClientAlarm extends Alarm {
 
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
index c444205..afb6866 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
@@ -18,15 +18,16 @@
  */
 package org.apache.reef.wake.test.time;
 
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.LoggingUtils;
 import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.time.Time;
 import org.apache.reef.wake.time.event.Alarm;
 import org.apache.reef.wake.time.runtime.LogicalTimer;
+import org.apache.reef.wake.time.runtime.RealTimer;
 import org.apache.reef.wake.time.runtime.RuntimeClock;
 import org.apache.reef.wake.time.runtime.Timer;
 import org.junit.Assert;
@@ -45,57 +46,51 @@
  */
 public class ClockTest {
 
-  private static RuntimeClock buildClock() throws Exception {
-    final JavaConfigurationBuilder builder = Tang.Factory.getTang()
-            .newConfigurationBuilder();
+  private static final Tang TANG = Tang.Factory.getTang();
 
-    final Injector injector = Tang.Factory.getTang()
-            .newInjector(builder.build());
+  private static RuntimeClock buildClock(
+      final Class<? extends Timer> timerClass) throws InjectionException {
 
-    return injector.getInstance(RuntimeClock.class);
-  }
+    final Configuration clockConfig = TANG.newConfigurationBuilder()
+        .bind(Timer.class, timerClass)
+        .build();
 
-  private static RuntimeClock buildLogicalClock() throws Exception {
-    final JavaConfigurationBuilder builder = Tang.Factory.getTang()
-        .newConfigurationBuilder();
-
-    builder.bind(Timer.class, LogicalTimer.class);
-
-    final Injector injector = Tang.Factory.getTang()
-        .newInjector(builder.build());
-    return injector.getInstance(RuntimeClock.class);
+    return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class);
   }
 
   @Test
   public void testClock() throws Exception {
-    LoggingUtils.setLoggingLevel(Level.FINE);
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
 
     final int minEvents = 40;
     final CountDownLatch eventCountLatch = new CountDownLatch(minEvents);
 
-    final RuntimeClock clock = buildClock();
-    new Thread(clock).start();
-    final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, eventCountLatch);
+    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
 
-    try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) {
-      stage.onNext(null);
-      Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
-    } finally {
-      clock.close();
+      new Thread(clock).start();
+
+      final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, eventCountLatch);
+
+      try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) {
+        stage.onNext(null);
+        Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
+      }
     }
   }
 
   @Test
   public void testAlarmRegistrationRaceConditions() throws Exception {
-    LoggingUtils.setLoggingLevel(Level.FINE);
 
-    final RuntimeClock clock = buildClock();
-    new Thread(clock).start();
+    LoggingUtils.setLoggingLevel(Level.FINEST);
 
-    final EventRecorder earlierAlarmRecorder = new EventRecorder();
-    final EventRecorder laterAlarmRecorder = new EventRecorder();
+    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
 
-    try {
+      new Thread(clock).start();
+
+      final EventRecorder earlierAlarmRecorder = new EventRecorder();
+      final EventRecorder laterAlarmRecorder = new EventRecorder();
+
       // Schedule an Alarm that's far in the future
       clock.scheduleAlarm(5000, laterAlarmRecorder);
       Thread.sleep(1000);
@@ -117,72 +112,77 @@
 
       // The later Alarm should have fired, since 6000 > 5000 ms have passed:
       Assert.assertEquals(1, laterAlarmRecorder.getEventCount());
-    } finally {
-      clock.close();
     }
   }
 
   @Test
   public void testMultipleCloseCalls() throws Exception {
-    LoggingUtils.setLoggingLevel(Level.FINE);
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
 
     final int numThreads = 3;
     final CountDownLatch eventCountLatch = new CountDownLatch(numThreads);
 
-    final RuntimeClock clock = buildClock();
-    new Thread(clock).start();
-    final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(new EventHandler<Alarm>() {
-      @Override
-      public void onNext(final Alarm value) {
-        clock.close();
-        eventCountLatch.countDown();
-      }
-    }, numThreads);
+    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
 
-    try {
-      for (int i = 0; i < numThreads; ++i) {
-        stage.onNext(null);
+      final EventHandler<Alarm> handler = new EventHandler<Alarm>() {
+        @Override
+        public void onNext(final Alarm value) {
+          clock.close();
+          eventCountLatch.countDown();
+        }
+      };
+
+      new Thread(clock).start();
+
+      try (final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, numThreads)) {
+
+        for (int i = 0; i < numThreads; ++i) {
+          stage.onNext(null);
+        }
+
+        Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
       }
-      Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
-    } finally {
-      stage.close();
-      clock.close();
     }
   }
 
   @Test
   public void testSimultaneousAlarms() throws Exception {
-    LoggingUtils.setLoggingLevel(Level.FINE);
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
 
     final int expectedEvent = 2;
     final CountDownLatch eventCountLatch = new CountDownLatch(expectedEvent);
 
-    final RuntimeClock clock = buildLogicalClock();
-    new Thread(clock).start();
+    try (final RuntimeClock clock = buildClock(LogicalTimer.class)) {
 
-    final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
-    try {
+      new Thread(clock).start();
+
+      final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
       clock.scheduleAlarm(500, alarmRecorder);
       clock.scheduleAlarm(500, alarmRecorder);
+
       eventCountLatch.await(10, TimeUnit.SECONDS);
+
       Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount());
-    } finally {
-      clock.close();
     }
   }
 
   @Test
   public void testAlarmOrder() throws Exception {
-    LoggingUtils.setLoggingLevel(Level.FINE);
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
 
     final int numAlarms = 10;
     final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
 
-    final RuntimeClock clock = buildLogicalClock();
-    new Thread(clock).start();
+    try (final RuntimeClock clock = buildClock(LogicalTimer.class)) {
 
-    final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
-    try {
+      new Thread(clock).start();
+
+      final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
       final long[] expected = new long[numAlarms];
       for (int i = 0; i < numAlarms; ++i) {
         clock.scheduleAlarm(i * 100, alarmRecorder);
@@ -191,15 +191,13 @@
 
       eventCountLatch.await(10, TimeUnit.SECONDS);
 
-      final Long[] actualLong = new Long[numAlarms];
-      alarmRecorder.getTimestamps().toArray(actualLong);
+      int i = 0;
       final long[] actual = new long[numAlarms];
-      for (int i = 0; i < numAlarms; ++i) {
-        actual[i] = actualLong[i];
+      for (final long ts : alarmRecorder.getTimestamps()) {
+        actual[i++] = ts;
       }
+
       Assert.assertArrayEquals(expected, actual);
-    } finally {
-      clock.close();
     }
   }
 
@@ -234,7 +232,7 @@
 
     @Override
     public void onNext(final Alarm event) {
-      timestamps.add(event.getTimeStamp());
+      timestamps.add(event.getTimestamp());
       events.add(event);
       if (eventCountLatch != null) {
         eventCountLatch.countDown();
diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
index f60c19c..177dabb 100644
--- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
+++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
@@ -101,7 +101,7 @@
    */
   public String getStartTime() {
     if (startTime != null) {
-      return convertTime(startTime.getTimeStamp());
+      return convertTime(startTime.getTimestamp());
     }
     return null;
   }
@@ -113,7 +113,7 @@
    */
   public String getStopTime() {
     if (stopTime != null) {
-      return convertTime(stopTime.getTimeStamp());
+      return convertTime(stopTime.getTimestamp());
     }
     return null;
   }
diff --git a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java
index 0c78e8c..28ef9b8 100644
--- a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java
+++ b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java
@@ -129,7 +129,7 @@
     final ReefEventStateManager reefEventStateManager =
         this.injector.getInstance(ReefEventStateManager.class);
 
-    Assert.assertEquals(reefEventStateManager.getStopTime(), convertTime(st.getTimeStamp()));
+    Assert.assertEquals(reefEventStateManager.getStopTime(), convertTime(st.getTimestamp()));
   }
 
   @Test
@@ -144,7 +144,7 @@
 
     final ReefEventStateManager reefEventStateManager =
         this.injector.getInstance(ReefEventStateManager.class);
-    Assert.assertEquals(reefEventStateManager.getStartTime(), convertTime(st.getTimeStamp()));
+    Assert.assertEquals(reefEventStateManager.getStartTime(), convertTime(st.getTimestamp()));
   }
 
   private String convertTime(final long time) {