blob: 546ca588386f72fe547358b393623dc4f15028f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// ReSharper disable UnusedMember.Local
// ReSharper disable UnusedMember.Global
// ReSharper disable UnusedAutoPropertyAccessor.Global
// ReSharper disable MemberCanBePrivate.Global
// ReSharper disable UnusedAutoPropertyAccessor.Local
// ReSharper disable UnassignedField.Global
namespace Apache.Ignite.Core.Tests.Compute
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;
using Apache.Ignite.Core.Resource;
using NUnit.Framework;
/// <summary>
/// Test resource injections in tasks and jobs.
/// </summary>
public class ResourceTaskTest : AbstractTaskTest
{
/// <summary>
/// Default constructor: initializes the base test with the fork flag set to <c>false</c>.
/// </summary>
public ResourceTaskTest()
    : base(false)
{
    // No-op.
}
/// <summary>
/// Constructor for derived fixtures that need to choose the fork mode themselves.
/// </summary>
/// <param name="fork">Fork flag, forwarded unchanged to the base test.</param>
protected ResourceTaskTest(bool fork)
    : base(fork)
{
    // No-op.
}
/// <summary>
/// Executes <see cref="InjectionTask"/> on the first grid and expects the reduced
/// result (one unit per mapped job) to match the server count. The task and its
/// jobs verify their own resource injection while running.
/// </summary>
[Test]
public void TestTaskInjection()
{
    var compute = Grid1.GetCompute();

    int jobsExecuted = compute.Execute(new InjectionTask(), 0);

    Assert.AreEqual(GetServerCount(), jobsExecuted);
}
/// <summary>
/// Same as <see cref="TestTaskInjection"/>, but runs the task variant that maps
/// binarizable jobs (<see cref="InjectionJobBinarizable"/>).
/// </summary>
[Test]
public void TestTaskInjectionBinarizable()
{
    var compute = Grid1.GetCompute();

    int jobsExecuted = compute.Execute(new InjectionTaskBinarizable(), 0);

    Assert.AreEqual(GetServerCount(), jobsExecuted);
}
/// <summary>
/// Broadcasts <see cref="InjectionClosure"/> with the argument <c>1</c>. The closure
/// echoes its argument after checking injection, so the sum of all returned values
/// is expected to equal the server count.
/// </summary>
[Test]
public void TestClosureInjection()
{
    var compute = Grid1.GetCompute();

    var perNodeResults = compute.Broadcast(new InjectionClosure(), 1);

    Assert.AreEqual(GetServerCount(), perNodeResults.Sum());
}
/// <summary>
/// Applies <see cref="InjectionClosure"/> to three arguments of value <c>1</c> and
/// reduces them with <see cref="InjectionReducer"/>, which checks its own injection
/// on every call. The reduced sum is expected to be <c>3</c>.
/// </summary>
[Test]
public void TestReducerInjection()
{
    // Locals are created in the same order the arguments were originally constructed.
    var closure = new InjectionClosure();
    var args = new List<int> { 1, 1, 1 };
    var reducer = new InjectionReducer();

    int reduced = Grid1.GetCompute().Apply(closure, args, reducer);

    Assert.AreEqual(3, reduced);
}
/// <summary>
/// Runs <see cref="NoResultCacheTask"/>, which is marked with the no-result-cache
/// attribute and accumulates job results itself. The accumulated sum is expected
/// to equal the server count.
/// </summary>
[Test]
public void TestNoResultCache()
{
    var compute = Grid1.GetCompute();

    int accumulated = compute.Execute(new NoResultCacheTask(), 0);

    Assert.AreEqual(GetServerCount(), accumulated);
}
/// <summary>
/// Injection task: verifies its own resource injection when mapping and assigns
/// one <see cref="InjectionJob"/> to every node of the subgrid.
/// </summary>
public class InjectionTask : Injectee, IComputeTask<object, int, int>
{
    /** <inheritDoc /> */
    public IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, object arg)
    {
        // The task itself must already be injected by the time it is mapped.
        CheckInjection();

        // One fresh job per node; the job is the key, the node is the value.
        return subgrid.ToDictionary<IClusterNode, IComputeJob<int>, IClusterNode>(
            node => new InjectionJob(),
            node => node);
    }

    /** <inheritDoc /> */
    public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res, IList<IComputeJobResult<int>> rcvd)
    {
        // Always keep waiting for the remaining jobs.
        return ComputeJobResultPolicy.Wait;
    }

    /** <inheritDoc /> */
    public int Reduce(IList<IComputeJobResult<int>> results)
    {
        // Total of the values returned by all jobs.
        return results.Select(jobRes => jobRes.Data).Sum();
    }
}
/// <summary>
/// Injection task that maps binarizable jobs: verifies its own resource injection
/// when mapping and assigns one <see cref="InjectionJobBinarizable"/> to every node.
/// </summary>
private class InjectionTaskBinarizable : Injectee, IComputeTask<object, int, int>
{
    /** <inheritDoc /> */
    public IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, object arg)
    {
        // The task itself must already be injected by the time it is mapped.
        CheckInjection();

        // One fresh binarizable job per node; the job is the key, the node is the value.
        return subgrid.ToDictionary<IClusterNode, IComputeJob<int>, IClusterNode>(
            node => new InjectionJobBinarizable(),
            node => node);
    }

    /** <inheritDoc /> */
    public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res, IList<IComputeJobResult<int>> rcvd)
    {
        // Always keep waiting for the remaining jobs.
        return ComputeJobResultPolicy.Wait;
    }

    /** <inheritDoc /> */
    public int Reduce(IList<IComputeJobResult<int>> results)
    {
        // Total of the values returned by all jobs.
        return results.Select(jobRes => jobRes.Data).Sum();
    }
}
/// <summary>
/// Binarizable flavour of <see cref="InjectionJob"/>. It has no state of its own,
/// so both binary callbacks are intentionally empty.
/// </summary>
public class InjectionJobBinarizable : InjectionJob, IBinarizable
{
    /** <inheritDoc /> */
    public void WriteBinary(IBinaryWriter writer)
    {
        // No-op: nothing is written.
    }

    /** <inheritDoc /> */
    public void ReadBinary(IBinaryReader reader)
    {
        // No-op: nothing is read.
    }
}
/// <summary>
/// Injection job: checks its resource injection when executed and returns <c>1</c>.
/// </summary>
[Serializable]
public class InjectionJob : Injectee, IComputeJob<int>
{
    /// <summary>
    /// Default constructor.
    /// </summary>
    public InjectionJob()
    {
        // No-op.
    }

    /// <summary>
    /// Serialization constructor; forwards both arguments to the base class.
    /// </summary>
    /// <param name="info">Serialization info.</param>
    /// <param name="context">Streaming context.</param>
    protected InjectionJob(SerializationInfo info, StreamingContext context)
        : base(info, context)
    {
        // No-op.
    }

    /** <inheritDoc /> */
    public int Execute()
    {
        // Fails the job if any expected resource is missing.
        CheckInjection();

        return 1;
    }

    /** <inheritDoc /> */
    public void Cancel()
    {
        // No-op.
    }
}
/// <summary>
/// Injection closure. Declares static and instance fields, properties and methods
/// marked with <c>[InstanceResource]</c>, and on invocation verifies that the static
/// members were left untouched while every instance member received the same
/// Ignite instance.
/// </summary>
[Serializable]
public class InjectionClosure : IComputeFunc<int, int>
{
    /** Private static field; checked to be null in CheckInjection. */
    [InstanceResource]
    private static IIgnite _staticGrid1;

    /** Public static field; checked to be null in CheckInjection. */
    [InstanceResource]
    public static IIgnite StaticGrid2;

    /// <summary>
    /// Public static property backed by the private static field.
    /// </summary>
    [InstanceResource]
    public static IIgnite StaticPropGrid1
    {
        get { return _staticGrid1; }
        set { _staticGrid1 = value; }
    }

    /// <summary>
    /// Private static property backed by the public static field.
    /// </summary>
    [InstanceResource]
    private static IIgnite StaticPropGrid2
    {
        get { return StaticGrid2; }
        set { StaticGrid2 = value; }
    }

    /// <summary>
    /// Public static method that stores the grid in the private static field.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    public static void StaticMethod1(IIgnite grid)
    {
        _staticGrid1 = grid;
    }

    /// <summary>
    /// Private static method that stores the grid in the public static field.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    private static void StaticMethod2(IIgnite grid)
    {
        StaticGrid2 = grid;
    }

    /** Private readonly instance field; reference value for all instance checks. */
    [InstanceResource]
    private readonly IIgnite _grid1 = null;

    /** Public instance field. */
    [InstanceResource]
    public IIgnite Grid2;

    /** Set by Method1. */
    private IIgnite _mthdGrid1;

    /** Set by Method2. */
    private IIgnite _mthdGrid2;

    /// <summary>
    /// Public instance auto-property.
    /// </summary>
    [InstanceResource]
    public IIgnite PropGrid1
    {
        get;
        set;
    }

    /// <summary>
    /// Private instance auto-property.
    /// </summary>
    [InstanceResource]
    private IIgnite PropGrid2
    {
        get;
        set;
    }

    /// <summary>
    /// Public instance method that records the grid it receives.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    public void Method1(IIgnite grid)
    {
        _mthdGrid1 = grid;
    }

    /// <summary>
    /// Private instance method that records the grid it receives.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    private void Method2(IIgnite grid)
    {
        _mthdGrid2 = grid;
    }

    /// <summary>
    /// Check Ignite injections: static members must be null, and every instance
    /// member must reference the same non-null Ignite instance.
    /// </summary>
    protected void CheckInjection()
    {
        // Static members are expected to be skipped.
        Assert.IsNull(_staticGrid1);
        Assert.IsNull(StaticGrid2);

        // Instance members: reference equality, same as the '==' comparison on
        // interface references, but with a descriptive message on failure.
        Assert.IsNotNull(_grid1);
        Assert.AreSame(_grid1, Grid2);
        Assert.AreSame(_grid1, PropGrid1);
        Assert.AreSame(_grid1, PropGrid2);
        Assert.AreSame(_grid1, _mthdGrid1);
        Assert.AreSame(_grid1, _mthdGrid2);
    }

    /** <inheritDoc /> */
    public int Invoke(int arg)
    {
        CheckInjection();

        // Echo the argument so callers can sum the per-node results.
        return arg;
    }
}
/// <summary>
/// Injection reducer: checks its resource injection on every call and sums the
/// collected integer results.
/// </summary>
public class InjectionReducer : Injectee, IComputeReducer<int, int>
{
    /** Collected results; also used as the lock object guarding access to them. */
    private readonly ICollection<int> _ress = new List<int>();

    /** <inheritDoc /> */
    public bool Collect(int res)
    {
        CheckInjection();

        lock (_ress)
        {
            _ress.Add(res);

            // Always ask for more results.
            return true;
        }
    }

    /** <inheritDoc /> */
    public int Reduce()
    {
        CheckInjection();

        int total;

        lock (_ress)
        {
            total = _ress.Sum();
        }

        return total;
    }
}
/// <summary>
/// Injectee: base class for test tasks, jobs and reducers. Declares static and
/// instance fields, properties and methods marked with <c>[InstanceResource]</c>
/// and offers <see cref="CheckInjection"/> to verify that the static members were
/// left untouched while every instance member received the same Ignite instance.
/// Implements <see cref="ISerializable"/> with an empty payload.
/// </summary>
[Serializable]
public class Injectee : ISerializable
{
    /** Private static field; checked to be null in CheckInjection. */
    [InstanceResource]
    private static IIgnite _staticGrid1;

    /** Public static field; checked to be null in CheckInjection. */
    [InstanceResource]
    public static IIgnite StaticGrid2;

    /// <summary>
    /// Public static property backed by the private static field.
    /// </summary>
    [InstanceResource]
    public static IIgnite StaticPropGrid1
    {
        get { return _staticGrid1; }
        set { _staticGrid1 = value; }
    }

    /// <summary>
    /// Private static property backed by the public static field.
    /// </summary>
    [InstanceResource]
    private static IIgnite StaticPropGrid2
    {
        get { return StaticGrid2; }
        set { StaticGrid2 = value; }
    }

    /// <summary>
    /// Public static method that stores the grid in the private static field.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    public static void StaticMethod1(IIgnite grid)
    {
        _staticGrid1 = grid;
    }

    /// <summary>
    /// Private static method that stores the grid in the public static field.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    private static void StaticMethod2(IIgnite grid)
    {
        StaticGrid2 = grid;
    }

    /// <summary>
    /// Default constructor.
    /// </summary>
    protected Injectee()
    {
        // No-op.
    }

    /// <summary>
    /// Serialization constructor; nothing is read because nothing is written
    /// by <see cref="GetObjectData"/>.
    /// </summary>
    /// <param name="info">Serialization info (unused).</param>
    /// <param name="context">Streaming context (unused).</param>
    protected Injectee(SerializationInfo info, StreamingContext context)
    {
        // No-op.
    }

    /** Private readonly instance field; reference value for all instance checks. */
    [InstanceResource]
    private readonly IIgnite _grid1 = null;

    /** Public instance field. */
    [InstanceResource]
    public IIgnite Grid2;

    /** Set by Method1. */
    private IIgnite _mthdGrid1;

    /** Set by Method2. */
    private IIgnite _mthdGrid2;

    /// <summary>
    /// Public instance auto-property.
    /// </summary>
    [InstanceResource]
    public IIgnite PropGrid1
    {
        get;
        set;
    }

    /// <summary>
    /// Private instance auto-property.
    /// </summary>
    [InstanceResource]
    private IIgnite PropGrid2
    {
        get;
        set;
    }

    /// <summary>
    /// Public instance method that records the grid it receives.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    public void Method1(IIgnite grid)
    {
        _mthdGrid1 = grid;
    }

    /// <summary>
    /// Private instance method that records the grid it receives.
    /// </summary>
    /// <param name="grid">Ignite instance.</param>
    [InstanceResource]
    private void Method2(IIgnite grid)
    {
        _mthdGrid2 = grid;
    }

    /// <summary>
    /// Check Ignite injections: static members must be null, and every instance
    /// member must reference the same non-null Ignite instance.
    /// </summary>
    protected void CheckInjection()
    {
        // Static members are expected to be skipped.
        Assert.IsNull(_staticGrid1);
        Assert.IsNull(StaticGrid2);

        // Instance members: reference equality, same as the '==' comparison on
        // interface references, but with a descriptive message on failure.
        Assert.IsNotNull(_grid1);
        Assert.AreSame(_grid1, Grid2);
        Assert.AreSame(_grid1, PropGrid1);
        Assert.AreSame(_grid1, PropGrid2);
        Assert.AreSame(_grid1, _mthdGrid1);
        Assert.AreSame(_grid1, _mthdGrid2);
    }

    /** <inheritDoc /> */
    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        // No-op: no state is written.
    }
}
/// <summary>
/// Task marked with <c>[ComputeTaskNoResultCache]</c>. It asserts that the lists of
/// previously received results handed to it are always empty, and therefore
/// accumulates the job results itself.
/// </summary>
[ComputeTaskNoResultCache]
public class NoResultCacheTask : IComputeTask<int, int, int>
{
    /** Sum of job results seen so far in OnResult. */
    private int _sum;

    /** <inheritDoc /> */
    public IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, int arg)
    {
        // One job per node; the job is the key, the node is the value.
        return subgrid.ToDictionary(x => (IComputeJob<int>) new NoResultCacheJob(), x => x);
    }

    /** <inheritDoc /> */
    public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res, IList<IComputeJobResult<int>> rcvd)
    {
        // The received list must exist but hold nothing.
        Assert.IsNotNull(rcvd);
        Assert.AreEqual(0, rcvd.Count);

        _sum += res.Data;

        return ComputeJobResultPolicy.Wait;
    }

    /** <inheritDoc /> */
    public int Reduce(IList<IComputeJobResult<int>> results)
    {
        // Same expectation at reduce time: a non-null, empty list.
        Assert.IsNotNull(results);
        Assert.AreEqual(0, results.Count);

        return _sum;
    }
}
/// <summary>
/// Job used by <see cref="NoResultCacheTask"/>: always returns <c>1</c>.
/// </summary>
[Serializable]
public class NoResultCacheJob : IComputeJob<int>
{
    /** <inheritDoc /> */
    public int Execute()
    {
        // Each job contributes exactly one unit to the task's sum.
        return 1;
    }

    /** <inheritDoc /> */
    public void Cancel()
    {
        // No-op.
    }
}
}
}