blob: 3c8881f61c346e11cff71ac6c02b54ffb4d5fe9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Tests.Dataload
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Datastream;
using NUnit.Framework;
/// <summary>
/// Data streamer tests.
/// </summary>
public sealed class DataStreamerTest
{
/** Cache name. */
private const string CacheName = "partitioned";
/** Node. */
private IIgnite _grid;
/** Node 2. */
private IIgnite _grid2;
/** Cache. */
private ICache<int, int?> _cache;
/// <summary>
/// Initialization routine.
/// </summary>
[TestFixtureSetUp]
public void InitClient()
{
_grid = Ignition.Start(TestUtils.GetTestConfiguration());
_grid2 = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration())
{
IgniteInstanceName = "grid1"
});
_cache = _grid.CreateCache<int, int?>(CacheName);
}
/// <summary>
/// Fixture teardown.
/// </summary>
[TestFixtureTearDown]
public void StopGrids()
{
Ignition.StopAll(true);
}
/// <summary>
///
/// </summary>
[SetUp]
public void BeforeTest()
{
Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name);
for (int i = 0; i < 100; i++)
_cache.Remove(i);
}
[TearDown]
public void AfterTest()
{
TestUtils.AssertHandleRegistryIsEmpty(1000, _grid);
}
/// <summary>
/// Test data streamer property configuration. Ensures that at least no exceptions are thrown.
/// </summary>
[Test]
public void TestPropertyPropagation()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
Assert.AreEqual(CacheName, ldr.CacheName);
Assert.AreEqual(TimeSpan.Zero, ldr.AutoFlushInterval);
ldr.AutoFlushInterval = TimeSpan.FromMinutes(5);
Assert.AreEqual(5, ldr.AutoFlushInterval.TotalMinutes);
#pragma warning disable 618 // Type or member is obsolete
Assert.AreEqual(5 * 60 * 1000, ldr.AutoFlushFrequency);
ldr.AutoFlushFrequency = 9000;
Assert.AreEqual(9000, ldr.AutoFlushFrequency);
Assert.AreEqual(9, ldr.AutoFlushInterval.TotalSeconds);
#pragma warning restore 618 // Type or member is obsolete
Assert.IsFalse(ldr.AllowOverwrite);
ldr.AllowOverwrite = true;
Assert.IsTrue(ldr.AllowOverwrite);
ldr.AllowOverwrite = false;
Assert.IsFalse(ldr.AllowOverwrite);
Assert.IsFalse(ldr.SkipStore);
ldr.SkipStore = true;
Assert.IsTrue(ldr.SkipStore);
ldr.SkipStore = false;
Assert.IsFalse(ldr.SkipStore);
Assert.AreEqual(DataStreamerDefaults.DefaultPerNodeBufferSize, ldr.PerNodeBufferSize);
ldr.PerNodeBufferSize = 1;
Assert.AreEqual(1, ldr.PerNodeBufferSize);
ldr.PerNodeBufferSize = 2;
Assert.AreEqual(2, ldr.PerNodeBufferSize);
Assert.AreEqual(DataStreamerDefaults.DefaultPerThreadBufferSize, ldr.PerThreadBufferSize);
ldr.PerThreadBufferSize = 1;
Assert.AreEqual(1, ldr.PerThreadBufferSize);
ldr.PerThreadBufferSize = 2;
Assert.AreEqual(2, ldr.PerThreadBufferSize);
Assert.AreEqual(0, ldr.PerNodeParallelOperations);
var ops = DataStreamerDefaults.DefaultParallelOperationsMultiplier *
IgniteConfiguration.DefaultThreadPoolSize;
ldr.PerNodeParallelOperations = ops;
Assert.AreEqual(ops, ldr.PerNodeParallelOperations);
ldr.PerNodeParallelOperations = 2;
Assert.AreEqual(2, ldr.PerNodeParallelOperations);
Assert.AreEqual(DataStreamerDefaults.DefaultTimeout, ldr.Timeout);
ldr.Timeout = TimeSpan.MaxValue;
Assert.AreEqual(TimeSpan.MaxValue, ldr.Timeout);
ldr.Timeout = TimeSpan.FromSeconds(1.5);
Assert.AreEqual(1.5, ldr.Timeout.TotalSeconds);
}
}
/// <summary>
/// Tests removal without <see cref="IDataStreamer{TK,TV}.AllowOverwrite"/>.
/// </summary>
[Test]
public void TestRemoveNoOverwrite()
{
_cache.Put(1, 1);
using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
ldr.Remove(1);
}
Assert.IsTrue(_cache.ContainsKey(1));
}
/// <summary>
/// Test data add/remove.
/// </summary>
[Test]
public void TestAddRemove()
{
IDataStreamer<int, int> ldr;
using (ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
Assert.IsFalse(ldr.Task.IsCompleted);
ldr.AllowOverwrite = true;
// Additions.
var task = ldr.GetCurrentBatchTask();
ldr.Add(1, 1);
ldr.Flush();
Assert.AreEqual(1, _cache.Get(1));
Assert.IsTrue(task.IsCompleted);
Assert.IsFalse(ldr.Task.IsCompleted);
task = ldr.GetCurrentBatchTask();
ldr.Add(new KeyValuePair<int, int>(2, 2));
ldr.Flush();
Assert.AreEqual(2, _cache.Get(2));
Assert.IsTrue(task.IsCompleted);
task = ldr.GetCurrentBatchTask();
ldr.Add(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
ldr.Flush();
Assert.AreEqual(3, _cache.Get(3));
Assert.AreEqual(4, _cache.Get(4));
Assert.IsTrue(task.IsCompleted);
// Removal.
task = ldr.GetCurrentBatchTask();
ldr.Remove(1);
ldr.Flush();
Assert.IsFalse(_cache.ContainsKey(1));
Assert.IsTrue(task.IsCompleted);
// Mixed.
ldr.Add(5, 5);
ldr.Remove(2);
ldr.Add(new KeyValuePair<int, int>(7, 7));
ldr.Add(6, 6);
ldr.Remove(4);
ldr.Add(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(9, 9), new KeyValuePair<int, int>(10, 10) });
ldr.Add(new KeyValuePair<int, int>(8, 8));
ldr.Remove(3);
ldr.Add(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(11, 11), new KeyValuePair<int, int>(12, 12) });
ldr.Flush();
for (int i = 2; i < 5; i++)
Assert.IsFalse(_cache.ContainsKey(i));
for (int i = 5; i < 13; i++)
Assert.AreEqual(i, _cache.Get(i));
}
Assert.IsTrue(ldr.Task.Wait(5000));
}
/// <summary>
/// Test data add/remove.
/// </summary>
[Test]
public void TestAddRemoveObsolete()
{
#pragma warning disable 618 // Type or member is obsolete
IDataStreamer<int, int> ldr;
using (ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
Assert.IsFalse(ldr.Task.IsCompleted);
ldr.AllowOverwrite = true;
// Additions.
var task = ldr.AddData(1, 1);
ldr.Flush();
Assert.AreEqual(1, _cache.Get(1));
Assert.IsTrue(task.IsCompleted);
Assert.IsFalse(ldr.Task.IsCompleted);
task = ldr.AddData(new KeyValuePair<int, int>(2, 2));
ldr.Flush();
Assert.AreEqual(2, _cache.Get(2));
Assert.IsTrue(task.IsCompleted);
task = ldr.AddData(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) });
ldr.Flush();
Assert.AreEqual(3, _cache.Get(3));
Assert.AreEqual(4, _cache.Get(4));
Assert.IsTrue(task.IsCompleted);
// Removal.
task = ldr.RemoveData(1);
ldr.Flush();
Assert.IsFalse(_cache.ContainsKey(1));
Assert.IsTrue(task.IsCompleted);
// Mixed.
ldr.AddData(5, 5);
ldr.RemoveData(2);
ldr.AddData(new KeyValuePair<int, int>(7, 7));
ldr.AddData(6, 6);
ldr.RemoveData(4);
ldr.AddData(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(9, 9), new KeyValuePair<int, int>(10, 10) });
ldr.AddData(new KeyValuePair<int, int>(8, 8));
ldr.RemoveData(3);
ldr.AddData(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(11, 11), new KeyValuePair<int, int>(12, 12) });
ldr.Flush();
for (int i = 2; i < 5; i++)
Assert.IsFalse(_cache.ContainsKey(i));
for (int i = 5; i < 13; i++)
Assert.AreEqual(i, _cache.Get(i));
}
Assert.IsTrue(ldr.Task.Wait(5000));
#pragma warning restore 618 // Type or member is obsolete
}
/// <summary>
/// Tests object graphs with loops.
/// </summary>
[Test]
public void TestObjectGraphs()
{
var obj1 = new Container();
var obj2 = new Container();
var obj3 = new Container();
var obj4 = new Container();
obj1.Inner = obj2;
obj2.Inner = obj1;
obj3.Inner = obj1;
obj4.Inner = new Container();
using (var ldr = _grid.GetDataStreamer<int, Container>(CacheName))
{
ldr.AllowOverwrite = true;
ldr.Add(1, obj1);
ldr.Add(2, obj2);
ldr.Add(3, obj3);
ldr.Add(4, obj4);
}
var cache = _grid.GetCache<int, Container>(CacheName);
var res = cache[1];
Assert.AreEqual(res, res.Inner.Inner);
Assert.IsNotNull(cache[2].Inner);
Assert.IsNotNull(cache[2].Inner.Inner);
Assert.IsNotNull(cache[3].Inner);
Assert.IsNotNull(cache[3].Inner.Inner);
Assert.IsNotNull(cache[4].Inner);
Assert.IsNull(cache[4].Inner.Inner);
}
/// <summary>
/// Test "tryFlush".
/// </summary>
[Test]
public void TestTryFlushObsolete()
{
#pragma warning disable 618 // Type or member is obsolete
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
var fut = ldr.AddData(1, 1);
ldr.TryFlush();
fut.Wait();
Assert.AreEqual(1, _cache.Get(1));
}
#pragma warning restore 618 // Type or member is obsolete
}
/// <summary>
/// Test FlushAsync.
/// </summary>
[Test]
public void TestFlushAsync()
{
using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
ldr.Add(1, 1);
ldr.FlushAsync().Wait();
Assert.AreEqual(1, _cache.Get(1));
}
}
/// <summary>
/// Test buffer size adjustments.
/// </summary>
[Test]
public void TestBufferSize()
{
using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
const int timeout = 5000;
var part1 = GetPrimaryPartitionKeys(_grid, 4);
var part2 = GetPrimaryPartitionKeys(_grid2, 4);
ldr.Add(part1[0], part1[0]);
var task = ldr.GetCurrentBatchTask();
Thread.Sleep(100);
Assert.IsFalse(task.IsCompleted);
ldr.PerNodeBufferSize = 2;
ldr.PerThreadBufferSize = 1;
ldr.Add(part2[0], part2[0]);
ldr.Add(part1[1], part1[1]);
ldr.Add(part2[1], part2[1]);
Assert.IsTrue(task.Wait(timeout));
Assert.AreEqual(part1[0], _cache.Get(part1[0]));
Assert.AreEqual(part1[1], _cache.Get(part1[1]));
Assert.AreEqual(part2[0], _cache.Get(part2[0]));
Assert.AreEqual(part2[1], _cache.Get(part2[1]));
var task2 = ldr.GetCurrentBatchTask();
ldr.Add(new[]
{
new KeyValuePair<int, int>(part1[2], part1[2]),
new KeyValuePair<int, int>(part1[3], part1[3]),
new KeyValuePair<int, int>(part2[2], part2[2]),
new KeyValuePair<int, int>(part2[3], part2[3])
});
Assert.IsTrue(task2.Wait(timeout));
Assert.AreEqual(part1[2], _cache.Get(part1[2]));
Assert.AreEqual(part1[3], _cache.Get(part1[3]));
Assert.AreEqual(part2[2], _cache.Get(part2[2]));
Assert.AreEqual(part2[3], _cache.Get(part2[3]));
}
}
/// <summary>
/// Gets the primary partition keys.
/// </summary>
private static int[] GetPrimaryPartitionKeys(IIgnite ignite, int count)
{
var affinity = ignite.GetAffinity(CacheName);
var localNode = ignite.GetCluster().GetLocalNode();
var part = affinity.GetPrimaryPartitions(localNode).First();
return Enumerable.Range(0, int.MaxValue)
.Where(k => affinity.GetPartition(k) == part)
.Take(count)
.ToArray();
}
/// <summary>
/// Test close.
/// </summary>
[Test]
public void TestClose()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
var fut = ldr.GetCurrentBatchTask();
ldr.Add(1, 1);
ldr.Close(false);
Assert.IsTrue(fut.Wait(5000));
Assert.AreEqual(1, _cache.Get(1));
}
}
/// <summary>
/// Test close with cancellation.
/// </summary>
[Test]
public void TestCancel()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
var fut = ldr.GetCurrentBatchTask();
ldr.Add(1, 1);
ldr.Close(true);
Assert.IsTrue(fut.Wait(5000));
Assert.IsFalse(_cache.ContainsKey(1));
}
}
/// <summary>
/// Tests that streamer gets collected when there are no references to it.
/// </summary>
[Test]
public void TestFinalizer()
{
// Create streamer reference in a different thread to defeat Debug mode quirks.
var streamerRef = Task.Factory.StartNew
(() => new WeakReference(_grid.GetDataStreamer<int, int>(CacheName))).Result;
GC.Collect();
GC.WaitForPendingFinalizers();
Assert.IsNull(streamerRef.Target);
}
/// <summary>
/// Test auto-flush feature.
/// </summary>
[Test]
public void TestAutoFlushObsolete()
{
#pragma warning disable 618 // Type or member is obsolete
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
// Test auto flush turning on.
var fut = ldr.AddData(1, 1);
Thread.Sleep(100);
Assert.IsFalse(fut.IsCompleted);
ldr.AutoFlushFrequency = 1000;
fut.Wait();
// Test forced flush after frequency change.
fut = ldr.AddData(2, 2);
ldr.AutoFlushFrequency = long.MaxValue;
fut.Wait();
// Test another forced flush after frequency change.
fut = ldr.AddData(3, 3);
ldr.AutoFlushFrequency = 1000;
fut.Wait();
// Test flush before stop.
fut = ldr.AddData(4, 4);
ldr.AutoFlushFrequency = 0;
fut.Wait();
// Test flush after second turn on.
fut = ldr.AddData(5, 5);
ldr.AutoFlushFrequency = 1000;
fut.Wait();
Assert.AreEqual(1, _cache.Get(1));
Assert.AreEqual(2, _cache.Get(2));
Assert.AreEqual(3, _cache.Get(3));
Assert.AreEqual(4, _cache.Get(4));
Assert.AreEqual(5, _cache.Get(5));
}
#pragma warning restore 618 // Type or member is obsolete
}
/// <summary>
/// Test auto-flush feature.
/// </summary>
[Test]
public void TestAutoFlush()
{
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
// Test auto flush turning on.
var fut = ldr.GetCurrentBatchTask();
ldr.Add(1, 1);
Thread.Sleep(100);
Assert.IsFalse(fut.IsCompleted);
ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
fut.Wait();
// Test forced flush after frequency change.
fut = ldr.GetCurrentBatchTask();
ldr.Add(2, 2);
ldr.AutoFlushInterval = TimeSpan.MaxValue;
fut.Wait();
// Test another forced flush after frequency change.
fut = ldr.GetCurrentBatchTask();
ldr.Add(3, 3);
ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
fut.Wait();
// Test flush before stop.
fut = ldr.GetCurrentBatchTask();
ldr.Add(4, 4);
ldr.AutoFlushInterval = TimeSpan.Zero;
fut.Wait();
// Test flush after second turn on.
fut = ldr.GetCurrentBatchTask();
ldr.Add(5, 5);
ldr.AutoFlushInterval = TimeSpan.FromSeconds(1);
fut.Wait();
Assert.AreEqual(1, _cache.Get(1));
Assert.AreEqual(2, _cache.Get(2));
Assert.AreEqual(3, _cache.Get(3));
Assert.AreEqual(4, _cache.Get(4));
Assert.AreEqual(5, _cache.Get(5));
}
}
/// <summary>
/// Test multithreaded behavior.
/// </summary>
[Test]
[Category(TestUtils.CategoryIntensive)]
public void TestMultithreaded()
{
int entriesPerThread = 100000;
int threadCnt = 8;
for (int i = 0; i < 5; i++)
{
_cache.Clear();
Assert.AreEqual(0, _cache.GetSize());
Stopwatch watch = new Stopwatch();
watch.Start();
using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
ldr.PerNodeBufferSize = 1024;
int ctr = 0;
TestUtils.RunMultiThreaded(() =>
{
int threadIdx = Interlocked.Increment(ref ctr);
int startIdx = (threadIdx - 1) * entriesPerThread;
int endIdx = startIdx + entriesPerThread;
for (int j = startIdx; j < endIdx; j++)
{
// ReSharper disable once AccessToDisposedClosure
ldr.Add(j, j);
if (j % 100000 == 0)
Console.WriteLine("Put [thread=" + threadIdx + ", cnt=" + j + ']');
}
}, threadCnt);
}
Console.WriteLine("Iteration " + i + ": " + watch.ElapsedMilliseconds);
watch.Reset();
for (int j = 0; j < threadCnt * entriesPerThread; j++)
Assert.AreEqual(j, j);
}
}
/// <summary>
/// Tests custom receiver.
/// </summary>
[Test]
public void TestStreamReceiver()
{
TestStreamReceiver(new StreamReceiverBinarizable());
TestStreamReceiver(new StreamReceiverSerializable());
}
/// <summary>
/// Tests StreamVisitor.
/// </summary>
[Test]
public void TestStreamVisitor()
{
#if !NETCOREAPP // Serializing delegates is not supported on this platform.
TestStreamReceiver(new StreamVisitor<int, int>((c, e) => c.Put(e.Key, e.Value + 1)));
#endif
}
/// <summary>
/// Tests StreamTransformer.
/// </summary>
[Test]
public void TestStreamTransformer()
{
TestStreamReceiver(new StreamTransformer<int, int, int, int>(new EntryProcessorSerializable()));
TestStreamReceiver(new StreamTransformer<int, int, int, int>(new EntryProcessorBinarizable()));
}
[Test]
public void TestStreamTransformerIsInvokedForDuplicateKeys()
{
var cache = _grid.GetOrCreateCache<string, long>("c");
using (var streamer = _grid.GetDataStreamer<string, long>(cache.Name))
{
streamer.AllowOverwrite = true;
streamer.Receiver = new StreamTransformer<string, long, object, object>(new CountingEntryProcessor());
var words = Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
foreach (var word in words)
{
streamer.Add(word, 1L);
}
}
Assert.AreEqual(3, cache.Get("a"));
Assert.AreEqual(2, cache.Get("b"));
}
/// <summary>
/// Tests specified receiver.
/// </summary>
private void TestStreamReceiver(IStreamReceiver<int, int> receiver)
{
using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
ldr.AllowOverwrite = true;
ldr.Receiver = new StreamReceiverBinarizable();
ldr.Receiver = receiver; // check double assignment
Assert.AreEqual(ldr.Receiver, receiver);
for (var i = 0; i < 100; i++)
ldr.Add(i, i);
ldr.Flush();
for (var i = 0; i < 100; i++)
Assert.AreEqual(i + 1, _cache.Get(i));
}
}
/// <summary>
/// Tests the stream receiver in keepBinary mode.
/// </summary>
[Test]
public void TestStreamReceiverKeepBinary()
{
// ReSharper disable once LocalVariableHidesMember
var cache = _grid.GetCache<int, BinarizableEntry>(CacheName);
using (var ldr0 = _grid.GetDataStreamer<int, int>(CacheName))
using (var ldr = ldr0.WithKeepBinary<int, IBinaryObject>())
{
ldr.Receiver = new StreamReceiverKeepBinary();
ldr.AllowOverwrite = true;
for (var i = 0; i < 100; i++)
ldr.Add(i, _grid.GetBinary().ToBinary<IBinaryObject>(new BinarizableEntry {Val = i}));
ldr.Flush();
for (var i = 0; i < 100; i++)
Assert.AreEqual(i + 1, cache.Get(i).Val);
// Repeating WithKeepBinary call: valid args.
Assert.AreSame(ldr, ldr.WithKeepBinary<int, IBinaryObject>());
// Invalid type args.
var ex = Assert.Throws<InvalidOperationException>(() => ldr.WithKeepBinary<string, IBinaryObject>());
Assert.AreEqual(
"Can't change type of binary streamer. WithKeepBinary has been called on an instance of " +
"binary streamer with incompatible generic arguments.", ex.Message);
}
}
/// <summary>
/// Streamer test with destroyed cache.
/// </summary>
[Test]
public void TestDestroyCache()
{
var cache = _grid.CreateCache<int, int>(TestUtils.TestName);
var streamer = _grid.GetDataStreamer<int, int>(cache.Name);
streamer.Add(1, 2);
streamer.FlushAsync().Wait();
_grid.DestroyCache(cache.Name);
streamer.Add(2, 3);
var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
Assert.IsNotNull(ex);
Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.",
ex.Message);
Assert.Throws<CacheException>(() => streamer.Close(true));
}
/// <summary>
/// Streamer test with destroyed cache.
/// </summary>
[Test]
public void TestDestroyCacheObsolete()
{
#pragma warning disable 618 // Type or member is obsolete
var cache = _grid.CreateCache<int, int>(TestUtils.TestName);
var streamer = _grid.GetDataStreamer<int, int>(cache.Name);
var task = streamer.AddData(1, 2);
streamer.Flush();
task.Wait();
_grid.DestroyCache(cache.Name);
streamer.AddData(2, 3);
var ex = Assert.Throws<AggregateException>(() => streamer.Flush()).GetBaseException();
Assert.IsNotNull(ex);
Assert.AreEqual("class org.apache.ignite.IgniteCheckedException: DataStreamer data loading failed.",
ex.Message);
Assert.Throws<CacheException>(() => streamer.Close(true));
#pragma warning restore 618 // Type or member is obsolete
}
/// <summary>
/// Tests that streaming binary objects with a thin client results in those objects being
/// available through SQL in the cache's table.
/// </summary>
[Test]
public void TestBinaryStreamerCreatesSqlRecord()
{
var cacheCfg = new CacheConfiguration
{
Name = "TestBinaryStreamerCreatesSqlRecord",
SqlSchema = "persons",
QueryEntities = new[]
{
new QueryEntity
{
ValueTypeName = "Person",
Fields = new List<QueryField>
{
new QueryField
{
Name = "Name",
FieldType = typeof(string),
},
new QueryField
{
Name = "Age",
FieldType = typeof(int)
}
}
}
}
};
var cacheClientBinary = _grid.GetOrCreateCache<int, IBinaryObject>(cacheCfg)
.WithKeepBinary<int, IBinaryObject>();
// Prepare a binary object.
var jane = _grid.GetBinary().GetBuilder("Person")
.SetStringField("Name", "Jane")
.SetIntField("Age", 43)
.Build();
const int key = 1;
// Stream the binary object to the server.
using (var streamer = _grid.GetDataStreamer<int, IBinaryObject>(cacheCfg.Name))
{
streamer.Add(key, jane);
streamer.Flush();
}
// Check that SQL works.
var query = new SqlFieldsQuery("SELECT Name, Age FROM \"PERSONS\".PERSON");
var fullResultAfterClientStreamer = cacheClientBinary.Query(query).GetAll();
Assert.IsNotNull(fullResultAfterClientStreamer);
Assert.AreEqual(1, fullResultAfterClientStreamer.Count);
Assert.AreEqual("Jane", fullResultAfterClientStreamer[0][0]);
Assert.AreEqual(43, fullResultAfterClientStreamer[0][1]);
}
#if NETCOREAPP
/// <summary>
/// Tests async streamer usage.
/// Using async cache and streamer operations within the streamer means that we end up on different threads.
/// Streamer is thread-safe and is expected to handle this well.
/// </summary>
[Test]
public async Task TestStreamerAsyncAwait()
{
using (var ldr = _grid.GetDataStreamer<int, int>(CacheName))
{
ldr.AllowOverwrite = true;
ldr.Add(Enumerable.Range(1, 500).ToDictionary(x => x, x => -x));
Assert.IsFalse(await _cache.ContainsKeysAsync(new[] {1, 2}));
var flushTask = ldr.FlushAsync();
Assert.IsFalse(flushTask.IsCompleted);
await flushTask;
Assert.AreEqual(-1, await _cache.GetAsync(1));
Assert.AreEqual(-2, await _cache.GetAsync(2));
// Remove.
var batchTask = ldr.GetCurrentBatchTask();
Assert.IsFalse(batchTask.IsCompleted);
Assert.IsFalse(batchTask.IsFaulted);
ldr.Remove(1);
var flushTask2 = ldr.FlushAsync();
Assert.AreSame(batchTask, flushTask2);
await flushTask2;
Assert.IsTrue(batchTask.IsCompleted);
Assert.IsFalse(await _cache.ContainsKeyAsync(1));
// Empty buffer flush is allowed.
await ldr.FlushAsync();
await ldr.FlushAsync();
}
}
#endif
/// <summary>
/// Test binarizable receiver.
/// </summary>
private class StreamReceiverBinarizable : IStreamReceiver<int, int>
{
/** <inheritdoc /> */
public void Receive(ICache<int, int> cache, ICollection<ICacheEntry<int, int>> entries)
{
cache.PutAll(entries.ToDictionary(x => x.Key, x => x.Value + 1));
}
}
/// <summary>
/// Test binary receiver.
/// </summary>
[Serializable]
private class StreamReceiverKeepBinary : IStreamReceiver<int, IBinaryObject>
{
/** <inheritdoc /> */
public void Receive(ICache<int, IBinaryObject> cache, ICollection<ICacheEntry<int, IBinaryObject>> entries)
{
var binary = cache.Ignite.GetBinary();
cache.PutAll(entries.ToDictionary(x => x.Key, x =>
binary.ToBinary<IBinaryObject>(new BinarizableEntry
{
Val = x.Value.Deserialize<BinarizableEntry>().Val + 1
})));
}
}
/// <summary>
/// Test serializable receiver.
/// </summary>
[Serializable]
private class StreamReceiverSerializable : IStreamReceiver<int, int>
{
/** <inheritdoc /> */
public void Receive(ICache<int, int> cache, ICollection<ICacheEntry<int, int>> entries)
{
cache.PutAll(entries.ToDictionary(x => x.Key, x => x.Value + 1));
}
}
/// <summary>
/// Test entry processor.
/// </summary>
[Serializable]
private class EntryProcessorSerializable : ICacheEntryProcessor<int, int, int, int>
{
/** <inheritdoc /> */
public int Process(IMutableCacheEntry<int, int> entry, int arg)
{
entry.Value = entry.Key + 1;
return 0;
}
}
/// <summary>
/// Test entry processor.
/// </summary>
private class EntryProcessorBinarizable : ICacheEntryProcessor<int, int, int, int>, IBinarizable
{
/** <inheritdoc /> */
public int Process(IMutableCacheEntry<int, int> entry, int arg)
{
entry.Value = entry.Key + 1;
return 0;
}
/** <inheritdoc /> */
public void WriteBinary(IBinaryWriter writer)
{
// No-op.
}
/** <inheritdoc /> */
public void ReadBinary(IBinaryReader reader)
{
// No-op.
}
}
/// <summary>
/// Binarizable entry.
/// </summary>
private class BinarizableEntry
{
public int Val { get; set; }
}
/// <summary>
/// Container class.
/// </summary>
private class Container
{
public Container Inner;
}
private class CountingEntryProcessor : ICacheEntryProcessor<string, long, object, object>
{
public object Process(IMutableCacheEntry<string, long> e, object arg)
{
e.Value++;
return null;
}
}
}
}