blob: 653479d79424e41598955abc941c11ed17e06499 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Tests.Compute
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;
using NUnit.Framework;
/// <summary>
/// Cancellation tests.
/// </summary>
public class CancellationTest : SpringTestBase
{
/** */
private const int MillisecondsTimeout = 50;
public CancellationTest()
: base("Config/Compute/compute-grid1.xml", "Config/Compute/compute-grid2.xml")
{
// No-op.
}
[Test]
public void TestTask()
{
TestTask((c, t) => c.ExecuteAsync(new Task(), t));
}
[Test]
public void TestTaskWithArg()
{
TestTask((c, t) => c.ExecuteAsync(new Task(), 1, t));
}
[Test]
public void TestTaskByType()
{
TestTask((c, t) => c.ExecuteAsync<int, IList<IComputeJobResult<int>>>(typeof(Task), t));
}
[Test]
public void TestTaskByTypeWithArg()
{
TestTask((c, t) => c.ExecuteAsync<object, int, IList<IComputeJobResult<int>>>(typeof(Task), 1, t));
}
[Test]
public void TestJavaTask()
{
using (var cts = new CancellationTokenSource())
{
var task = Compute.ExecuteJavaTaskAsync<object>(ComputeApiTest.BroadcastTask, null, cts.Token);
Assert.IsFalse(task.IsCanceled);
cts.Cancel();
Assert.IsTrue(task.IsCanceled);
// Pass cancelled token
Assert.IsTrue(
Compute.ExecuteJavaTaskAsync<object>(ComputeApiTest.BroadcastTask, null, cts.Token).IsCanceled);
}
}
[Test]
public void TestClosures()
{
TestClosure((c, t) => c.BroadcastAsync(new ComputeAction(), t));
TestClosure((c, t) => c.BroadcastAsync(new ComputeFunc(), t));
TestClosure((c, t) => c.BroadcastAsync(new ComputeBiFunc(), 10, t));
TestClosure((c, t) => c.AffinityRunAsync("default", 0, new ComputeAction(), t));
TestClosure((c, t) => c.RunAsync(new ComputeAction(), t));
TestClosure((c, t) => c.RunAsync(Enumerable.Range(1, 10).Select(x => new ComputeAction()), t));
TestClosure((c, t) => c.CallAsync(new ComputeFunc(), t));
TestClosure((c, t) => c.CallAsync(Enumerable.Range(1, 10).Select(x => new ComputeFunc()), t));
TestClosure((c, t) => c.CallAsync(Enumerable.Range(1, 10).Select(x => new ComputeFunc()),
new ComputeReducer(), t));
TestClosure((c, t) => c.AffinityCallAsync("default", 0, new ComputeFunc(), t));
TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), 10, t));
TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), Enumerable.Range(1, 100), t));
TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), Enumerable.Range(1, 100), new ComputeReducer(), t));
}
private void TestTask(Func<ICompute, CancellationToken, System.Threading.Tasks.Task> runner)
{
Job.Cancelled = false;
Job.StartEvent.Reset();
Job.CancelEvent.Reset();
using (var cts = new CancellationTokenSource())
{
var task = runner(Compute, cts.Token);
Assert.IsFalse(task.IsCanceled);
Job.StartEvent.Wait();
cts.Cancel();
TestUtils.WaitForTrueCondition(() => task.IsCanceled);
// Pass cancelled token
Assert.IsTrue(runner(Compute, cts.Token).IsCanceled);
}
Assert.IsTrue(TestUtils.WaitForCondition(() => Job.Cancelled, 5000));
}
private void TestClosure(Func<ICompute, CancellationToken, System.Threading.Tasks.Task> runner, int delay = 0)
{
using (var cts = new CancellationTokenSource())
{
var task = runner(Compute, cts.Token);
Thread.Sleep(delay);
Assert.IsFalse(task.IsCanceled);
cts.Cancel();
TestUtils.WaitForTrueCondition(() => task.IsCanceled);
// Pass cancelled token
Assert.IsTrue(runner(Compute, cts.Token).IsCanceled);
}
}
private class Task : IComputeTask<int, IList<IComputeJobResult<int>>>
{
public IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, object arg)
{
return subgrid.Where(x => !x.IsLocal).Take(1).ToDictionary(x => (IComputeJob<int>)new Job(), x => x);
}
public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res, IList<IComputeJobResult<int>> rcvd)
{
return ComputeJobResultPolicy.Wait;
}
public IList<IComputeJobResult<int>> Reduce(IList<IComputeJobResult<int>> results)
{
Assert.Fail("Reduce should not be called on a cancelled task.");
return results;
}
}
[Serializable]
private class Job : IComputeJob<int>
{
public static readonly ManualResetEventSlim StartEvent = new ManualResetEventSlim(false);
public static readonly ManualResetEventSlim CancelEvent = new ManualResetEventSlim(false);
public static volatile bool Cancelled;
public int Execute()
{
StartEvent.Set();
CancelEvent.Wait();
return 1;
}
public void Cancel()
{
CancelEvent.Set();
Cancelled = true;
}
}
[Serializable]
private class ComputeBiFunc : IComputeFunc<int, int>
{
public int Invoke(int arg)
{
Thread.Sleep(MillisecondsTimeout);
return arg;
}
}
private class ComputeReducer : IComputeReducer<int, int>
{
public bool Collect(int res)
{
return true;
}
public int Reduce()
{
return 0;
}
}
}
}