blob: b6e1b79ebf54d335aa5cfba1a3faa17c74eccb31 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Tests.Table;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Ignite.Table;
using Internal.Proto;
using NUnit.Framework;
/// <summary>
/// Tests for <see cref="IDataStreamerTarget{T}"/>.
/// <para />
/// See DataStreamer partition awareness tests in <see cref="PartitionAwarenessTests"/>.
/// </summary>
public class DataStreamerTests : IgniteTestsBase
{
private const int Count = 100;
private const int UpdatedKey = Count / 2;
private const int DeletedKey = Count + 1;
private static int _unknownKey = 333000;
[SetUp]
public async Task PrepareData()
{
await TupleView.UpsertAsync(null, GetTuple(UpdatedKey, "update me"));
await TupleView.UpsertAsync(null, GetTuple(DeletedKey, "delete me"));
}
[TearDown]
public async Task DeleteAll() => await Client.Sql.ExecuteAsync(null, $"DELETE FROM {TableName}");
[Test]
public async Task TestBasicStreamingRecordBinaryView()
{
await TupleView.StreamDataAsync(GetData(), DataStreamerOptions.Default with { PageSize = 10 });
await CheckData();
static async IAsyncEnumerable<DataStreamerItem<IIgniteTuple>> GetData()
{
for (int i = 0; i < Count; i++)
{
yield return DataStreamerItem.Create(GetTuple(i, "t" + i));
}
await Task.Yield();
yield return DataStreamerItem.Create(GetTuple(DeletedKey), DataStreamerOperationType.Remove);
}
}
[Test]
public async Task TestBasicStreamingRecordView()
{
var options = DataStreamerOptions.Default with { PageSize = 5 };
var data = Enumerable.Range(0, Count)
.Select(x => DataStreamerItem.Create(GetPoco(x, "t" + x)))
.Concat(new[] { DataStreamerItem.Create(GetPoco(DeletedKey), DataStreamerOperationType.Remove) })
.ToList();
await Table.GetRecordView<Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
await CheckData();
}
[Test]
public async Task TestBasicStreamingKeyValueBinaryView()
{
var options = DataStreamerOptions.Default with { PageSize = 10_000 };
var data = Enumerable.Range(0, Count)
.Select(x => DataStreamerItem.Create(KeyValuePair.Create(GetTuple(x), GetTuple("t" + x))))
.Concat(new[] { DataStreamerItem.Create(KeyValuePair.Create(GetTuple(DeletedKey), default(IIgniteTuple)!), DataStreamerOperationType.Remove) })
.ToList();
await Table.KeyValueBinaryView.StreamDataAsync(data.ToAsyncEnumerable(), options);
await CheckData();
}
[Test]
public async Task TestBasicStreamingKeyValueView()
{
var options = DataStreamerOptions.Default with { PageSize = 1 };
var data = Enumerable.Range(0, Count)
.Select(x => DataStreamerItem.Create(KeyValuePair.Create((long)x, GetPoco(x, "t" + x))))
.Concat(new[] { DataStreamerItem.Create(KeyValuePair.Create((long)DeletedKey, default(Poco)!), DataStreamerOperationType.Remove) })
.ToList();
await Table.GetKeyValueView<long, Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
await CheckData();
}
[Test]
public async Task TestAutoFlushFrequency([Values(true, false)] bool enabled)
{
using var cts = new CancellationTokenSource();
_ = TupleView.StreamDataAsync(
GetTuplesWithDelay(cts.Token),
new()
{
AutoFlushFrequency = enabled
? TimeSpan.FromMilliseconds(50)
: TimeSpan.MaxValue
});
if (enabled)
{
TestUtils.WaitForCondition(() => TupleView.ContainsKeyAsync(null, GetTuple(0)).GetAwaiter().GetResult(), 3000);
}
else
{
await Task.Delay(300);
Assert.IsFalse(await TupleView.ContainsKeyAsync(null, GetTuple(0)));
}
Assert.IsFalse(await TupleView.ContainsKeyAsync(null, GetTuple(1)));
cts.Cancel();
}
[Test]
public async Task TestCancellation()
{
using var cts = new CancellationTokenSource();
var streamTask = TupleView.StreamDataAsync(GetTuplesWithDelay(), cancellationToken: cts.Token);
cts.Cancel();
Assert.CatchAsync<OperationCanceledException>(async () => await streamTask);
Assert.IsFalse(
await TupleView.ContainsKeyAsync(null, GetTuple(0)),
"No data was streamed - cancelled before any batches were full.");
}
[Test]
public void TestOptionsValidation()
{
AssertException(DataStreamerOptions.Default with { PageSize = -10 }, "PageSize should be positive.");
AssertException(DataStreamerOptions.Default with { RetryLimit = -1 }, "RetryLimit should be non-negative.");
AssertException(
DataStreamerOptions.Default with { AutoFlushFrequency = TimeSpan.FromDays(-1) },
"AutoFlushFrequency should be positive.");
void AssertException(DataStreamerOptions options, string message)
{
var ex = Assert.ThrowsAsync<ArgumentException>(
async () => await Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(), options));
StringAssert.Contains(message, ex?.Message);
}
}
[Test]
public async Task TestRetryLimitExhausted()
{
using var server = new FakeServer(
shouldDropConnection: ctx => ctx is { OpCode: ClientOp.StreamerBatchSend, RequestCount: > 7 });
using var client = await server.ConnectClientAsync();
var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
var ex = Assert.ThrowsAsync<IgniteClientConnectionException>(
async () => await table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(10_000)));
StringAssert.StartsWith("Operation StreamerBatchSend failed after 16 retries", ex!.Message);
}
[Test]
public async Task TestManyItemsWithDisconnectAndRetry()
{
const int count = 100_000;
int upsertIdx = 0;
using var server = new FakeServer(
shouldDropConnection: ctx => ctx.OpCode == ClientOp.StreamerBatchSend && Interlocked.Increment(ref upsertIdx) % 2 == 1);
// Streamer has it's own retry policy, so we can disable retries on the client.
using var client = await server.ConnectClientAsync(new IgniteClientConfiguration
{
RetryPolicy = new RetryNonePolicy()
});
var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
await table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
Assert.AreEqual(count, server.StreamerRowCount);
Assert.That(server.DroppedConnectionCount, Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.PageSize));
}
[Test]
public async Task TestAddUpdateRemoveMixed(
[Values(1, 2, 100)] int pageSize,
[Values(true, false)] bool existingMinKey)
{
var minKey = existingMinKey ? UpdatedKey : Interlocked.Add(ref _unknownKey, 10);
await Table.GetRecordView<Poco>().StreamDataAsync(
GetData(),
DataStreamerOptions.Default with { PageSize = pageSize });
IList<Option<Poco>> res = await PocoView.GetAllAsync(null, Enumerable.Range(minKey, 4).Select(x => GetPoco(x)));
Assert.AreEqual(4, res.Count);
Assert.IsFalse(res[0].HasValue, "Deleted key should not exist: " + res[0]);
Assert.IsTrue(res[1].HasValue);
Assert.AreEqual("created2", res[1].Value.Val);
Assert.IsTrue(res[2].HasValue);
Assert.AreEqual("updated", res[2].Value.Val);
Assert.IsTrue(res[3].HasValue);
Assert.AreEqual("created", res[3].Value.Val);
async IAsyncEnumerable<DataStreamerItem<Poco>> GetData()
{
await Task.Yield();
yield return DataStreamerItem.Create(GetPoco(minKey, "created"));
yield return DataStreamerItem.Create(GetPoco(minKey, "updated"));
yield return DataStreamerItem.Create(GetPoco(minKey, "deleted"), DataStreamerOperationType.Remove);
yield return DataStreamerItem.Create(GetPoco(minKey + 1, "created"));
yield return DataStreamerItem.Create(GetPoco(minKey + 1, "updated"));
yield return DataStreamerItem.Create(GetPoco(minKey + 1, "deleted"), DataStreamerOperationType.Remove);
yield return DataStreamerItem.Create(GetPoco(minKey + 1, "created2"));
yield return DataStreamerItem.Create(GetPoco(minKey + 2, "created"));
yield return DataStreamerItem.Create(GetPoco(minKey + 2, "updated"));
yield return DataStreamerItem.Create(GetPoco(minKey + 3, "created"));
}
}
private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int count)
{
for (var i = 0; i < count; i++)
{
yield return new IgniteTuple { ["ID"] = i };
await Task.Yield();
}
}
private static async IAsyncEnumerable<IIgniteTuple> GetTuplesWithDelay([EnumeratorCancellation] CancellationToken ct = default)
{
for (var i = 0; i < 3; i++)
{
yield return GetTuple(i, "t" + i);
await Task.Delay(15000, ct);
}
}
private async Task CheckData()
{
var data = Enumerable.Range(0, Count).Select(x => GetTuple(x));
var res = await TupleView.GetAllAsync(null, data);
Assert.AreEqual(Count, res.Count);
foreach (var (_, hasVal) in res)
{
Assert.IsTrue(hasVal);
}
var deletedExists = await TupleView.ContainsKeyAsync(null, GetTuple(DeletedKey));
Assert.IsFalse(deletedExists);
}
}