blob: 294ecabd091815c63ad78ec896b1b726157e34c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Tests.Dataload
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Datastream;
using NUnit.Framework;
/// <summary>
/// Data streamer tests.
/// </summary>
public sealed class DataStreamerTest
{
/** Cache name. */
private const string CacheName = "partitioned";
/** Node. */
private IIgnite _grid;
/** Node 2. */
private IIgnite _grid2;
/** Cache. */
private ICache<int, int?> _cache;
/// <summary>
/// Initialization routine.
/// </summary>
[TestFixtureSetUp]
public void InitClient()
{
_grid = Ignition.Start(TestUtils.GetTestConfiguration());
_grid2 = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration())
{
IgniteInstanceName = "grid1"
});
_cache = _grid.CreateCache<int, int?>(CacheName);
}
/// <summary>
/// Fixture teardown.
/// </summary>
[TestFixtureTearDown]
public void StopGrids()
{
Ignition.StopAll(true);
}
/// <summary>
///
/// </summary>
[SetUp]
public void BeforeTest()
{
Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name);
for (int i = 0; i < 100; i++)
_cache.Remove(i);
}
[TearDown]
public void AfterTest()
{
TestUtils.AssertHandleRegistryIsEmpty(1000, _grid);
}
/// <summary>
/// Test data streamer property configuration. Ensures that at least no exceptions are thrown.
/// </summary>
[Test]
public void TestPropertyPropagation()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
Assert.AreEqual(CacheName, ldr.CacheName);
Assert.AreEqual(0, ldr.AutoFlushFrequency);
Assert.IsFalse(ldr.AllowOverwrite);
ldr.AllowOverwrite = true;
Assert.IsTrue(ldr.AllowOverwrite);
ldr.AllowOverwrite = false;
Assert.IsFalse(ldr.AllowOverwrite);
Assert.IsFalse(ldr.SkipStore);
ldr.SkipStore = true;
Assert.IsTrue(ldr.SkipStore);
ldr.SkipStore = false;
Assert.IsFalse(ldr.SkipStore);
Assert.AreEqual(DataStreamerDefaults.DefaultPerNodeBufferSize, ldr.PerNodeBufferSize);
ldr.PerNodeBufferSize = 1;
Assert.AreEqual(1, ldr.PerNodeBufferSize);
ldr.PerNodeBufferSize = 2;
Assert.AreEqual(2, ldr.PerNodeBufferSize);
Assert.AreEqual(DataStreamerDefaults.DefaultPerThreadBufferSize, ldr.PerThreadBufferSize);
ldr.PerThreadBufferSize = 1;
Assert.AreEqual(1, ldr.PerThreadBufferSize);
ldr.PerThreadBufferSize = 2;
Assert.AreEqual(2, ldr.PerThreadBufferSize);
Assert.AreEqual(0, ldr.PerNodeParallelOperations);
var ops = DataStreamerDefaults.DefaultParallelOperationsMultiplier *
IgniteConfiguration.DefaultThreadPoolSize;
ldr.PerNodeParallelOperations = ops;
Assert.AreEqual(ops, ldr.PerNodeParallelOperations);
ldr.PerNodeParallelOperations = 2;
Assert.AreEqual(2, ldr.PerNodeParallelOperations);
Assert.AreEqual(DataStreamerDefaults.DefaultTimeout, ldr.Timeout);
ldr.Timeout = TimeSpan.MaxValue;
Assert.AreEqual(TimeSpan.MaxValue, ldr.Timeout);
ldr.Timeout = TimeSpan.FromSeconds(1.5);
Assert.AreEqual(1.5, ldr.Timeout.TotalSeconds);
}
}
/// <summary>
/// Test data add/remove.
/// </summary>
[Test]
public void TestAddRemove()
{
IDataStreamer<int, int> ldr;
using (ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
Assert.IsFalse(ldr.Task.IsCompleted);
ldr.AllowOverwrite = true;
// Additions.
var task = ldr.AddData(1, 1);
ldr.Flush();
Assert.AreEqual(1, _cache.Get(1));
Assert.IsTrue(task.IsCompleted);
Assert.IsFalse(ldr.Task.IsCompleted);
task = ldr.AddData(new KeyValuePair<int, int>(2, 2));
ldr.Flush();
Assert.AreEqual(2, _cache.Get(2));
Assert.IsTrue(task.IsCompleted);
task = ldr.AddData(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
ldr.Flush();
Assert.AreEqual(3, _cache.Get(3));
Assert.AreEqual(4, _cache.Get(4));
Assert.IsTrue(task.IsCompleted);
// Removal.
task = ldr.RemoveData(1);
ldr.Flush();
Assert.IsFalse(_cache.ContainsKey(1));
Assert.IsTrue(task.IsCompleted);
// Mixed.
ldr.AddData(5, 5);
ldr.RemoveData(2);
ldr.AddData(new KeyValuePair<int, int>(7, 7));
ldr.AddData(6, 6);
ldr.RemoveData(4);
ldr.AddData(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(9, 9), new KeyValuePair<int, int>(10, 10) });
ldr.AddData(new KeyValuePair<int, int>(8, 8));
ldr.RemoveData(3);
ldr.AddData(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(11, 11), new KeyValuePair<int, int>(12, 12) });
ldr.Flush();
for (int i = 2; i < 5; i++)
Assert.IsFalse(_cache.ContainsKey(i));
for (int i = 5; i < 13; i++)
Assert.AreEqual(i, _cache.Get(i));
}
Assert.IsTrue(ldr.Task.IsCompleted);
}
/// <summary>
/// Tests object graphs with loops.
/// </summary>
[Test]
public void TestObjectGraphs()
{
var obj1 = new Container();
var obj2 = new Container();
var obj3 = new Container();
var obj4 = new Container();
obj1.Inner = obj2;
obj2.Inner = obj1;
obj3.Inner = obj1;
obj4.Inner = new Container();
using (var ldr = _grid.GetDataStreamer<int, Container>(CacheName))
{
ldr.AllowOverwrite = true;
ldr.AddData(1, obj1);
ldr.AddData(2, obj2);
ldr.AddData(3, obj3);
ldr.AddData(4, obj4);
}
var cache = _grid.GetCache<int, Container>(CacheName);
var res = cache[1];
Assert.AreEqual(res, res.Inner.Inner);
Assert.IsNotNull(cache[2].Inner);
Assert.IsNotNull(cache[2].Inner.Inner);
Assert.IsNotNull(cache[3].Inner);
Assert.IsNotNull(cache[3].Inner.Inner);
Assert.IsNotNull(cache[4].Inner);
Assert.IsNull(cache[4].Inner.Inner);
}
/// <summary>
/// Test "tryFlush".
/// </summary>
[Test]
public void TestTryFlush()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
var fut = ldr.AddData(1, 1);
ldr.TryFlush();
fut.Wait();
Assert.AreEqual(1, _cache.Get(1));
}
}
/// <summary>
/// Test buffer size adjustments.
/// </summary>
[Test]
public void TestBufferSize()
{
using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
const int timeout = 5000;
var part1 = GetPrimaryPartitionKeys(_grid, 4);
var part2 = GetPrimaryPartitionKeys(_grid2, 4);
var task = ldr.AddData(part1[0], part1[0]);
Thread.Sleep(100);
Assert.IsFalse(task.IsCompleted);
ldr.PerNodeBufferSize = 2;
ldr.PerThreadBufferSize = 1;
ldr.AddData(part2[0], part2[0]);
ldr.AddData(part1[1], part1[1]);
Assert.IsTrue(ldr.AddData(part2[1], part2[1]).Wait(timeout));
Assert.IsTrue(task.Wait(timeout));
Assert.AreEqual(part1[0], _cache.Get(part1[0]));
Assert.AreEqual(part1[1], _cache.Get(part1[1]));
Assert.AreEqual(part2[0], _cache.Get(part2[0]));
Assert.AreEqual(part2[1], _cache.Get(part2[1]));
Assert.IsTrue(ldr.AddData(new[]
{
new KeyValuePair<int, int>(part1[2], part1[2]),
new KeyValuePair<int, int>(part1[3], part1[3]),
new KeyValuePair<int, int>(part2[2], part2[2]),
new KeyValuePair<int, int>(part2[3], part2[3])
}).Wait(timeout));
Assert.AreEqual(part1[2], _cache.Get(part1[2]));
Assert.AreEqual(part1[3], _cache.Get(part1[3]));
Assert.AreEqual(part2[2], _cache.Get(part2[2]));
Assert.AreEqual(part2[3], _cache.Get(part2[3]));
}
}
/// <summary>
/// Gets the primary partition keys.
/// </summary>
private static int[] GetPrimaryPartitionKeys(IIgnite ignite, int count)
{
var affinity = ignite.GetAffinity(CacheName);
var localNode = ignite.GetCluster().GetLocalNode();
var part = affinity.GetPrimaryPartitions(localNode).First();
return Enumerable.Range(0, int.MaxValue)
.Where(k => affinity.GetPartition(k) == part)
.Take(count)
.ToArray();
}
/// <summary>
/// Test close.
/// </summary>
[Test]
public void TestClose()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
var fut = ldr.AddData(1, 1);
ldr.Close(false);
fut.Wait();
Assert.AreEqual(1, _cache.Get(1));
}
}
/// <summary>
/// Test close with cancellation.
/// </summary>
[Test]
public void TestCancel()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
var fut = ldr.AddData(1, 1);
ldr.Close(true);
fut.Wait();
Assert.IsFalse(_cache.ContainsKey(1));
}
}
/// <summary>
/// Tests that streamer gets collected when there are no references to it.
/// </summary>
[Test]
[Ignore("IGNITE-8731")]
public void TestFinalizer()
{
var streamer = _grid.GetDataStreamer<int, int>(CacheName);
var streamerRef = new WeakReference(streamer);
Assert.IsNotNull(streamerRef.Target);
// ReSharper disable once RedundantAssignment
streamer = null;
GC.Collect();
GC.WaitForPendingFinalizers();
Assert.IsNull(streamerRef.Target);
}
/// <summary>
/// Test auto-flush feature.
/// </summary>
[Test]
public void TestAutoFlush()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
// Test auto flush turning on.
var fut = ldr.AddData(1, 1);
Thread.Sleep(100);
Assert.IsFalse(fut.IsCompleted);
ldr.AutoFlushFrequency = 1000;
fut.Wait();
// Test forced flush after frequency change.
fut = ldr.AddData(2, 2);
ldr.AutoFlushFrequency = long.MaxValue;
fut.Wait();
// Test another forced flush after frequency change.
fut = ldr.AddData(3, 3);
ldr.AutoFlushFrequency = 1000;
fut.Wait();
// Test flush before stop.
fut = ldr.AddData(4, 4);
ldr.AutoFlushFrequency = 0;
fut.Wait();
// Test flush after second turn on.
fut = ldr.AddData(5, 5);
ldr.AutoFlushFrequency = 1000;
fut.Wait();
Assert.AreEqual(1, _cache.Get(1));
Assert.AreEqual(2, _cache.Get(2));
Assert.AreEqual(3, _cache.Get(3));
Assert.AreEqual(4, _cache.Get(4));
Assert.AreEqual(5, _cache.Get(5));
}
}
/// <summary>
/// Test multithreaded behavior.
/// </summary>
[Test]
[Category(TestUtils.CategoryIntensive)]
public void TestMultithreaded()
{
int entriesPerThread = 100000;
int threadCnt = 8;
for (int i = 0; i < 5; i++)
{
_cache.Clear();
Assert.AreEqual(0, _cache.GetSize());
Stopwatch watch = new Stopwatch();
watch.Start();
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
ldr.PerNodeBufferSize = 1024;
int ctr = 0;
TestUtils.RunMultiThreaded(() =>
{
int threadIdx = Interlocked.Increment(ref ctr);
int startIdx = (threadIdx - 1) * entriesPerThread;
int endIdx = startIdx + entriesPerThread;
for (int j = startIdx; j < endIdx; j++)
{
// ReSharper disable once AccessToDisposedClosure
ldr.AddData(j, j);
if (j % 100000 == 0)
Console.WriteLine("Put [thread=" + threadIdx + ", cnt=" + j + ']');
}
}, threadCnt);
}
Console.WriteLine("Iteration " + i + ": " + watch.ElapsedMilliseconds);
watch.Reset();
for (int j = 0; j < threadCnt * entriesPerThread; j++)
Assert.AreEqual(j, j);
}
}
/// <summary>
/// Tests custom receiver.
/// </summary>
[Test]
public void TestStreamReceiver()
{
TestStreamReceiver(new StreamReceiverBinarizable());
TestStreamReceiver(new StreamReceiverSerializable());
}
/// <summary>
/// Tests StreamVisitor.
/// </summary>
[Test]
public void TestStreamVisitor()
{
#if !NETCOREAPP // Serializing delegates is not supported on this platform.
TestStreamReceiver(new StreamVisitor<int, int>((c, e) => c.Put(e.Key, e.Value + 1)));
#endif
}
/// <summary>
/// Tests StreamTransformer.
/// </summary>
[Test]
public void TestStreamTransformer()
{
TestStreamReceiver(new StreamTransformer<int, int, int, int>(new EntryProcessorSerializable()));
TestStreamReceiver(new StreamTransformer<int, int, int, int>(new EntryProcessorBinarizable()));
}
[Test]
public void TestStreamTransformerIsInvokedForDuplicateKeys()
{
var cache = _grid.GetOrCreateCache<string, long>("c");
using (var streamer = _grid.GetDataStreamer<string, long>(cache.Name))
{
streamer.AllowOverwrite = true;
streamer.Receiver = new StreamTransformer<string, long, object, object>(new CountingEntryProcessor());
var words = Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
foreach (var word in words)
{
streamer.AddData(word, 1L);
}
}
Assert.AreEqual(3, cache.Get("a"));
Assert.AreEqual(2, cache.Get("b"));
}
/// <summary>
/// Tests specified receiver.
/// </summary>
private void TestStreamReceiver(IStreamReceiver<int, int> receiver)
{
using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
ldr.AllowOverwrite = true;
ldr.Receiver = new StreamReceiverBinarizable();
ldr.Receiver = receiver; // check double assignment
Assert.AreEqual(ldr.Receiver, receiver);
for (var i = 0; i < 100; i++)
ldr.AddData(i, i);
ldr.Flush();
for (var i = 0; i < 100; i++)
Assert.AreEqual(i + 1, _cache.Get(i));
}
}
/// <summary>
/// Tests the stream receiver in keepBinary mode.
/// </summary>
[Test]
public void TestStreamReceiverKeepBinary()
{
// ReSharper disable once LocalVariableHidesMember
var cache = _grid.GetCache<int, BinarizableEntry>(CacheName);
using (var ldr0 = _grid.GetDataStreamer<int, int>(CacheName))
using (var ldr = ldr0.WithKeepBinary<int, IBinaryObject>())
{
ldr.Receiver = new StreamReceiverKeepBinary();
ldr.AllowOverwrite = true;
for (var i = 0; i < 100; i++)
ldr.AddData(i, _grid.GetBinary().ToBinary<IBinaryObject>(new BinarizableEntry {Val = i}));
ldr.Flush();
for (var i = 0; i < 100; i++)
Assert.AreEqual(i + 1, cache.Get(i).Val);
// Repeating WithKeepBinary call: valid args.
Assert.AreSame(ldr, ldr.WithKeepBinary<int, IBinaryObject>());
// Invalid type args.
var ex = Assert.Throws<InvalidOperationException>(() => ldr.WithKeepBinary<string, IBinaryObject>());
Assert.AreEqual(
"Can't change type of binary streamer. WithKeepBinary has been called on an instance of " +
"binary streamer with incompatible generic arguments.", ex.Message);
}
}
/// <summary>
/// Test binarizable receiver.
/// </summary>
private class StreamReceiverBinarizable : IStreamReceiver<int, int>
{
/** <inheritdoc /> */
public void Receive(ICache<int, int> cache, ICollection<ICacheEntry<int, int>> entries)
{
cache.PutAll(entries.ToDictionary(x => x.Key, x => x.Value + 1));
}
}
/// <summary>
/// Test binary receiver.
/// </summary>
[Serializable]
private class StreamReceiverKeepBinary : IStreamReceiver<int, IBinaryObject>
{
/** <inheritdoc /> */
public void Receive(ICache<int, IBinaryObject> cache, ICollection<ICacheEntry<int, IBinaryObject>> entries)
{
var binary = cache.Ignite.GetBinary();
cache.PutAll(entries.ToDictionary(x => x.Key, x =>
binary.ToBinary<IBinaryObject>(new BinarizableEntry
{
Val = x.Value.Deserialize<BinarizableEntry>().Val + 1
})));
}
}
/// <summary>
/// Test serializable receiver.
/// </summary>
[Serializable]
private class StreamReceiverSerializable : IStreamReceiver<int, int>
{
/** <inheritdoc /> */
public void Receive(ICache<int, int> cache, ICollection<ICacheEntry<int, int>> entries)
{
cache.PutAll(entries.ToDictionary(x => x.Key, x => x.Value + 1));
}
}
/// <summary>
/// Test entry processor.
/// </summary>
[Serializable]
private class EntryProcessorSerializable : ICacheEntryProcessor<int, int, int, int>
{
/** <inheritdoc /> */
public int Process(IMutableCacheEntry<int, int> entry, int arg)
{
entry.Value = entry.Key + 1;
return 0;
}
}
/// <summary>
/// Test entry processor.
/// </summary>
private class EntryProcessorBinarizable : ICacheEntryProcessor<int, int, int, int>, IBinarizable
{
/** <inheritdoc /> */
public int Process(IMutableCacheEntry<int, int> entry, int arg)
{
entry.Value = entry.Key + 1;
return 0;
}
/** <inheritdoc /> */
public void WriteBinary(IBinaryWriter writer)
{
// No-op.
}
/** <inheritdoc /> */
public void ReadBinary(IBinaryReader reader)
{
// No-op.
}
}
/// <summary>
/// Binarizable entry.
/// </summary>
private class BinarizableEntry
{
public int Val { get; set; }
}
/// <summary>
/// Container class.
/// </summary>
private class Container
{
public Container Inner;
}
private class CountingEntryProcessor : ICacheEntryProcessor<string, long, object, object>
{
public object Process(IMutableCacheEntry<string, long> e, object arg)
{
e.Value++;
return null;
}
}
}
}