blob: 9b226454241a7ec5cfc734e087ea99db536b4006 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Tests.Table;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using Compute;
using Ignite.Compute;
using Ignite.Marshalling;
using Ignite.Table;
using NUnit.Framework;
using TestHelpers;
/// <summary>
/// Tests for data streamer with .NET receiver.
/// </summary>
public class DataStreamerPlatformReceiverTests : IgniteTestsBase
{
/// <summary>
/// Descriptor of the compute job whose class name is <see cref="ComputeTests.PlatformTestNodeRunner"/>
/// followed by "$StreamerRunnerJob". The <see cref="JobInfo"/> argument is marshalled with
/// <see cref="JsonMarshaller{T}"/>.
/// </summary>
private static readonly JobDescriptor<JobInfo, object?> StreamerRunnerJob =
    new JobDescriptor<JobInfo, object?>(ComputeTests.PlatformTestNodeRunner + "$StreamerRunnerJob")
    {
        ArgMarshaller = new JsonMarshaller<JobInfo>()
    };

/// <summary>
/// Deployment unit returned by the one-time setup; attached to the receiver descriptors used by the tests.
/// </summary>
private DeploymentUnit _defaultTestUnit = null!;
/// <summary>
/// One-time fixture setup: calls <c>ManagementApi.DeployTestsAssembly</c> and keeps the returned
/// deployment unit for use by the tests.
/// </summary>
/// <returns>A task representing the asynchronous operation.</returns>
[OneTimeSetUp]
public async Task DeployDefaultUnit()
{
    _defaultTestUnit = await ManagementApi.DeployTestsAssembly();
}
/// <summary>
/// One-time fixture teardown: passes the unit stored by the setup to <c>ManagementApi.UnitUndeploy</c>.
/// </summary>
/// <returns>A task representing the asynchronous operation.</returns>
[OneTimeTearDown]
public async Task UndeployDefaultUnit()
{
    await ManagementApi.UnitUndeploy(_defaultTestUnit);
}
/// <summary>
/// Streams the same item twice through <see cref="RunEchoReceiver"/> and verifies that the results
/// match the input, where a <see cref="decimal"/> input is expected back as <see cref="BigDecimal"/>.
/// </summary>
/// <param name="item">Item to stream; supplied by <see cref="TestCases.SupportedArgs"/>.</param>
/// <returns>A task representing the asynchronous test.</returns>
[TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
public async Task TestEchoReceiverAllDataTypes(object item)
{
    var streamed = new List<object> { item, item };
    List<object> echoed = await RunEchoReceiver(streamed);

    // A decimal is compared against its BigDecimal wrapper; any other value is compared as is.
    var expected = new List<object>(streamed.Count);
    foreach (object element in streamed)
    {
        expected.Add(element is decimal number ? new BigDecimal(number) : element);
    }

    CollectionAssert.AreEqual(expected, echoed);
}
/// <summary>
/// Passes the value as the receiver argument through <see cref="RunEchoArgReceiver"/> and verifies
/// that it comes back equal, where a <see cref="decimal"/> is expected back as <see cref="BigDecimal"/>.
/// </summary>
/// <param name="arg">Receiver argument; supplied by <see cref="TestCases.SupportedArgs"/>.</param>
/// <returns>A task representing the asynchronous test.</returns>
[TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
public async Task TestEchoArgsReceiverAllDataTypes(object arg)
{
    object echoed = await RunEchoArgReceiver(arg);

    // A decimal is compared against its BigDecimal wrapper; any other value is compared as is.
    object expected = arg is decimal number ? new BigDecimal(number) : arg;

    Assert.AreEqual(expected, echoed);
}
/// <summary>
/// Passes a tuple built by <see cref="TestCases.GetTupleWithAllFieldTypes"/> (decimal values filtered out)
/// as the receiver argument and verifies that the echoed value equals it.
/// </summary>
/// <returns>A task representing the asynchronous test.</returns>
[Test]
public async Task TestEchoArgsReceiverTupleWithSchema()
{
    var tuple = TestCases.GetTupleWithAllFieldTypes(value => value is not decimal);

    object echoed = await RunEchoArgReceiver(tuple);

    Assert.AreEqual(tuple, echoed);
}
/// <summary>
/// Streams a single tuple built by <see cref="TestCases.GetTupleWithAllFieldTypes"/> (decimal values
/// filtered out) through <see cref="RunEchoReceiver"/> and verifies that the results equal the input.
/// </summary>
/// <returns>A task representing the asynchronous test.</returns>
[Test]
public async Task TestEchoReceiverTupleWithSchema()
{
    var tuple = TestCases.GetTupleWithAllFieldTypes(value => value is not decimal);
    var streamed = new List<object> { tuple };

    List<object> echoed = await RunEchoReceiver(streamed);

    Assert.AreEqual(streamed, echoed);
}
/// <summary>
/// Streams one item to a receiver descriptor whose class name is "BadClass" with the
/// <see cref="JobExecutorType.DotNetSidecar"/> executor, and verifies the resulting
/// <see cref="DataStreamerException"/> message and failed item count.
/// </summary>
[Test]
public void TestMissingClass()
{
    var sidecarOptions = new ReceiverExecutionOptions { ExecutorType = JobExecutorType.DotNetSidecar };
    var badReceiver = new ReceiverDescriptor<object, object, object>("BadClass") { Options = sidecarOptions };

    IAsyncEnumerable<object> results = PocoView.StreamDataAsync(
        data: new object[] { 1 }.ToAsyncEnumerable(),
        receiver: badReceiver,
        keySelector: _ => new Poco(),
        payloadSelector: source => source.ToString()!,
        receiverArg: "arg");

    // The failure is observed when the result stream is consumed.
    async Task ConsumeResults() => await results.SingleAsync();

    var error = Assert.ThrowsAsync<DataStreamerException>(ConsumeResults);

    StringAssert.StartsWith(".NET job failed: Failed to load type 'BadClass'", error.Message);
    StringAssert.Contains("Could not resolve type 'BadClass' in assembly 'Apache.Ignite", error.Message);
    Assert.AreEqual(1, error.FailedItems.Count);
}
/// <summary>
/// Streams one item to a receiver descriptor whose type name is "MyClass, BadAssembly" with the
/// <see cref="JobExecutorType.DotNetSidecar"/> executor, and verifies the resulting
/// <see cref="DataStreamerException"/> message fragments and failed item count.
/// </summary>
[Test]
public void TestMissingAssembly()
{
    var sidecarOptions = new ReceiverExecutionOptions { ExecutorType = JobExecutorType.DotNetSidecar };
    var badReceiver = new ReceiverDescriptor<object>("MyClass, BadAssembly") { Options = sidecarOptions };

    var streaming = PocoView.StreamDataAsync(
        new object[] { 1 }.ToAsyncEnumerable(),
        keySelector: _ => new Poco(),
        payloadSelector: source => source.ToString()!,
        badReceiver,
        receiverArg: "arg");

    var error = Assert.ThrowsAsync<DataStreamerException>(async () => await streaming);
    string message = error.Message;

    StringAssert.Contains(".NET job failed: Failed to load type 'MyClass, BadAssembly'", message);
    StringAssert.Contains("Could not load file or assembly 'BadAssembly", message);
    StringAssert.Contains("The system cannot find the file specified.", message);
    Assert.AreEqual(1, error.FailedItems.Count);
}
/// <summary>
/// Streams one item to <c>DotNetReceivers.Error</c> with the argument "hello" and verifies the
/// message, error code string and failed item count of the resulting <see cref="DataStreamerException"/>.
/// </summary>
[Test]
public void TestReceiverError()
{
    var failingReceiver = DotNetReceivers.Error with { DeploymentUnits = [_defaultTestUnit] };

    IAsyncEnumerable<object> results = PocoView.StreamDataAsync(
        data: new object[] { 1 }.ToAsyncEnumerable(),
        receiver: failingReceiver,
        keySelector: _ => new Poco(),
        payloadSelector: source => source.ToString()!,
        receiverArg: "hello");

    // The failure is observed when the result stream is consumed.
    async Task ConsumeResults() => await results.SingleAsync();

    var error = Assert.ThrowsAsync<DataStreamerException>(ConsumeResults);

    Assert.AreEqual(".NET job failed: Error in receiver: hello", error.Message);
    Assert.AreEqual("IGN-CATALOG-1", error.CodeAsString);
    Assert.AreEqual(1, error.FailedItems.Count);
}
/// <summary>
/// Submits <see cref="StreamerRunnerJob"/> to any cluster node with a <see cref="JobInfo"/> that names the
/// .NET <c>UpdateTupleReceiver</c> type, the default deployment unit and the "DOTNET_SIDECAR" executor,
/// then verifies the string returned by the job.
/// </summary>
/// <returns>A task representing the asynchronous test.</returns>
[Test]
public async Task TestRunDotNetReceiverFromJava()
{
    var nodes = await Client.GetClusterNodesAsync();
    var target = JobTarget.AnyNode(nodes);

    string receiverTypeName = typeof(DotNetReceivers.UpdateTupleReceiver).AssemblyQualifiedName!;
    string unitId = $"{_defaultTestUnit.Name}:{_defaultTestUnit.Version}";
    var info = new JobInfo(receiverTypeName, "hello", new List<string> { unitId }, "DOTNET_SIDECAR");

    var execution = await Client.Compute.SubmitAsync(target, StreamerRunnerJob, info);
    var result = await execution.GetResultAsync();

    Assert.AreEqual("Streaming finished: TupleImpl [VAL=java-test, VAL2=dotnet-test]", result);
}
/// <summary>
/// Drops the table named after this test if it exists, streams 50 ids to
/// <c>DotNetReceivers.CreateTableAndUpsert</c> with the table name as the receiver argument, and verifies
/// that both the number of results and the number of rows selected from the table equal the number of ids.
/// </summary>
/// <returns>A task representing the asynchronous test.</returns>
[Test]
public async Task TestIgniteApiAccessFromReceiver()
{
    var tableName = nameof(TestIgniteApiAccessFromReceiver);
    List<int> ids = Enumerable.Range(10, 50).ToList();

    await Client.Sql.ExecuteAsync(null, $"DROP TABLE IF EXISTS {tableName}");

    var receiver = DotNetReceivers.CreateTableAndUpsert with { DeploymentUnits = [_defaultTestUnit] };
    var streamerOptions = new DataStreamerOptions { PageSize = 13 };

    var results = await TupleView.StreamDataAsync(
        data: ids.ToAsyncEnumerable(),
        receiver: receiver,
        keySelector: _ => new IgniteTuple { ["key"] = 1L },
        payloadSelector: id => id,
        receiverArg: tableName,
        options: streamerOptions).ToListAsync();

    Assert.AreEqual(ids.Count, results.Count);

    await using var resultSet = await Client.Sql.ExecuteAsync(null, $"SELECT * FROM {tableName}");
    var rows = await resultSet.ToListAsync();

    Assert.AreEqual(ids.Count, rows.Count);
}
/// <summary>
/// Streams 1000 tuples through <see cref="RunEchoReceiver"/> with the given page size and verifies
/// that the results equal the input in the same order.
/// </summary>
/// <param name="pageSize">Value assigned to <see cref="DataStreamerOptions.PageSize"/>.</param>
/// <returns>A task representing the asynchronous test.</returns>
[Test]
public async Task TestEchoManyItems([Values(1, 3, 99, 100_000)] int pageSize)
{
    const int itemCount = 1_000;

    var tuples = new List<IgniteTuple>(itemCount);
    for (int i = 0; i < itemCount; i++)
    {
        tuples.Add(new IgniteTuple { ["id"] = i, ["name"] = $"foo-{i}" });
    }

    var streamerOptions = new DataStreamerOptions { PageSize = pageSize };
    List<object> echoed = await RunEchoReceiver(tuples, streamerOptions);

    CollectionAssert.AreEqual(tuples, echoed);
}
/// <summary>
/// Connects a client to a <see cref="FakeServer"/>, streams one item with <c>DotNetReceivers.EchoArgs</c>
/// through the record binary view of the existing table, and verifies that a
/// <see cref="DataStreamerException"/> reports that receiver execution options are not supported.
/// </summary>
/// <returns>A task representing the asynchronous test.</returns>
[Test]
public async Task TestPlatformExecutorWithOldServerThrowsCompatibilityError()
{
    using var server = new FakeServer();
    using var client = await server.ConnectClientAsync();

    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var binaryView = table!.RecordBinaryView;

    async Task StreamToFakeServer()
    {
        await binaryView.StreamDataAsync(
            data: new object[] { "unused" }.ToAsyncEnumerable(),
            receiver: DotNetReceivers.EchoArgs with { DeploymentUnits = [_defaultTestUnit] },
            keySelector: _ => new IgniteTuple { ["ID"] = 1 },
            payloadSelector: _ => "unused",
            receiverArg: "test").SingleAsync();
    }

    var error = Assert.ThrowsAsync<DataStreamerException>(StreamToFakeServer);

    Assert.AreEqual("ReceiverExecutionOptions are not supported by the server.", error.Message);
    Assert.AreEqual(1, error.FailedItems.Count);
}
/// <summary>
/// Streams a <c>ReceiverItem</c> payload with a <c>ReceiverArg</c> argument to
/// <c>DotNetReceivers.Marshaller</c> and verifies that the first result carries an equal argument and item.
/// </summary>
/// <returns>A task representing the asynchronous test.</returns>
[Test]
public async Task TestMarshallerReceiver()
{
    var item = new DotNetReceivers.ReceiverItem<string>(Guid.NewGuid(), "hello");
    var arg = new DotNetReceivers.ReceiverArg(123, "345");
    var receiver = DotNetReceivers.Marshaller with { DeploymentUnits = [_defaultTestUnit] };

    var results = PocoView.StreamDataAsync(
        data: new object[] { "unused" }.ToAsyncEnumerable(),
        receiver: receiver,
        keySelector: _ => new Poco(),
        payloadSelector: _ => item,
        receiverArg: arg);

    DotNetReceivers.ReceiverResult<string> first = await results.FirstAsync();

    Assert.AreEqual(arg, first.Arg);
    Assert.AreEqual(item, first.Item);
}
/// <summary>
/// Streams a <c>ReceiverItem</c> with an empty id and the value "error!" to
/// <c>DotNetReceivers.Marshaller</c> and verifies the message and failed item count of the resulting
/// <see cref="DataStreamerException"/>.
/// </summary>
[Test]
public void TestErrorInMarshaller()
{
    const string expectedMessage =
        ".NET job failed: Test marshaller error: ReceiverItem { Id = 00000000-0000-0000-0000-000000000000, Value = error! }";

    async Task StreamFailingItem()
    {
        await PocoView.StreamDataAsync(
            data: new object[] { "unused" }.ToAsyncEnumerable(),
            receiver: DotNetReceivers.Marshaller with { DeploymentUnits = [_defaultTestUnit] },
            keySelector: _ => new Poco(),
            payloadSelector: _ => new DotNetReceivers.ReceiverItem<string>(Guid.Empty, "error!"),
            receiverArg: new DotNetReceivers.ReceiverArg(1, "1")).FirstAsync();
    }

    var error = Assert.ThrowsAsync<DataStreamerException>(StreamFailingItem);

    Assert.AreEqual(expectedMessage, error.Message);
    Assert.AreEqual(1, error.FailedItems.Count);
}
/// <summary>
/// Streams a single placeholder item to <c>DotNetReceivers.EchoArgs</c> with the given receiver argument
/// and returns the single result produced by the streamer.
/// </summary>
/// <param name="arg">Receiver argument.</param>
/// <param name="view">View to stream through; <c>PocoView</c> is used when null.</param>
/// <returns>The single streamer result.</returns>
private async Task<object> RunEchoArgReceiver(object arg, IRecordView<Poco>? view = null)
{
    IRecordView<Poco> target = view ?? PocoView;

    var results = target.StreamDataAsync(
        data: new object[] { "unused" }.ToAsyncEnumerable(),
        receiver: DotNetReceivers.EchoArgs with { DeploymentUnits = [_defaultTestUnit] },
        keySelector: _ => new Poco(),
        payloadSelector: _ => "unused",
        receiverArg: arg);

    return await results.SingleAsync();
}
/// <summary>
/// Streams the given items as payloads to <c>DotNetReceivers.Echo</c> through <c>PocoView</c>
/// and collects all streamer results into a list.
/// </summary>
/// <param name="items">Items to stream; each item is used as the payload unchanged.</param>
/// <param name="options">Optional streamer options passed through to the streamer.</param>
/// <returns>All streamer results.</returns>
private async Task<List<object>> RunEchoReceiver(IEnumerable<object> items, DataStreamerOptions? options = null)
{
    var results = PocoView.StreamDataAsync(
        data: items.ToAsyncEnumerable(),
        receiver: DotNetReceivers.Echo with { DeploymentUnits = [_defaultTestUnit] },
        keySelector: _ => new Poco(),
        payloadSelector: payload => payload,
        receiverArg: "unused",
        options: options);

    return await results.ToListAsync();
}
/// <summary>
/// Argument record for <see cref="StreamerRunnerJob"/>. The job descriptor marshals it with
/// <see cref="JsonMarshaller{T}"/>, which is why the positional properties are not read in this file.
/// </summary>
/// <param name="TypeName">Receiver type name; the test in this file passes an assembly-qualified .NET type name.</param>
/// <param name="Arg">Receiver argument; the test in this file passes a string.</param>
/// <param name="DeploymentUnits">Deployment units as strings; the test in this file formats them as "name:version".</param>
/// <param name="JobExecutorType">Executor type name; the test in this file passes "DOTNET_SIDECAR".</param>
[SuppressMessage("ReSharper", "NotAccessedPositionalProperty.Local", Justification = "JSON")]
private record JobInfo(string TypeName, object Arg, List<string> DeploymentUnits, string JobExecutorType);
}