| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading; |
| using Org.Apache.REEF.Tang.Annotations; |
| using Org.Apache.REEF.Tang.Implementations.InjectionPlan; |
| using Org.Apache.REEF.Utilities; |
| using Org.Apache.REEF.Utilities.Collections; |
| using Org.Apache.REEF.Utilities.Diagnostics; |
| using Org.Apache.REEF.Utilities.Logging; |
| using Org.Apache.REEF.Wake.RX.Impl; |
| using Org.Apache.REEF.Wake.Time.Event; |
| using Org.Apache.REEF.Wake.Time.Runtime.Event; |
| |
| namespace Org.Apache.REEF.Wake.Time.Runtime |
| { |
| public sealed class RuntimeClock : IClock |
| { |
| private static readonly Logger LOGGER = Logger.GetLogger(typeof(RuntimeClock)); |
| |
| private static int numberOfInstantiations = 0; |
| private readonly ITimer _timer; |
| private readonly PubSubSubject<Time> _handlers; |
| private readonly PriorityQueue<Time> _schedule; |
| |
| // TODO[REEF-1373]: Remove the the _old* handlers |
| private readonly IInjectionFuture<ISet<IObserver<StartTime>>> _oldStartHandler; |
| private readonly IInjectionFuture<ISet<IObserver<StopTime>>> _oldStopHandler; |
| private readonly IInjectionFuture<ISet<IObserver<RuntimeStart>>> _oldRuntimeStartHandler; |
| private readonly IInjectionFuture<ISet<IObserver<RuntimeStop>>> _oldRuntimeStopHandler; |
| private readonly IInjectionFuture<ISet<IObserver<IdleClock>>> _oldIdleHandler; |
| |
| private readonly IInjectionFuture<ISet<IObserver<StartTime>>> _startHandler; |
| private readonly IInjectionFuture<ISet<IObserver<StopTime>>> _stopHandler; |
| private readonly IInjectionFuture<ISet<IObserver<RuntimeStart>>> _runtimeStartHandler; |
| private readonly IInjectionFuture<ISet<IObserver<RuntimeStop>>> _runtimeStopHandler; |
| private readonly IInjectionFuture<ISet<IObserver<IdleClock>>> _idleHandler; |
| |
| private bool _disposed; |
| |
| /// <summary> |
| /// Create a new RuntimeClock with injectable IObservers |
| /// </summary> |
| /// <param name="timer">The runtime clock timer</param> |
| /// <param name="startHandler">The start handler</param> |
| /// <param name="stopHandler">The stop handler</param> |
| /// <param name="runtimeStartHandler">The runtime start handler</param> |
| /// <param name="runtimeStopHandler">The runtime stop handler</param> |
| /// <param name="idleHandler">The idle handler</param> |
| /// <param name="oldStartHandler">Start handlers prior to REEF-1372</param> |
| /// <param name="oldStopHandler">Stop handlers prior to REEF-1372</param> |
| /// <param name="oldRuntimeStartHandler">Runtime start handlers prior to REEF-1372</param> |
| /// <param name="oldRuntimeStopHandler">Runtime stop handlers prior to REEF-1372</param> |
| /// <param name="oldIdleHandler">Idle handlers prior to REEF-1372</param> |
| #pragma warning disable 618 |
| [Inject] |
| private RuntimeClock( |
| ITimer timer, |
| |
| // TODO[REEF-1373]: Remove the _old* handlers |
| [Parameter(typeof(StartHandler))] IInjectionFuture<ISet<IObserver<StartTime>>> oldStartHandler, |
| [Parameter(typeof(StopHandler))] IInjectionFuture<ISet<IObserver<StopTime>>> oldStopHandler, |
| [Parameter(typeof(RuntimeStartHandler))] IInjectionFuture<ISet<IObserver<RuntimeStart>>> oldRuntimeStartHandler, |
| [Parameter(typeof(RuntimeStopHandler))] IInjectionFuture<ISet<IObserver<RuntimeStop>>> oldRuntimeStopHandler, |
| [Parameter(typeof(IdleHandler))] IInjectionFuture<ISet<IObserver<IdleClock>>> oldIdleHandler, |
| |
| [Parameter(typeof(Parameters.StartHandler))] IInjectionFuture<ISet<IObserver<StartTime>>> startHandler, |
| [Parameter(typeof(Parameters.StopHandler))] IInjectionFuture<ISet<IObserver<StopTime>>> stopHandler, |
| [Parameter(typeof(Parameters.RuntimeStartHandler))] IInjectionFuture<ISet<IObserver<RuntimeStart>>> runtimeStartHandler, |
| [Parameter(typeof(Parameters.RuntimeStopHandler))] IInjectionFuture<ISet<IObserver<RuntimeStop>>> runtimeStopHandler, |
| [Parameter(typeof(Parameters.IdleHandler))] IInjectionFuture<ISet<IObserver<IdleClock>>> idleHandler) |
| { |
| #pragma warning restore 618 |
| _timer = timer; |
| _schedule = new PriorityQueue<Time>(); |
| _handlers = new PubSubSubject<Time>(); |
| |
| _startHandler = startHandler; |
| _stopHandler = stopHandler; |
| _runtimeStartHandler = runtimeStartHandler; |
| _runtimeStopHandler = runtimeStopHandler; |
| _idleHandler = idleHandler; |
| |
| // TODO[REEF-1373]: Remove the _old* handlers |
| _oldStartHandler = oldStartHandler; |
| _oldStopHandler = oldStopHandler; |
| _oldRuntimeStartHandler = oldRuntimeStartHandler; |
| _oldRuntimeStopHandler = oldRuntimeStopHandler; |
| _oldIdleHandler = oldIdleHandler; |
| |
| ++numberOfInstantiations; |
| if (numberOfInstantiations > 1) |
| { |
| LOGGER.Log(Level.Warning, "Instantiated `RuntimeClock` instance number " + numberOfInstantiations); |
| } |
| } |
| |
| /// <summary> |
| /// Schedule a TimerEvent at the given future offset |
| /// </summary> |
| /// <param name="offset">The offset in the future to schedule the alarm, in msec</param> |
| /// <param name="handler">The IObserver to to be called</param> |
| public override void ScheduleAlarm(long offset, IObserver<Alarm> handler) |
| { |
| if (_disposed) |
| { |
| return; |
| } |
| if (handler == null) |
| { |
| Exceptions.Throw(new ArgumentNullException("handler"), LOGGER); |
| } |
| |
| lock (_schedule) |
| { |
| _schedule.Add(new ClientAlarm(_timer.CurrentTime + offset, handler)); |
| Monitor.PulseAll(_schedule); |
| } |
| } |
| |
| /// <summary> |
| /// Clock is idle if it has no future alarms set |
| /// </summary> |
| /// <returns>True if no future alarms are set, otherwise false</returns> |
| public override bool IsIdle() |
| { |
| lock (_schedule) |
| { |
| return _schedule.Count == 0; |
| } |
| } |
| |
| /// <summary> |
| /// Dispose of the clock and all scheduled alarms |
| /// </summary> |
| public override void Dispose() |
| { |
| lock (_schedule) |
| { |
| _schedule.Clear(); |
| _schedule.Add(new StopTime(_timer.CurrentTime)); |
| Monitor.PulseAll(_schedule); |
| _disposed = true; |
| } |
| } |
| |
| /// <summary> |
| /// Register the IObserver for the particular Time event. |
| /// </summary> |
| /// <param name="observer">The handler to register</param> |
| [Obsolete("Will be removed in REEF 0.16")] |
| public void RegisterObserver<U>(IObserver<U> observer) where U : Time |
| { |
| if (_disposed) |
| { |
| return; |
| } |
| |
| _handlers.Subscribe(observer); |
| } |
| |
| /// <summary> |
| /// Start the RuntimeClock. |
| /// Clock will continue to run and handle events until it has been disposed. |
| /// </summary> |
| public override void Run() |
| { |
| SubscribeHandlers(); |
| |
| var runtimeException = Optional<Exception>.Empty(); |
| try |
| { |
| _handlers.OnNext(new RuntimeStart(_timer.CurrentTime)); |
| _handlers.OnNext(new StartTime(_timer.CurrentTime)); |
| |
| while (true) |
| { |
| lock (_schedule) |
| { |
| if (IsIdle()) |
| { |
| _handlers.OnNext(new IdleClock(_timer.CurrentTime)); |
| } |
| |
| // Blocks and releases lock until it receives the next event |
| Time alarm = GetNextEvent(); |
| ProcessEvent(alarm); |
| |
| if (alarm is StopTime) |
| { |
| break; |
| } |
| } |
| } |
| } |
| catch (Exception e) |
| { |
| runtimeException = Optional<Exception>.Of(new SystemException("Caught Exception in clock, failing the Evaluator.", e)); |
| } |
| |
| var runtimeStop = runtimeException.IsPresent() |
| ? new RuntimeStop(_timer.CurrentTime, runtimeException.Value) |
| : new RuntimeStop(_timer.CurrentTime); |
| |
| _handlers.OnNext(runtimeStop); |
| } |
| |
| /// <summary> |
| /// Register the event handlers |
| /// </summary> |
| private void SubscribeHandlers() |
| { |
| // TODO[REEF-1373]: Remove the subscriptions of the _old* handlers |
| Subscribe(_oldStartHandler.Get()); |
| Subscribe(_oldStopHandler.Get()); |
| Subscribe(_oldRuntimeStartHandler.Get()); |
| Subscribe(_oldRuntimeStopHandler.Get()); |
| Subscribe(_oldIdleHandler.Get()); |
| |
| Subscribe(_startHandler.Get()); |
| Subscribe(_stopHandler.Get()); |
| Subscribe(_runtimeStartHandler.Get()); |
| Subscribe(_runtimeStopHandler.Get()); |
| Subscribe(_idleHandler.Get()); |
| } |
| |
| /// <summary> |
| /// Subscribe a set of IObservers for a particular Time event |
| /// </summary> |
| /// <param name="observers">The set of observers to subscribe</param> |
| private void Subscribe<U>(ISet<IObserver<U>> observers) where U : Time |
| { |
| foreach (IObserver<U> observer in observers) |
| { |
| _handlers.Subscribe(observer); |
| } |
| } |
| |
| /// <summary> |
| /// Wait until the first scheduled alarm is ready to be handled |
| /// Assumes that we have a lock on the _schedule SortedSet |
| /// </summary> |
| private Time GetNextEvent() |
| { |
| // Wait for an alarm to be scheduled on the condition variable Count |
| while (_schedule.Count == 0) |
| { |
| Monitor.Wait(_schedule); |
| } |
| |
| // Once the alarm is scheduled, wait for the prescribed amount of time. |
| // If a new alarm is scheduled with a shorter duration, Wait will preempt |
| // and duration will update to reflect the new alarm's timestamp |
| for (long duration = _timer.GetDuration(_schedule.First().TimeStamp); |
| duration > 0; |
| duration = _timer.GetDuration(_schedule.First().TimeStamp)) |
| { |
| Monitor.Wait(_schedule, TimeSpan.FromMilliseconds(duration)); |
| } |
| |
| return _schedule.Dequeue(); |
| } |
| |
| /// <summary> |
| /// Process the next Time event. |
| /// </summary> |
| /// <param name="time">The Time event to handle</param> |
| private void ProcessEvent(Time time) |
| { |
| if (time is Alarm) |
| { |
| Alarm alarm = (Alarm)time; |
| alarm.Handle(); |
| } |
| else |
| { |
| _handlers.OnNext(time); |
| } |
| } |
| } |
| } |