| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| namespace Apache.Ignite.Tests.Compute; |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Diagnostics.CodeAnalysis; |
| using System.Linq; |
| using System.Threading.Tasks; |
| using Ignite.Compute; |
| using Ignite.Marshalling; |
| using Ignite.Table; |
| using Network; |
| using NUnit.Framework; |
| using TestHelpers; |
| |
| /// <summary> |
| /// Tests for platform compute (non-Java jobs). |
| /// <para /> |
| /// Development: |
| /// - Changing test code, job code: no need to restart Ignite. |
| /// - Changing core code: restart Ignite servers and do a full .NET solution rebuild to reflect the changes in .NET compute executor. |
| /// <para /> |
| /// Debugging: |
| /// - Run tests once so that .NET executor processes are started. |
| /// - Attach to the executor processes. |
| /// - Run tests again to debug the executor. |
| /// </summary> |
| public class PlatformComputeTests : IgniteTestsBase |
| { |
| private static readonly JobDescriptor<JobInfo, object?> JobRunnerJob = new(ComputeTests.PlatformTestNodeRunner + "$JobRunnerJob") |
| { |
| ArgMarshaller = new JsonMarshaller<JobInfo>() |
| }; |
| |
| private DeploymentUnit _defaultTestUnit = null!; |
| |
| [OneTimeSetUp] |
| public async Task DeployDefaultUnit() => _defaultTestUnit = await ManagementApi.DeployTestsAssembly(); |
| |
| [OneTimeTearDown] |
| public async Task UndeployDefaultUnit() => await ManagementApi.UnitUndeploy(_defaultTestUnit); |
| |
| [Test] |
| public async Task TestEchoJob([Values(true, false)] bool withSsl) |
| { |
| var jobDesc = DotNetJobs.Echo with { DeploymentUnits = [_defaultTestUnit] }; |
| var jobTarget = JobTarget.Node(await GetClusterNodeAsync(withSsl ? "_3" : string.Empty)); |
| |
| var jobExec = await Client.Compute.SubmitAsync( |
| jobTarget, |
| jobDesc, |
| "Hello world!"); |
| |
| var result = await jobExec.GetResultAsync(); |
| |
| Assert.AreEqual("Hello world!", result); |
| } |
| |
| [Test] |
| public async Task TestBroadcastJob() |
| { |
| var jobDesc = DotNetJobs.Echo with { DeploymentUnits = [_defaultTestUnit] }; |
| var jobTarget = BroadcastJobTarget.Nodes( |
| await GetClusterNodeAsync(), |
| await GetClusterNodeAsync("_2"), |
| await GetClusterNodeAsync("_3")); |
| |
| var jobExec = await Client.Compute.SubmitBroadcastAsync( |
| jobTarget, |
| jobDesc, |
| "Hello world!"); |
| |
| foreach (var job in jobExec.JobExecutions) |
| { |
| var res = await job.GetResultAsync(); |
| Assert.AreEqual("Hello world!", res); |
| } |
| } |
| |
| [Test] |
| [TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))] |
| public async Task TestAllSupportedArgTypes(object val) |
| { |
| var result = await ExecJobAsync(DotNetJobs.Echo, val); |
| |
| if (val is decimal dec) |
| { |
| val = new BigDecimal(dec); |
| } |
| |
| Assert.AreEqual(val, result); |
| } |
| |
| [Test] |
| public async Task TestMissingClass() |
| { |
| var target = JobTarget.Node(await GetClusterNodeAsync()); |
| var desc = new JobDescriptor<string, string>( |
| "MyNamespace.MyJob", |
| [_defaultTestUnit], |
| new JobExecutionOptions { ExecutorType = JobExecutorType.DotNetSidecar }); |
| |
| var jobExec = await Client.Compute.SubmitAsync(target, desc, "arg"); |
| var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExec.GetResultAsync()); |
| |
| StringAssert.StartsWith(".NET job failed: Failed to load type 'MyNamespace.MyJob'", ex.Message); |
| StringAssert.Contains("Could not resolve type 'MyNamespace.MyJob' in assembly 'Apache.Ignite", ex.Message); |
| Assert.AreEqual("IGN-COMPUTE-9", ex.CodeAsString); |
| } |
| |
| [Test] |
| public async Task TestMissingAssembly() |
| { |
| // Run without providing deployment units. |
| var target = JobTarget.Node(await GetClusterNodeAsync(string.Empty)); |
| var jobExec = await Client.Compute.SubmitAsync(target, DotNetJobs.Echo, "Hello world!"); |
| |
| var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExec.GetResultAsync()); |
| StringAssert.StartsWith(".NET job failed: Failed to load type 'Apache.Ignite.Tests.Compute.DotNetJobs+EchoJob", ex.Message); |
| StringAssert.Contains("Could not load file or assembly 'Apache.Ignite.Tests", ex.Message); |
| Assert.AreEqual("IGN-COMPUTE-9", ex.CodeAsString); |
| } |
| |
| [Test] |
| public async Task TestJobError() |
| { |
| var target = JobTarget.Node(await GetClusterNodeAsync(string.Empty)); |
| var desc = DotNetJobs.Error with { DeploymentUnits = [_defaultTestUnit] }; |
| |
| var jobExec = await Client.Compute.SubmitAsync(target, desc, "arg"); |
| var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExec.GetResultAsync()); |
| |
| Assert.AreEqual(".NET job failed: Test exception: arg", ex.Message); |
| Assert.AreEqual("IGN-COMPUTE-9", ex.CodeAsString); |
| |
| StringAssert.Contains( |
| "System.ArithmeticException: Test exception: arg" + |
| $"{Environment.NewLine} at Apache.Ignite.Tests.Compute.DotNetJobs.ErrorJob.Throw(Object arg)" + |
| $"{Environment.NewLine} at Apache.Ignite.Tests.Compute.DotNetJobs.ErrorJob.ExecuteAsync", |
| ex.InnerException?.Message); |
| } |
| |
| [Test] |
| [Ignore("IGNITE-25181")] |
| public async Task TestDotNetJobFailsOnServerWithClientCertificate() |
| { |
| var target = JobTarget.Node(await GetClusterNodeAsync("_4")); |
| var desc = new JobDescriptor<string, string>( |
| "SomeJob", |
| [_defaultTestUnit], |
| new JobExecutionOptions { ExecutorType = JobExecutorType.DotNetSidecar }); |
| |
| var jobExec = await Client.Compute.SubmitAsync(target, desc, "Hello world!"); |
| var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExec.GetResultAsync()); |
| |
| // TODO IGNITE-25181 Support client certs with .NET compute executor. |
| Assert.AreEqual("Could not start .NET executor process in 2 attempts", ex.Message); |
| } |
| |
| [Test] |
| public async Task TestCallDotNetJobFromJava() |
| { |
| var targetNode = await GetClusterNodeAsync(); |
| var target = JobTarget.Node(targetNode); |
| |
| var arg = new JobInfo( |
| TypeName: typeof(DotNetJobs.EchoJob).AssemblyQualifiedName!, |
| Arg: "arg1", |
| DeploymentUnits: [$"{_defaultTestUnit.Name}:{_defaultTestUnit.Version}"], |
| NodeId: targetNode.Id, |
| JobExecutorType: "DOTNET_SIDECAR"); |
| |
| var jobExec = await Client.Compute.SubmitAsync(target, JobRunnerJob, arg); |
| var res = await jobExec.GetResultAsync(); |
| |
| Assert.AreEqual("arg1", res); |
| } |
| |
| [Test] |
| public async Task TestDotNetJobRunsInAnotherProcess() |
| { |
| var jobProcessId = await ExecJobAsync(DotNetJobs.ProcessId); |
| |
| Assert.AreNotEqual(Environment.ProcessId, jobProcessId); |
| } |
| |
| [Test] |
| public async Task TestDotNetSidecarProcessIsRestartedOnExit() |
| { |
| var jobTimeout = TimeSpan.FromSeconds(5); |
| |
| // Get executor process id. |
| int jobProcessId1 = await ExecJobAsync(DotNetJobs.ProcessId).WaitAsync(jobTimeout); |
| |
| // Run a job that exits the process. This job fails because the process exits before the result is returned. |
| var ex = Assert.ThrowsAsync<IgniteException>( |
| async () => await ExecJobAsync(DotNetJobs.ProcessExit).WaitAsync(jobTimeout)); |
| |
| // Run another job - the process should be restarted automatically. |
| int jobProcessId2 = await ExecJobAsync(DotNetJobs.ProcessId).WaitAsync(jobTimeout); |
| |
| Assert.AreNotEqual(jobProcessId1, jobProcessId2); |
| Assert.AreEqual(".NET compute executor connection lost", ex.Message); |
| Assert.AreEqual("IGN-CLIENT-9", ex.CodeAsString); |
| } |
| |
| [Test] |
| public async Task TestIgniteApiAccessFromJob() |
| { |
| var apiRes = await ExecJobAsync(DotNetJobs.ApiTest, "Hello world!"); |
| |
| Assert.AreEqual( |
| "Arg: Hello world!|SQL result: IgniteTuple { ANSWER = 42 }|Table result: Option { HasValue = True, Value = Hello }", |
| apiRes); |
| } |
| |
| [Test] |
| public async Task TestManyJobsAssemblyLoadContextUnload() |
| { |
| int jobCount = 100; |
| |
| var jobTasks = Enumerable |
| .Range(0, jobCount) |
| .Select(x => ExecJobAsync(DotNetJobs.Echo, x)) |
| .ToArray(); |
| |
| await Task.WhenAll(jobTasks); |
| |
| var assemblyLoadContextCount = await ExecJobAsync(DotNetJobs.AssemblyLoadContextCount); |
| |
| // Default context + current job context, all others should be unloaded. |
| Assert.AreEqual(2, assemblyLoadContextCount); |
| } |
| |
| [Test] |
| public async Task TestTupleWithSchemaRoundTrip() |
| { |
| var tuple = TestCases.GetTupleWithAllFieldTypes(); |
| tuple["nested_tuple"] = TestCases.GetTupleWithAllFieldTypes(x => x is not decimal); |
| |
| var expectedTuple = Enumerable.Range(0, tuple.FieldCount).Aggregate( |
| seed: new IgniteTuple(), |
| (acc, i) => |
| { |
| acc[tuple.GetName(i)] = tuple[i] is decimal d ? new BigDecimal(d) : tuple[i]; |
| return acc; |
| }); |
| |
| var res = (IIgniteTuple)(await ExecJobAsync(DotNetJobs.Echo, tuple))!; |
| |
| Assert.AreEqual(expectedTuple, res); |
| } |
| |
| [Test] |
| public async Task TestDeepNestedTupleWithSchemaRoundTrip() |
| { |
| var tuple = TestCases.GetNestedTuple(100); |
| var res = await ExecJobAsync(DotNetJobs.Echo, tuple); |
| |
| Assert.AreEqual(tuple, res); |
| StringAssert.Contains("CHILD99 = IgniteTuple { ID = 99, CHILD100 = IgniteTuple { ID = 100 } } } } } } } }", res?.ToString()); |
| } |
| |
| [Test] |
| public async Task TestPlatformExecutorWithOldServerThrowsCompatibilityError() |
| { |
| using var server = new FakeServer(); |
| using var client = await server.ConnectClientAsync(); |
| |
| var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await ExecJobAsync(DotNetJobs.Echo, arg: "test", client: client)); |
| Assert.AreEqual("Job executor type 'DotNetSidecar' is not supported by the server.", ex.Message); |
| } |
| |
| private async Task<IClusterNode> GetClusterNodeAsync(string? suffix = null) |
| { |
| var nodeName = ComputeTests.PlatformTestNodeRunner + suffix; |
| |
| var nodes = await Client.GetClusterNodesAsync(); |
| return nodes.First(n => n.Name == nodeName); |
| } |
| |
| private async Task<TRes> ExecJobAsync<TArg, TRes>(JobDescriptor<TArg, TRes> desc, TArg arg = default!, IIgniteClient? client = null) |
| { |
| var jobDesc = desc with { DeploymentUnits = [_defaultTestUnit] }; |
| var jobTarget = JobTarget.Node(await GetClusterNodeAsync()); |
| |
| client ??= Client; |
| |
| var jobExec = await client.Compute.SubmitAsync( |
| jobTarget, |
| jobDesc, |
| arg: arg); |
| |
| return await jobExec.GetResultAsync(); |
| } |
| |
| [SuppressMessage("ReSharper", "NotAccessedPositionalProperty.Local", Justification = "JSON")] |
| private record JobInfo(string TypeName, object Arg, List<string> DeploymentUnits, Guid NodeId, string JobExecutorType); |
| } |