blob: 599e6d97f0057e1e3c3638b9807c645e430bfe53 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Timers;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
/// <summary>
/// JobLifeCycleManager orchestrates the job cancellation flow.
/// If a job cancellation detector is configured in the job definition, the manager creates a timer and,
/// while at least one observer is subscribed, periodically checks for the cancellation signal.
/// If the cancellation signal is detected, the manager creates a JobCancelled event and propagates it to all subscribers.
/// The manager is used by the IMRU driver to enable job cancellation based on the jobCancellationConfiguration in the job definition.
/// </summary>
/// <remarks>
/// Locking: the subscriber list is guarded by locking the list itself, the timer field is guarded by
/// <c>_timerLock</c>. When both are needed the list lock is always taken first, and observers are never
/// called while either lock is held.
/// </remarks>
internal sealed class JobLifeCycleManager :
    IDisposable,
    IJobLifecycleManager
{
    private const int MillisecondsPerSecond = 1000;

    private static readonly Logger Logger = Logger.GetLogger(typeof(JobLifeCycleManager));

    private readonly int _timerIntervalSec;
    private readonly IJobCancelledDetector _cancellationDetector;

    // Guards every access to _timer after construction (start, stop, dispose, state queries).
    private readonly object _timerLock = new object();

    // Current subscribers; always accessed under lock (_observers).
    private readonly List<IObserver<IJobCancelled>> _observers = new List<IObserver<IJobCancelled>>();

    // Null when monitoring is not applicable (see InitTimer) or after Dispose.
    private Timer _timer;

    /// <summary>
    /// Creates the manager and, when the configuration allows it, prepares (but does not start) the polling timer.
    /// </summary>
    /// <param name="cancelletionDetector">Detector queried on every timer tick; may be null.</param>
    /// <param name="sleepIntervalSec">Polling interval in seconds; non-positive values disable polling.</param>
    [Inject]
    private JobLifeCycleManager(
        IJobCancelledDetector cancelletionDetector,
        [Parameter(typeof(SleepIntervalParameter))] int sleepIntervalSec)
    {
        _cancellationDetector = cancelletionDetector;
        _timerIntervalSec = sleepIntervalSec;
        InitTimer();
    }

    /// <summary>
    /// Safety net that releases the timer if <see cref="Dispose"/> was never called.
    /// </summary>
    ~JobLifeCycleManager()
    {
        ReleaseTimer();
    }

    /// <summary>
    /// Registers an observer for job cancellation events and makes sure the polling timer is running.
    /// Subscribing the same observer again does not add a duplicate entry.
    /// </summary>
    /// <param name="observer">The observer to notify when cancellation is detected.</param>
    /// <returns>A token whose disposal removes the observer; disposing it more than once has no further effect.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<IJobCancelled> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        Logger.Log(Level.Info, "Adding subscriber: {0}", observer);

        lock (_observers)
        {
            if (!_observers.Contains(observer))
            {
                _observers.Add(observer);
                EnsureTimerStarted();
            }
        }

        return new AnonymousDisposable(() => Unsubscribe(observer));
    }

    /// <summary>
    /// Stops and disposes the polling timer. Safe to call multiple times.
    /// </summary>
    public void Dispose()
    {
        ReleaseTimer();

        // Cleanup is done; there is nothing left for the finalizer to do.
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// No-op.
    /// </summary>
    /// <param name="error">Ignored.</param>
    public void OnError(Exception error)
    {
    }

    /// <summary>
    /// No-op.
    /// </summary>
    public void OnCompleted()
    {
    }

    /// <summary>
    /// Validates the configuration and creates the timer when cancellation monitoring makes sense.
    /// Leaves <c>_timer</c> null (and logs the reason) otherwise. Called from the constructor only.
    /// </summary>
    private void InitTimer()
    {
        if (_cancellationDetector == null)
        {
            Logger.Log(Level.Info, "Cancellation detector is null - no need to start Timer for job lifecycle manager");
            return;
        }

        if (_cancellationDetector is JobCancellationDetectorAlwaysFalse)
        {
            Logger.Log(Level.Info, "Cancellation detector is default - no need to start Timer for job lifecycle manager.");
            return;
        }

        if (_timerIntervalSec <= 0)
        {
            Logger.Log(Level.Info, "Timer interval ({0}) is not positive - can't start Timer for job lifecycle manager.", _timerIntervalSec);
            return;
        }

        Logger.Log(Level.Info, "initializing timer to monitor job status. _timer interval: {0}, cancellation detector: {1}", _timerIntervalSec, _cancellationDetector);

        // Multiply in 64-bit arithmetic so that large intervals cannot wrap around, and clamp to
        // Int32.MaxValue milliseconds, the largest interval System.Timers.Timer accepts.
        long intervalMs = Math.Min((long)_timerIntervalSec * MillisecondsPerSecond, int.MaxValue);

        // The timer is only prepared here; it is started when the first observer subscribes.
        var timer = new Timer(intervalMs);
        timer.Elapsed += OnTimer;
        timer.AutoReset = true;
        _timer = timer;
    }

    /// <summary>
    /// Removes the observer and stops the timer once nobody is listening any more.
    /// </summary>
    /// <param name="observer">The observer to remove.</param>
    private void Unsubscribe(IObserver<IJobCancelled> observer)
    {
        Logger.Log(Level.Info, "Removing subscriber: {0}", observer);

        lock (_observers)
        {
            _observers.Remove(observer);
            if (!_observers.Any())
            {
                EnsureTimerStopped();
            }
        }
    }

    /// <summary>
    /// Starts the timer if it exists and is not already running.
    /// </summary>
    private void EnsureTimerStarted()
    {
        lock (_timerLock)
        {
            Logger.Log(Level.Info, "Ensure Timer STARTED. Current timer enabled state: {0}", GetTimerEnabledState());

            // _timer can be null if initialization detected it will not work as expected (see the logs
            // written by InitTimer) or after Dispose; in both cases the start request is ignored.
            if (_timer != null && !_timer.Enabled)
            {
                _timer.Start();
                Logger.Log(Level.Info, "Timer started");
            }
        }
    }

    /// <summary>
    /// Describes the timer state for logging. Callers must hold <c>_timerLock</c>.
    /// </summary>
    /// <returns>"timer is null" when there is no timer, otherwise the timer's Enabled flag as text.</returns>
    private string GetTimerEnabledState()
    {
        return _timer == null ? "timer is null" : _timer.Enabled.ToString();
    }

    /// <summary>
    /// Stops the timer if it exists. The timer is kept so that it can be restarted by a later subscription.
    /// </summary>
    private void EnsureTimerStopped()
    {
        lock (_timerLock)
        {
            Logger.Log(Level.Info, "Ensure Timer STOPPED. Current timer enabled state: {0}", GetTimerEnabledState());

            if (_timer != null)
            {
                _timer.Stop();
                Logger.Log(Level.Info, "Timer stopped");
            }
        }
    }

    /// <summary>
    /// Stops and disposes the timer exactly once; later calls find <c>_timer</c> null and return.
    /// </summary>
    private void ReleaseTimer()
    {
        lock (_timerLock)
        {
            if (_timer == null)
            {
                return;
            }

            _timer.Stop();
            _timer.Dispose();
            _timer = null;
        }
    }

    /// <summary>
    /// Returns a copy of the current subscriber list taken under the list lock,
    /// so that callers can enumerate it without holding the lock.
    /// </summary>
    private IObserver<IJobCancelled>[] SnapshotObservers()
    {
        lock (_observers)
        {
            return _observers.ToArray();
        }
    }

    /// <summary>
    /// Timer callback: queries the cancellation detector and, if the job is cancelled,
    /// sends a JobCancelled event to every observer subscribed at that moment.
    /// </summary>
    /// <param name="source">The timer raising the event (unused).</param>
    /// <param name="e">Event data (unused).</param>
    private void OnTimer(object source, ElapsedEventArgs e)
    {
        if (SnapshotObservers().Length == 0)
        {
            Logger.Log(Level.Info,
                "There are no observers for cancellation event: skipping cancellation detection");
            return;
        }

        string cancellationMessage;
        if (!IsJobCancelled(out cancellationMessage))
        {
            return;
        }

        // Take a fresh snapshot: subscriptions may have changed while the detector was running.
        // Working on the snapshot keeps both the logging and the notifications outside the lock, so an
        // observer may dispose its subscription from within OnNext without corrupting the enumeration.
        // The trade-off is that an observer removed concurrently can still receive this one event.
        var observers = SnapshotObservers();

        Logger.Log(
            Level.Info,
            "Detected Job cancellation ({0}): sending JobCancelled event to observers: {1}",
            cancellationMessage,
            ToCsvString(observers));

        var cancelEvent = new JobCancelled(DateTime.Now, cancellationMessage);
        foreach (var observer in observers)
        {
            try
            {
                observer.OnNext(cancelEvent);
            }
            catch (Exception ex)
            {
                // One failing observer must not prevent the remaining observers from being notified.
                Logger.Log(Level.Error, "Observer {0} failed to handle JobCancelled event. Exception:{1}", observer, ex);
            }
        }
    }

    /// <summary>
    /// Formats at most the first 10 items of a sequence as a comma separated string for logging.
    /// </summary>
    /// <param name="list">Items to format; may be null.</param>
    /// <returns>"null" for a null sequence, otherwise the joined items with null items rendered as "null".</returns>
    private static string ToCsvString<T>(IEnumerable<T> list)
    {
        if (list == null)
        {
            return "null";
        }

        return string.Join(",", list.Take(10).Select(m => m == null ? "null" : m.ToString()));
    }

    /// <summary>
    /// Asks the detector whether the job is cancelled. A missing detector or a detector failure is
    /// reported as "not cancelled"; failures are logged.
    /// </summary>
    /// <param name="cancellationMessage">Message supplied by the detector; null when the check fails or there is no detector.</param>
    /// <returns>True only when the detector reports the job as cancelled.</returns>
    private bool IsJobCancelled(out string cancellationMessage)
    {
        cancellationMessage = null;

        try
        {
            return _cancellationDetector != null && _cancellationDetector.IsJobCancelled(out cancellationMessage);
        }
        catch (Exception ex)
        {
            Logger.Log(Level.Error, "IsCancelled check failed. Exception:{0}", ex);

            // The detector may have written the out value before throwing; do not leak a partial message.
            cancellationMessage = null;
            return false;
        }
    }

    /// <summary>
    /// IDisposable that runs the supplied action the first time it is disposed and does nothing afterwards.
    /// </summary>
    private sealed class AnonymousDisposable : IDisposable
    {
        private Action _disposeAction;

        public AnonymousDisposable(Action disposeAction)
        {
            _disposeAction = disposeAction;
        }

        public void Dispose()
        {
            // Atomically take the action so that repeated or concurrent Dispose calls run it only once.
            // Fully qualified because importing System.Threading would make 'Timer' ambiguous in this file.
            var action = System.Threading.Interlocked.Exchange(ref _disposeAction, null);
            if (action != null)
            {
                action();
            }
        }
    }
}
}