blob: 697891a6e8cdbd01d14fe270ce19009b4464a28f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Tests.Compute
{
using System;
using System.Collections.Generic;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;
using NUnit.Framework;
/// <summary>
/// Tests class.
/// </summary>
[Category(TestUtils.CategoryIntensive)]
public class ComputeMultithreadedTest : AbstractTaskTest
{
/** */
private const int Duration = 5;
/** */
private const int ThreadNum = 4;
/** */
private static IList<Action<ICompute>> _actions;
/// <summary>
/// Constructor.
/// </summary>
public ComputeMultithreadedTest() : base(false) { }
/// <summary>
/// Set-up routine.
/// </summary>
[SetUp]
public void SetUp()
{
_actions = new List<Action<ICompute>>
{
compute => { compute.Apply(new My1ArgClosure(), "zzzz"); },
compute => { compute.Broadcast(new My1ArgClosure(), "zzzz"); },
compute => { compute.Broadcast(new MyNoArgClosure("zzzz")); },
compute => { compute.Call(new MyNoArgClosure("zzzz")); },
compute => { compute.Execute(new StringLengthEmptyTask(), "zzzz"); },
compute =>
{
compute.Apply(new My1ArgClosure(), new List<string> {"zzzz", "a", "b"}, new MyReducer());
}
};
}
/// <summary>
/// Tear-down routine.
/// </summary>
[TearDown]
public void TearDown()
{
_actions.Clear();
}
/// <summary>
/// Tests all task types in parallel.
/// </summary>
[Test]
public void TestAllTaskTypeAtSameTime()
{
Assert.AreEqual(_actions.Count, 6);
var compute = Grid1.GetCompute();
TestUtils.RunMultiThreaded(() =>
{
_actions[TestUtils.Random.Next(_actions.Count)](compute);
}, ThreadNum, Duration * 2);
}
/// <summary>
///
/// </summary>
[Test]
public void TestSingleTaskType0()
{
Assert.AreEqual(_actions.Count, 6);
TestUtils.RunMultiThreaded(() => _actions[0](Grid1.GetCompute()), ThreadNum, Duration);
}
/// <summary>
///
/// </summary>
[Test]
public void TestSingleTaskType1()
{
Assert.AreEqual(_actions.Count, 6);
TestUtils.RunMultiThreaded(() => _actions[1](Grid1.GetCompute()), ThreadNum, Duration);
}
/// <summary>
///
/// </summary>
[Test]
public void TestSingleTaskType2()
{
Assert.AreEqual(_actions.Count, 6);
TestUtils.RunMultiThreaded(() => _actions[2](Grid1.GetCompute()), ThreadNum, Duration);
}
/// <summary>
///
/// </summary>
[Test]
public void TestSingleTaskType3()
{
Assert.AreEqual(_actions.Count, 6);
TestUtils.RunMultiThreaded(() => _actions[3](Grid1.GetCompute()), ThreadNum, Duration);
}
/// <summary>
///
/// </summary>
[Test]
public void TestSingleTaskType4()
{
Assert.AreEqual(_actions.Count, 6);
TestUtils.RunMultiThreaded(() => _actions[ThreadNum](Grid1.GetCompute()), ThreadNum, Duration);
}
/// <summary>
///
/// </summary>
[Test]
public void TestSingleTaskType5()
{
Assert.AreEqual(_actions.Count, 6);
TestUtils.RunMultiThreaded(() => _actions[5](Grid1.GetCompute()), ThreadNum, Duration);
}
}
/// <summary>
/// Test class.
/// </summary>
[Serializable]
public class My1ArgClosure : IComputeFunc<string, int>
{
/** <inheritDoc /> */
public int Invoke(string s)
{
return s.Length;
}
}
/// <summary>
/// Test class.
/// </summary>
[Serializable]
public class MyNoArgClosure : IComputeFunc<int>
{
/** */
private readonly string _s;
/// <summary>
///
/// </summary>
/// <param name="s"></param>
public MyNoArgClosure(string s)
{
_s = s;
}
/** <inheritDoc /> */
public int Invoke()
{
return _s.Length;
}
}
/// <summary>
///
/// </summary>
public class StringLengthEmptyTask : IComputeTask<string, int, int>
{
/** <inheritDoc /> */
public IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, string arg)
{
var res = new Dictionary<IComputeJob<int>, IClusterNode>();
var job = new StringLengthEmptyJob(arg);
IClusterNode node = subgrid[TestUtils.Random.Next(subgrid.Count)];
res.Add(job, node);
return res;
}
/** <inheritDoc /> */
public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res, IList<IComputeJobResult<int>> rcvd)
{
return ComputeJobResultPolicy.Wait;
}
/** <inheritDoc /> */
public int Reduce(IList<IComputeJobResult<int>> results)
{
return results.Count == 0 ? 0 : results[0].Data;
}
}
/// <summary>
/// Test class.
/// </summary>
[Serializable]
public class StringLengthEmptyJob: IComputeJob<int>
{
/** */
private string _s;
/// <summary>
///
/// </summary>
/// <param name="s"></param>
public StringLengthEmptyJob(string s)
{
_s = s;
}
/** <inheritDoc /> */
public int Execute()
{
return _s.Length;
}
/** <inheritDoc /> */
public void Cancel()
{
// No-op
}
}
public class MyReducer : IComputeReducer<int, int>
{
/** */
private int _res;
public bool Collect(int res)
{
_res += res;
return true;
}
public int Reduce()
{
return _res;
}
}
}