// blob: 8df82d1eda6470b056d2d877cf0b2713b74728b8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.OnREEF.Driver;
using Xunit;
namespace Org.Apache.REEF.IMRU.Tests
{
public class JobLifecycleManagerTest
{
/// <summary>
/// Runs the shared scenario with a detector that always reports cancellation and
/// expects the single subscribed observer to have recorded an event with the detector's message.
/// </summary>
[Fact]
[Trait("Description", "Verify that JobCancelled event is sent when cancellation signal is detected.")]
public void JobLifeCycleMangerSendsJobCancelledEvent()
{
    string cancellationText = "cancelled";
    IJobCancelledDetector alwaysCancelled = new SampleJobCancelledDetector(true, cancellationText);

    // Helper defaults apply: one observer, subscribed on construction.
    IEnumerable<TestObserver> recorded = JobLifeCycleMangerEventTest(detector: alwaysCancelled);
    TestObserver firstObserver = recorded.FirstOrDefault();

    AssertCancelEvent(firstObserver, true, cancellationText);
}
/// <summary>
/// Runs the shared scenario with several subscribed observers and a detector that always
/// reports cancellation, then expects every observer to have recorded the event.
/// </summary>
[Fact]
[Trait("Description", "Verify that JobCancelled Event can be sent to all subscribers in case of multiply observers.")]
public void JobLifeCycleMangerSendsJobCancelledEventToMultiplyObservers()
{
    const int observerCount = 3;
    string expectedMessage = "cancelled";

    // observerCount must be passed explicitly: the helper defaults to a single observer,
    // which would leave the multi-subscriber path untested.
    var observers = JobLifeCycleMangerEventTest(
        detector: new SampleJobCancelledDetector(true, expectedMessage),
        observerCount: observerCount)
        .ToList();

    // Guard against the scenario silently degenerating to fewer observers than intended.
    Assert.Equal(observerCount, observers.Count);

    foreach (var observer in observers)
    {
        AssertCancelEvent(observer, true, expectedMessage);
    }
}
/// <summary>
/// Counts detector queries over a 6 second wait with a 1 second check period and
/// expects at least five of them, plus a recorded cancellation event.
/// </summary>
[Fact]
[Trait("Description", "Verify that IsCancelled check is performed with specified period.")]
public void JobLifeCycleMangerChecksDetectorPeriodically()
{
    string cancellationText = "cancelled";
    int checksObserved = 0;

    // The detector invokes this callback on every IsJobCancelled call.
    Action countCheck = () => { checksObserved++; };
    var countingDetector = new SampleJobCancelledDetector(true, cancellationText, testAction: countCheck);

    IEnumerable<TestObserver> recorded = JobLifeCycleMangerEventTest(
        detector: countingDetector,
        signalCheckPeriodSec: 1,
        waitForEventPeriodSec: 6);
    TestObserver firstObserver = recorded.FirstOrDefault();

    bool checkedOftenEnough = checksObserved >= 5;
    Assert.True(checkedOftenEnough, "Expected 5+ IsCancelled checks in 6 sec (check interval = 1 sec). Actual check counter: " + checksObserved);
    AssertCancelEvent(firstObserver, true, cancellationText);
}
/// <summary>
/// Runs the shared scenario with a detector that never reports cancellation and
/// expects the observer to have recorded no event.
/// </summary>
[Fact]
[Trait("Description", "Verify that JobLifecycle manager does not sent any cancellation events if signal is not generated.")]
public void JobLifeCycleMangerNoSignalDoesNotSendEvent()
{
    IJobCancelledDetector neverCancelled = new SampleJobCancelledDetector(false);

    IEnumerable<TestObserver> recorded = JobLifeCycleMangerEventTest(detector: neverCancelled);
    TestObserver firstObserver = recorded.FirstOrDefault();

    AssertCancelEvent(firstObserver, false);
}
/// <summary>
/// Runs the shared scenario with no detector at all (null) and expects the
/// observer to have recorded no event.
/// </summary>
[Fact]
[Trait("Description", "Verify that no cancellation event is sent if configured detector is null.")]
public void JobLifeCycleMangerDetectorNullDoesNotSendEvent()
{
    IJobCancelledDetector missingDetector = null;

    IEnumerable<TestObserver> recorded = JobLifeCycleMangerEventTest(detector: missingDetector);
    TestObserver firstObserver = recorded.FirstOrDefault();

    AssertCancelEvent(firstObserver, false);
}
/// <summary>
/// Creates an observer that is NOT subscribed, waits 6 seconds with a 1 second check
/// period, and expects that the detector was never queried and no event was recorded.
/// </summary>
[Fact]
[Trait("Description", "Verify that cancellation checks are not performed if there are no observers.")]
public void JobLifeCycleMangerNoObserversDoesNotCheckForSignal()
{
    int checksObserved = 0;

    // The detector invokes this callback on every IsJobCancelled call.
    Action countCheck = () => { checksObserved++; };
    var countingDetector = new SampleJobCancelledDetector(true, "cancelled", testAction: countCheck);

    IEnumerable<TestObserver> recorded = JobLifeCycleMangerEventTest(
        detector: countingDetector,
        subscribeObserver: false,
        signalCheckPeriodSec: 1,
        waitForEventPeriodSec: 6);
    TestObserver detachedObserver = recorded.FirstOrDefault();

    bool neverChecked = checksObserved == 0;
    Assert.True(neverChecked, "Expected no checks for cancellation if there are no subscribers. Actual check counter: " + checksObserved);
    AssertCancelEvent(detachedObserver, false);
}
/// <summary>
/// Three-phase scenario around one observer: not subscribed (expects zero detector queries),
/// subscribed (expects at least one query), then unsubscribed again (expects the query
/// counter to stay where it was right after unsubscribing). Each phase waits 6 seconds.
/// </summary>
[Fact]
[Trait("Description", "Verify that manager stops checking for cancellation signal after all observers unsubscribed.")]
public void JobLifeCycleMangerNoCancellationChecksAfterAllObserversUnsubscribed()
{
    const int waitForEventPeriodSec = 6;
    int phaseDurationMs = waitForEventPeriodSec * 1000;
    int checksObserved = 0;

    // The detector invokes this callback on every IsJobCancelled call.
    Action countCheck = () => { checksObserved++; };
    var countingDetector = new SampleJobCancelledDetector(true, "cancelled", testAction: countCheck);

    // Phase 1: observer exists but is not subscribed; the helper already waits one full period.
    IEnumerable<TestObserver> recorded = JobLifeCycleMangerEventTest(
        detector: countingDetector,
        subscribeObserver: false,
        signalCheckPeriodSec: 1,
        waitForEventPeriodSec: waitForEventPeriodSec);
    TestObserver observer = recorded.FirstOrDefault();

    bool neverChecked = checksObserved == 0;
    Assert.True(neverChecked, "Expected no checks for cancellation if there are no subscribers. Actual check counter: " + checksObserved);

    // Phase 2: with a subscriber attached the counter is expected to move.
    observer.Subscribe();
    Thread.Sleep(phaseDurationMs);

    bool checkedAfterSubscribe = checksObserved > 0;
    Assert.True(checkedAfterSubscribe, "Expected checks for cancellation after new subscritpion added. Actual check counter: " + checksObserved);

    // Phase 3: after unsubscribing, the counter is expected to stay at its snapshot value.
    observer.UnSubscribe();
    int snapshotAfterUnsubscribe = checksObserved;
    Thread.Sleep(phaseDurationMs);

    bool counterUnchanged = checksObserved == snapshotAfterUnsubscribe;
    Assert.True(counterUnchanged, "Expected no checks for cancellation after all subscribers unsubscribed. Actual check counter: " + checksObserved + " expected Counter to stay at: " + snapshotAfterUnsubscribe);
}
/// <summary>
/// Shared scenario driver: builds a <see cref="JobLifeCycleManager"/> through reflection,
/// attaches the requested number of test observers, blocks for the wait period and
/// returns the observers so the caller can inspect what they recorded.
/// </summary>
/// <param name="detector">Detector handed to the manager's constructor; may be null.</param>
/// <param name="subscribeObserver">When true each observer subscribes to the manager on construction.</param>
/// <param name="observerCount">Number of observers to create.</param>
/// <param name="signalCheckPeriodSec">Second constructor argument of the manager (check period, in seconds).</param>
/// <param name="waitForEventPeriodSec">How long to block, in seconds, before returning.</param>
/// <returns>The created observers, in creation order.</returns>
private IEnumerable<TestObserver> JobLifeCycleMangerEventTest(
    IJobCancelledDetector detector,
    bool subscribeObserver = true,
    int observerCount = 1,
    int signalCheckPeriodSec = 1,
    int waitForEventPeriodSec = 2)
{
    // The manager is created via a non-public instance constructor, hence the reflection call.
    const BindingFlags nonPublicInstance = BindingFlags.NonPublic | BindingFlags.Instance;
    object[] constructorArguments = { detector, signalCheckPeriodSec };
    object createdInstance = Activator.CreateInstance(
        typeof(JobLifeCycleManager),
        nonPublicInstance,
        null,
        constructorArguments,
        null,
        null);
    JobLifeCycleManager manager = createdInstance as JobLifeCycleManager;

    List<TestObserver> attachedObservers = new List<TestObserver>();
    foreach (int ordinal in Enumerable.Range(1, observerCount))
    {
        attachedObservers.Add(new TestObserver(manager, subscribeObserver));
    }

    // Give the manager time to run its periodic checks before the caller asserts.
    int waitMilliseconds = waitForEventPeriodSec * 1000;
    Thread.Sleep(waitMilliseconds);

    return attachedObservers;
}
/// <summary>
/// Asserts what a test observer recorded: either a cancellation event carrying the
/// expected message, or no event at all.
/// </summary>
/// <param name="observer">Observer to inspect; must not be null.</param>
/// <param name="expectedEvent">True when an event must have been recorded, false when none may have been.</param>
/// <param name="expectedMessage">Message the recorded event must carry; only checked when <paramref name="expectedEvent"/> is true.</param>
private void AssertCancelEvent(TestObserver observer, bool expectedEvent, string expectedMessage = null)
{
    // Callers obtain the observer via FirstOrDefault(); fail with a clear assertion
    // instead of a NullReferenceException if nothing was returned.
    Assert.NotNull(observer);

    // Read once so both checks below look at the same recorded event.
    var recordedEvent = observer.LastEvent;

    if (expectedEvent)
    {
        Assert.NotNull(recordedEvent);

        // Compare by value: the message is a string, and the test should not depend on
        // the manager handing back the very same string instance.
        Assert.Equal(expectedMessage, recordedEvent.Message);
    }
    else
    {
        Assert.Null(recordedEvent);
    }
}
/// <summary>
/// Supplies a placeholder driver-started event. Always returns null.
/// </summary>
/// <returns>Null.</returns>
private IDriverStarted NewStartedEvent()
{
    // Per the original author's note, the event is not really used by the driver,
    // so a null placeholder is sufficient.
    IDriverStarted placeholder = null;
    return placeholder;
}
/// <summary>
/// Test double for <see cref="IJobCancelledDetector"/>: answers every query with the
/// response and message fixed at construction, optionally invoking a callback first.
/// </summary>
private class SampleJobCancelledDetector : IJobCancelledDetector
{
    private readonly bool _cannedResponse;
    private readonly string _cannedMessage;
    private readonly Action _onQuery;

    /// <summary>
    /// Creates a detector with a fixed answer.
    /// </summary>
    /// <param name="isCancelledResponse">Value returned by every <see cref="IsJobCancelled"/> call.</param>
    /// <param name="expectedMessage">Message written to the out parameter on every call; may be null.</param>
    /// <param name="testAction">Optional callback invoked at the start of every call; may be null.</param>
    internal SampleJobCancelledDetector(bool isCancelledResponse, string expectedMessage = null, Action testAction = null)
    {
        _onQuery = testAction;
        _cannedMessage = expectedMessage;
        _cannedResponse = isCancelledResponse;
    }

    /// <summary>
    /// Invokes the optional callback, then reports the fixed message and response.
    /// </summary>
    /// <param name="cancellationMessage">Receives the message supplied at construction.</param>
    /// <returns>The response supplied at construction.</returns>
    public bool IsJobCancelled(out string cancellationMessage)
    {
        Action onQuery = _onQuery;
        if (onQuery != null)
        {
            onQuery();
        }

        cancellationMessage = _cannedMessage;
        return _cannedResponse;
    }
}
/// <summary>
/// Test helper that records the most recent JobCancelled event pushed to it and
/// manages its own subscription to the event source.
/// </summary>
private class TestObserver : IObserver<IJobCancelled>
{
    /// <summary>Last event received through <see cref="OnNext"/>; null until one arrives.</summary>
    internal IJobCancelled LastEvent { get; private set; }

    /// <summary>Event source this observer subscribes to.</summary>
    internal IObservable<IJobCancelled> source { get; private set; }

    /// <summary>Handle of the current subscription; null while not subscribed.</summary>
    internal IDisposable subscription { get; private set; }

    /// <summary>
    /// Creates the observer and optionally subscribes it right away.
    /// </summary>
    /// <param name="eventSource">Source to subscribe to.</param>
    /// <param name="autoSubscribe">When true, <see cref="Subscribe"/> is called from the constructor.</param>
    internal TestObserver(IObservable<IJobCancelled> eventSource, bool autoSubscribe)
    {
        source = eventSource;
        if (autoSubscribe)
        {
            Subscribe();
        }
    }

    /// <summary>Records the received event, replacing any previous one.</summary>
    public void OnNext(IJobCancelled value)
    {
        LastEvent = value;
    }

    /// <summary>No-op.</summary>
    public void OnError(Exception error)
    {
    }

    /// <summary>No-op.</summary>
    public void OnCompleted()
    {
    }

    /// <summary>
    /// Subscribes this observer to the source. If a subscription is already held it is
    /// disposed first, so a repeated call does not orphan the earlier handle.
    /// </summary>
    public void Subscribe()
    {
        UnSubscribe();
        subscription = source.Subscribe(this);
    }

    /// <summary>
    /// Disposes the current subscription, if any. Safe to call when not subscribed
    /// and safe to call repeatedly.
    /// </summary>
    public void UnSubscribe()
    {
        if (subscription != null)
        {
            subscription.Dispose();

            // Drop the handle so a second call does not dispose it again.
            subscription = null;
        }
    }
}
}
}