blob: e8ece72344ed6ceb0fe0aef7c051f59ed2ff83d0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NSubstitute;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Events;
using Org.Apache.REEF.Common.Runtime.Evaluator;
using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
using Org.Apache.REEF.Common.Runtime.Evaluator.Utils;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Evaluator.Tests.TestUtils;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities;
using Xunit;
namespace Org.Apache.REEF.Evaluator.Tests
{
public sealed class EvaluatorServiceTests
{
[Fact]
[Trait("Priority", "0")]
[Trait("Category", "Unit")]
public void TestServiceInstantiatedAndDisposed()
{
var serviceConfiguration = ServiceConfiguration.ConfigurationModule
.Set(ServiceConfiguration.Services, GenericType<TestService>.Class)
.Build();
var serviceInjector = TangFactory.GetTang().NewInjector(serviceConfiguration);
var contextConfig = GetContextConfiguration();
TestService testService;
using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, Optional<ContextRuntime>.Empty()))
{
var servicesFromInjector = serviceInjector.GetNamedInstance<ServicesSet, ISet<object>>();
testService = servicesFromInjector.Single() as TestService;
Assert.NotNull(testService);
if (testService == null)
{
// Not possible
return;
}
var testServiceFromInjector = serviceInjector.GetInstance<TestService>();
Assert.True(ReferenceEquals(testService, testServiceFromInjector));
var contextTestService = contextRuntime.ContextInjector.GetInstance<TestService>();
Assert.True(ReferenceEquals(contextTestService, testServiceFromInjector));
}
Assert.True(testService.Disposed);
}
[Fact]
[Trait("Priority", "0")]
[Trait("Category", "Unit")]
public void TestServiceContextEventHandlersTriggered()
{
var launcher = GetRootContextLauncher(
GetContextConfiguration(), GetServiceConfiguration(), Optional<IConfiguration>.Empty());
IInjector serviceInjector = null;
IInjector contextInjector = null;
using (var rootContext = launcher.GetRootContext())
{
serviceInjector = rootContext.ServiceInjector;
contextInjector = rootContext.ContextInjector;
Assert.NotNull(serviceInjector);
Assert.NotNull(contextInjector);
}
var serviceContextStartHandlers =
serviceInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>();
var contextContextStartHandlers =
contextInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>();
Assert.Equal(1, serviceContextStartHandlers.Count);
Assert.Equal(2, contextContextStartHandlers.Count);
var serviceContextStartHandler = serviceContextStartHandlers.First() as TestServiceEventHandlers;
Assert.True(contextContextStartHandlers.Contains(serviceContextStartHandler));
var serviceContextStopHandlers =
serviceInjector.GetNamedInstance<ContextConfigurationOptions.StopHandlers, ISet<IObserver<IContextStop>>>();
var contextContextStopHandlers =
contextInjector.GetNamedInstance<ContextConfigurationOptions.StopHandlers, ISet<IObserver<IContextStop>>>();
Assert.Equal(1, serviceContextStopHandlers.Count);
Assert.Equal(2, contextContextStopHandlers.Count);
var serviceContextStopHandler = serviceContextStopHandlers.First() as TestServiceEventHandlers;
Assert.True(contextContextStopHandlers.Contains(serviceContextStopHandler));
foreach (var contextStartHandler in contextContextStartHandlers.Select(h => h as ITestContextEventHandler))
{
Assert.NotNull(contextStartHandler);
Assert.Equal(1, contextStartHandler.ContextStartInvoked);
Assert.Equal(1, contextStartHandler.ContextStopInvoked);
}
foreach (var contextStopHandler in contextContextStopHandlers.Select(h => h as ITestContextEventHandler))
{
Assert.NotNull(contextStopHandler);
Assert.Equal(1, contextStopHandler.ContextStartInvoked);
Assert.Equal(1, contextStopHandler.ContextStopInvoked);
}
}
[Fact]
[Trait("Priority", "0")]
[Trait("Category", "Unit")]
public void TestServiceContextEventHandlersTriggeredSuccessiveContexts()
{
var launcher = GetRootContextLauncher(
GetContextConfiguration(), GetServiceConfiguration(), Optional<IConfiguration>.Empty());
IInjector serviceInjector = null;
IInjector firstContextInjector = null;
IInjector secondContextInjector = null;
using (var rootContext = launcher.GetRootContext())
{
serviceInjector = rootContext.ServiceInjector;
firstContextInjector = rootContext.ContextInjector;
using (var childContext = rootContext.SpawnChildContext(GetContextConfiguration()))
{
secondContextInjector = childContext.ContextInjector;
}
Assert.NotNull(serviceInjector);
Assert.NotNull(firstContextInjector);
Assert.NotNull(secondContextInjector);
}
var serviceContextStartHandlers =
serviceInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>();
var firstContextContextStartHandlers =
firstContextInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>();
var secondContextContextStartHandlers =
secondContextInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>();
Assert.Equal(1, serviceContextStartHandlers.Count);
Assert.Equal(2, firstContextContextStartHandlers.Count);
Assert.Equal(2, secondContextContextStartHandlers.Count);
var intersectSet = new HashSet<IObserver<IContextStart>>(serviceContextStartHandlers);
intersectSet.IntersectWith(firstContextContextStartHandlers);
intersectSet.IntersectWith(secondContextContextStartHandlers);
var unionSet = new HashSet<IObserver<IContextStart>>(serviceContextStartHandlers);
unionSet.UnionWith(firstContextContextStartHandlers);
unionSet.UnionWith(secondContextContextStartHandlers);
Assert.Equal(1, intersectSet.Count);
Assert.Equal(3, unionSet.Count);
var serviceContextHandler = serviceContextStartHandlers.Single() as ITestContextEventHandler;
var unionContextHandlerSet = new HashSet<ITestContextEventHandler>(
unionSet.Select(h => h as ITestContextEventHandler).Where(h => h != null));
Assert.Equal(unionSet.Count, unionContextHandlerSet.Count);
Assert.True(unionContextHandlerSet.Contains(serviceContextHandler));
foreach (var handler in unionContextHandlerSet.Where(h => h != null))
{
if (ReferenceEquals(handler, serviceContextHandler))
{
Assert.Equal(2, handler.ContextStartInvoked);
Assert.Equal(2, handler.ContextStopInvoked);
}
else
{
Assert.Equal(1, handler.ContextStartInvoked);
Assert.Equal(1, handler.ContextStopInvoked);
}
}
}
[Fact]
[Trait("Priority", "0")]
[Trait("Category", "Unit")]
public void TestServiceTaskEventHandlersTriggered()
{
RunTasksAndVerifyEventHandlers(1);
}
[Fact]
[Trait("Priority", "0")]
[Trait("Category", "Unit")]
public void TestServiceTaskEventHandlersTriggeredSuccessiveTasks()
{
RunTasksAndVerifyEventHandlers(5);
}
private static void RunTasksAndVerifyEventHandlers(int tasksRun)
{
var launcher = GetRootContextLauncher(
GetContextConfiguration(), GetServiceConfiguration(), Optional<IConfiguration>.Of(GetTaskConfiguration()));
IInjector serviceInjector = null;
using (var rootContext = launcher.GetRootContext())
{
serviceInjector = rootContext.ServiceInjector;
for (var i = 0; i < tasksRun; i++)
{
rootContext.StartTask(launcher.RootTaskConfig.Value).Join();
}
Assert.NotNull(serviceInjector);
}
var serviceTaskStartHandlers =
serviceInjector.GetNamedInstance<TaskConfigurationOptions.StartHandlers, ISet<IObserver<ITaskStart>>>();
Assert.Equal(1, serviceTaskStartHandlers.Count);
var serviceTaskStartHandler = serviceTaskStartHandlers.First() as TestServiceEventHandlers;
var serviceTaskStopHandlers =
serviceInjector.GetNamedInstance<TaskConfigurationOptions.StopHandlers, ISet<IObserver<ITaskStop>>>();
Assert.Equal(1, serviceTaskStopHandlers.Count);
var serviceTaskStopHandler = serviceTaskStopHandlers.First() as TestServiceEventHandlers;
Assert.Equal(serviceTaskStopHandler, serviceTaskStartHandler);
Assert.NotNull(serviceTaskStartHandler);
if (serviceTaskStartHandler == null || serviceTaskStopHandler == null)
{
// Get rid of warning.
throw new Exception();
}
Assert.Equal(tasksRun, serviceTaskStartHandler.TaskStartInvoked);
Assert.Equal(tasksRun, serviceTaskStopHandler.TaskStopInvoked);
}
private static RootContextLauncher GetRootContextLauncher(
IConfiguration contextConfig, IConfiguration serviceConfig, Optional<IConfiguration> taskConfig)
{
var injector = TangFactory.GetTang().NewInjector();
var serializer = injector.GetInstance<AvroConfigurationSerializer>();
var contextConfigStr = serializer.ToString(contextConfig);
var serviceConfigStr = serializer.ToString(serviceConfig);
var taskConfigStr = Optional<string>.Empty();
if (taskConfig.IsPresent())
{
taskConfigStr = Optional<string>.Of(serializer.ToString(taskConfig.Value));
}
var contextLauncherConfigBuilder = TangFactory.GetTang().NewConfigurationBuilder()
.BindNamedParameter<RootContextConfiguration, string>(GenericType<RootContextConfiguration>.Class, contextConfigStr)
.BindNamedParameter<RootServiceConfiguration, string>(GenericType<RootServiceConfiguration>.Class, serviceConfigStr);
if (taskConfigStr.IsPresent())
{
contextLauncherConfigBuilder = contextLauncherConfigBuilder
.BindNamedParameter<InitialTaskConfiguration, string>(GenericType<InitialTaskConfiguration>.Class, taskConfigStr.Value);
}
injector = injector.ForkInjector(contextLauncherConfigBuilder.Build());
var heartbeatManager = Substitute.For<IHeartBeatManager>();
injector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, heartbeatManager);
return injector.GetInstance<RootContextLauncher>();
}
private static IConfiguration GetTaskConfiguration()
{
return TaskConfiguration.ConfigurationModule
.Set(TaskConfiguration.Identifier, "ID")
.Set(TaskConfiguration.Task, GenericType<SimpleTestTask>.Class)
.Set(TaskConfiguration.OnTaskStart, GenericType<TestTaskEventHandlers>.Class)
.Set(TaskConfiguration.OnTaskStop, GenericType<TestTaskEventHandlers>.Class)
.Build();
}
private static IConfiguration GetContextConfiguration()
{
return ContextConfiguration.ConfigurationModule
.Set(ContextConfiguration.Identifier, "ID")
.Set(ContextConfiguration.OnContextStart, GenericType<TestContextEventHandlers>.Class)
.Set(ContextConfiguration.OnContextStop, GenericType<TestContextEventHandlers>.Class)
.Build();
}
private static IConfiguration GetServiceConfiguration()
{
return ServiceConfiguration.ConfigurationModule
.Set(ServiceConfiguration.OnContextStarted, GenericType<TestServiceEventHandlers>.Class)
.Set(ServiceConfiguration.OnContextStop, GenericType<TestServiceEventHandlers>.Class)
.Set(ServiceConfiguration.OnTaskStarted, GenericType<TestServiceEventHandlers>.Class)
.Set(ServiceConfiguration.OnTaskStop, GenericType<TestServiceEventHandlers>.Class)
.Build();
}
private interface ITestContextEventHandler : IObserver<IContextStart>, IObserver<IContextStop>
{
int ContextStartInvoked { get; }
int ContextStopInvoked { get; }
}
private interface ITestTaskEventHandler : IObserver<ITaskStart>, IObserver<ITaskStop>
{
int TaskStartInvoked { get; }
int TaskStopInvoked { get; }
}
private sealed class TestServiceEventHandlers : ITestContextEventHandler, ITestTaskEventHandler
{
public int ContextStartInvoked { get; private set; }
public int ContextStopInvoked { get; private set; }
public int TaskStartInvoked { get; private set; }
public int TaskStopInvoked { get; private set; }
[Inject]
private TestServiceEventHandlers()
{
}
public void OnNext(IContextStart value)
{
ContextStartInvoked++;
}
public void OnNext(IContextStop value)
{
ContextStopInvoked++;
}
public void OnNext(ITaskStart value)
{
TaskStartInvoked++;
}
public void OnNext(ITaskStop value)
{
TaskStopInvoked++;
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
private class TestContextEventHandlers : ITestContextEventHandler
{
public int ContextStartInvoked { get; private set; }
public int ContextStopInvoked { get; private set; }
[Inject]
private TestContextEventHandlers()
{
}
public void OnNext(IContextStart value)
{
ContextStartInvoked++;
}
public void OnNext(IContextStop value)
{
ContextStopInvoked++;
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
private class TestTaskEventHandlers : ITestTaskEventHandler
{
public int TaskStartInvoked { get; private set; }
public int TaskStopInvoked { get; private set; }
[Inject]
private TestTaskEventHandlers()
{
}
public void OnNext(ITaskStart value)
{
TaskStartInvoked++;
}
public void OnNext(ITaskStop value)
{
TaskStopInvoked++;
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
}
}