blob: bfc91c128d9d17f1edaf543c512987e02588fc75 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using Org.Apache.REEF.Common.Events;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Examples.AllHandlers;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Xunit;
namespace Org.Apache.REEF.Tests.Functional.Bridge
{
[Collection("FunctionalTests")]
public sealed class TestContextStack : ReefFunctionalTest
{
private const string ContextOneId = "Context1";
private const string ContextTwoId = "Context2";
private const string TaskValidationMessage = "TaskValidationMessage";
private const string ClosedContextValidationMessage = "ClosedContextValidationMessage";
private static readonly Logger Logger = Logger.GetLogger(typeof(TestContextStack));
/// <summary>
/// Does a simple test of whether a context can be submitted on top of another context.
/// </summary>
[Fact]
public void TestContextStackingOnLocalRuntime()
{
string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
TestRun(DriverConfigurations(GenericType<ActiveContextSubmitContextHandler>.Class),
typeof(ContextStackHandlers), 1, "testContextStack", "local", testFolder);
ValidateSuccessForLocalRuntime(2, testFolder: testFolder);
ValidateMessageSuccessfullyLoggedForDriver(TaskValidationMessage, testFolder);
ValidateMessageSuccessfullyLoggedForDriver(ClosedContextValidationMessage, testFolder);
CleanUp(testFolder);
}
/// <summary>
/// Does a simple test of whether a context can be submitted on top of another context
/// using SubmitContextAndService.
/// </summary>
[Fact]
public void TestContextStackingWithServiceOnLocalRuntime()
{
string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
TestRun(DriverConfigurations(GenericType<ActiveContextSubmitContextAndServiceHandler>.Class),
typeof(ContextStackHandlers), 1, "testContextAndServiceStack", "local", testFolder);
ValidateSuccessForLocalRuntime(2, testFolder: testFolder);
ValidateMessageSuccessfullyLoggedForDriver(TaskValidationMessage, testFolder);
ValidateMessageSuccessfullyLoggedForDriver(ClosedContextValidationMessage, testFolder);
CleanUp(testFolder);
}
public IConfiguration DriverConfigurations<T>(GenericType<T> activeContextHandlerType) where T : IObserver<IActiveContext>
{
return TangFactory.GetTang().NewConfigurationBuilder(
DriverConfiguration.ConfigurationModule
.Set(DriverConfiguration.OnDriverStarted, GenericType<ContextStackHandlers>.Class)
.Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ContextStackHandlers>.Class)
.Set(DriverConfiguration.OnContextActive, activeContextHandlerType)
.Set(DriverConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class)
.Set(DriverConfiguration.OnTaskCompleted, GenericType<ContextStackHandlers>.Class)
.Set(DriverConfiguration.OnContextClosed, GenericType<ContextStackHandlers>.Class)
.Build())
.Build();
}
/// <summary>
/// ActiveContext Handler that stacks 2 contexts and submits a Task on the second context.
/// </summary>
private sealed class ActiveContextSubmitContextHandler : IObserver<IActiveContext>
{
[Inject]
private ActiveContextSubmitContextHandler()
{
}
public void OnNext(IActiveContext value)
{
Logger.Log(Level.Verbose, "ContextId: " + value.Id);
switch (value.Id)
{
case ContextOneId:
var contextConfig =
Common.Context.ContextConfiguration.ConfigurationModule.Set(
Common.Context.ContextConfiguration.Identifier, ContextTwoId)
.Build();
var stackingContextConfig =
TangFactory.GetTang()
.NewConfigurationBuilder()
.BindImplementation(GenericType<IInjectableInterface>.Class,
GenericType<InjectableInterfaceImpl>.Class)
.Build();
Assert.False(value.ParentId.IsPresent());
value.SubmitContext(Configurations.Merge(stackingContextConfig, contextConfig));
break;
case ContextTwoId:
Assert.True(value.ParentId.IsPresent());
Assert.Equal(value.ParentId.Value, ContextOneId);
value.SubmitTask(
TaskConfiguration.ConfigurationModule.Set(TaskConfiguration.Identifier, "contextStackTestTask")
.Set(TaskConfiguration.Task, GenericType<TestContextStackTask>.Class)
.Build());
break;
default:
throw new Exception("Unexpected ContextId: " + value.Id);
}
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
/// <summary>
/// Context Start Handler that invokes Start on the injected TestService.
/// </summary>
private sealed class TestContextStackContextStartHandler : IObserver<IContextStart>
{
private readonly TestService _service;
[Inject]
private TestContextStackContextStartHandler(TestService service)
{
_service = service;
}
public void OnNext(IContextStart value)
{
_service.Start();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
/// <summary>
/// ActiveContext Handler that stacks 2 contexts.
/// Primarily used to test out the functionality of SubmitContextAndService.
/// The ActiveContext Handler starts the TestService with a ContextStartHandler on the second Context.
/// </summary>
private sealed class ActiveContextSubmitContextAndServiceHandler : IObserver<IActiveContext>
{
[Inject]
private ActiveContextSubmitContextAndServiceHandler()
{
}
public void OnNext(IActiveContext value)
{
Logger.Log(Level.Verbose, "ContextId: " + value.Id);
switch (value.Id)
{
case ContextOneId:
var contextConfig =
Common.Context.ContextConfiguration.ConfigurationModule
.Set(Common.Context.ContextConfiguration.Identifier, ContextTwoId)
.Set(Common.Context.ContextConfiguration.OnContextStart, GenericType<TestContextStackContextStartHandler>.Class)
.Build();
var stackingContextConfig =
TangFactory.GetTang()
.NewConfigurationBuilder()
.BindImplementation(GenericType<IInjectableInterface>.Class,
GenericType<InjectableInterfaceImpl>.Class)
.Build();
Assert.False(value.ParentId.IsPresent());
var stackingContextServiceConfig =
ServiceConfiguration.ConfigurationModule
.Set(ServiceConfiguration.Services, GenericType<TestService>.Class)
.Build();
value.SubmitContextAndService(
Configurations.Merge(stackingContextConfig, contextConfig), stackingContextServiceConfig);
break;
case ContextTwoId:
Assert.True(value.ParentId.IsPresent());
Assert.Equal(value.ParentId.Value, ContextOneId);
value.SubmitTask(
TaskConfiguration.ConfigurationModule.Set(TaskConfiguration.Identifier, "contextServiceStackTestTask")
.Set(TaskConfiguration.Task, GenericType<TestContextAndServiceStackTask>.Class)
.Build());
break;
default:
throw new Exception("Unexpected ContextId: " + value.Id);
}
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
/// <summary>
/// A simple Service class.
/// </summary>
private sealed class TestService
{
[Inject]
private TestService()
{
Started = false;
}
public void Start()
{
Started = true;
}
/// <summary>
/// Returns whether the Start function has been called or not.
/// </summary>
public bool Started { get; private set; }
}
/// <summary>
/// Basic handlers used to verify that Contexts are indeed stacked.
/// </summary>
private sealed class ContextStackHandlers :
IObserver<IDriverStarted>,
IObserver<IAllocatedEvaluator>,
IObserver<ICompletedTask>,
IObserver<IClosedContext>
{
private readonly IEvaluatorRequestor _requestor;
[Inject]
private ContextStackHandlers(IEvaluatorRequestor evaluatorRequestor)
{
_requestor = evaluatorRequestor;
}
public void OnNext(IDriverStarted value)
{
_requestor.Submit(_requestor.NewBuilder().Build());
}
public void OnNext(IAllocatedEvaluator value)
{
value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule
.Set(Common.Context.ContextConfiguration.Identifier, ContextOneId)
.Build());
}
public void OnNext(ICompletedTask value)
{
Logger.Log(Level.Info, TaskValidationMessage);
value.ActiveContext.Dispose();
}
public void OnNext(IClosedContext value)
{
Logger.Log(Level.Info, ClosedContextValidationMessage);
Assert.Equal(value.Id, ContextTwoId);
Assert.True(value.ParentId.IsPresent());
Assert.Equal(value.ParentId.Value, ContextOneId);
Assert.Equal(value.ParentContext.Id, ContextOneId);
value.ParentContext.Dispose();
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
/// <summary>
/// A Task to ensure that an object configured in the second context configuration
/// is properly injected.
/// </summary>
private sealed class TestContextStackTask : ITask
{
[Inject]
private TestContextStackTask(IInjectableInterface injectableInterface)
{
Assert.NotNull(injectableInterface);
Assert.True(injectableInterface is InjectableInterfaceImpl);
}
public void Dispose()
{
}
public byte[] Call(byte[] memento)
{
return null;
}
}
/// <summary>
/// A Task to ensure that an object configured in the second context configuration
/// is properly injected.
/// </summary>
private sealed class TestContextAndServiceStackTask : ITask
{
[Inject]
private TestContextAndServiceStackTask(IInjectableInterface injectableInterface, TestService service)
{
Assert.NotNull(injectableInterface);
Assert.True(injectableInterface is InjectableInterfaceImpl);
Assert.NotNull(service);
Assert.True(service.Started);
}
public void Dispose()
{
}
public byte[] Call(byte[] memento)
{
return null;
}
}
/// <summary>
/// Empty interface to check whether Context configurations are
/// set correctly or not on context stacking.
/// </summary>
private interface IInjectableInterface
{
}
/// <summary>
/// An implementation of <see cref="IInjectableInterface"/>.
/// </summary>
private sealed class InjectableInterfaceImpl : IInjectableInterface
{
[Inject]
private InjectableInterfaceImpl()
{
}
}
}
}