blob: b594a414ec5ee85baf43dff1aa311c0da142dd2b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Tests.Compute.Executor;
using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.Loader;
using System.Threading;
using System.Threading.Tasks;
using Ignite.Compute;
using Ignite.Table;
using Internal.Buffers;
using Internal.Compute;
using Internal.Compute.Executor;
using Internal.Proto.MsgPack;
using Internal.Table.Serialization;
using NUnit.Framework;
/// <summary>
/// Tests for <see cref="JobLoadContext"/>.
/// </summary>
public class JobLoadContextTests
{
private static readonly ConcurrentDictionary<Guid, string> DisposedJobStates = new();
[SetUp]
public void SetUp() => DisposedJobStates.Clear();
[Test]
public async Task TestJobExecution()
{
var res = await ExecuteJobAsync(DotNetJobs.AddOne, 1);
Assert.AreEqual(2, res);
}
[Test]
public async Task TestDisposableJob([Values(true, false)] bool async)
{
var jobType = async ? typeof(AsyncDisposableJob) : typeof(DisposableJob);
var jobDesc = new JobDescriptor<object, Guid>(jobType.AssemblyQualifiedName!);
var expectedState = async ? "InitializedExecutingExecutedAsyncDisposed" : "InitializedExecutedDisposed";
var execId = await ExecuteJobAsync(jobDesc, null);
Assert.IsTrue(DisposedJobStates.TryRemove(execId, out var state));
Assert.AreEqual(expectedState, state);
}
[Test]
public async Task TestDisposableReceiver([Values(true, false)] bool async)
{
var receiverType = async ? typeof(AsyncDisposableReceiver) : typeof(DisposableReceiver);
var expectedState = async ? "ExecutedAsyncDisposed" : "ExecutedDisposed";
var id = Guid.NewGuid();
await ExecuteReceiverAsync(receiverType, id);
Assert.IsTrue(DisposedJobStates.TryRemove(id, out var state));
Assert.AreEqual(expectedState, state);
}
[Test]
public void TestJobWithoutDefaultConstructorThrows()
{
var ex = Assert.ThrowsAsync<InvalidOperationException>(async () => await ExecuteJobAsync(DotNetJobs.NoCtor, 1));
Assert.AreEqual($"No public parameterless constructor for type '{typeof(DotNetJobs.NoCtorJob).AssemblyQualifiedName}'", ex.Message);
}
[Test]
public void TestCreateJobWrapperWithMultipleJobInterfacesThrows()
{
var jobLoadCtx = new JobLoadContext(AssemblyLoadContext.Default);
var ex = Assert.Throws<AmbiguousMatchException>(
() => jobLoadCtx.CreateJobWrapper(typeof(MultiInterfaceJob).AssemblyQualifiedName!));
Assert.AreEqual("Ambiguous match found for ' Apache.Ignite.Compute.IComputeJob`2[System.Object,System.Guid]'.", ex.Message);
}
[Test]
public void TestCreateReceiverWrapperWithMultipleReceiverInterfacesThrows()
{
var jobLoadCtx = new JobLoadContext(AssemblyLoadContext.Default);
var ex = Assert.Throws<AmbiguousMatchException>(
() => jobLoadCtx.CreateReceiverWrapper(typeof(MultiInterfaceReceiver).AssemblyQualifiedName!));
Assert.AreEqual(
"Ambiguous match found for ' Apache.Ignite.Table.IDataStreamerReceiver`3[System.Int32,System.Int32,System.Int32]'.",
ex.Message);
}
private static async Task<TResult> ExecuteJobAsync<TArg, TResult>(JobDescriptor<TArg, TResult> job, TArg? jobArg)
{
var jobLoadCtx = new JobLoadContext(AssemblyLoadContext.Default);
var jobWrapper = jobLoadCtx.CreateJobWrapper(job.JobClassName);
return await JobWrapperHelper.ExecuteAsync<TArg, TResult>(jobWrapper, jobArg);
}
private static async Task ExecuteReceiverAsync(Type receiverType, object arg)
{
var loadCtx = new JobLoadContext(AssemblyLoadContext.Default);
var receiverWrapper = loadCtx.CreateReceiverWrapper(receiverType.AssemblyQualifiedName!);
using var argBuf = WriteReceiverInfo(receiverType.AssemblyQualifiedName!, arg);
using var resBuf = new PooledArrayBuffer();
await receiverWrapper.ExecuteAsync(null!, argBuf, resBuf, CancellationToken.None);
static PooledBuffer WriteReceiverInfo(string typeName, object arg)
{
var items = new object[] { "hello" };
using var receiverInfoBuilder = StreamerReceiverSerializer.BuildReceiverInfo<object, object>(
typeName, arg, items, null, null, prefixSize: 4);
Memory<byte> receiverInfoMem = receiverInfoBuilder.Build();
BinaryPrimitives.WriteInt32LittleEndian(receiverInfoMem.Span, receiverInfoBuilder.NumElements);
using var jobArgBuf = new PooledArrayBuffer();
MsgPackWriter w = jobArgBuf.MessageWriter;
ComputePacker.PackArgOrResult(ref w, receiverInfoMem, null);
var resMem = jobArgBuf.GetWrittenMemory();
var resBytes = ByteArrayPool.Rent(resMem.Length);
resMem.Span.CopyTo(resBytes);
return new PooledBuffer(resBytes, 0, resMem.Length);
}
}
private class DisposableJob : IComputeJob<object, Guid>, IDisposable
{
private readonly Guid _id = Guid.NewGuid();
private string _state;
public DisposableJob() => _state = "Initialized";
public ValueTask<Guid> ExecuteAsync(IJobExecutionContext context, object arg, CancellationToken cancellationToken)
{
_state += "Executed";
return ValueTask.FromResult(_id);
}
public void Dispose() => DisposedJobStates[_id] = _state + "Disposed";
}
private class AsyncDisposableJob : IComputeJob<object, Guid>, IAsyncDisposable
{
private readonly Guid _id = Guid.NewGuid();
private string _state;
public AsyncDisposableJob() => _state = "Initialized";
public async ValueTask<Guid> ExecuteAsync(IJobExecutionContext context, object arg, CancellationToken cancellationToken)
{
_state += "Executing";
await Task.Delay(1, cancellationToken);
_state += "Executed";
return _id;
}
public async ValueTask DisposeAsync()
{
await Task.Delay(1);
DisposedJobStates[_id] = _state + "AsyncDisposed";
}
}
private sealed class AsyncDisposableReceiver : IDataStreamerReceiver<object, Guid, object>, IAsyncDisposable
{
private Guid _id;
public ValueTask<IList<object>?> ReceiveAsync(
IList<object> page,
Guid arg,
IDataStreamerReceiverContext context,
CancellationToken cancellationToken)
{
_id = arg;
DisposedJobStates[_id] = "Executed";
return ValueTask.FromResult<IList<object>?>(null);
}
public ValueTask DisposeAsync()
{
DisposedJobStates[_id] += "AsyncDisposed";
return ValueTask.CompletedTask;
}
}
private sealed class DisposableReceiver : IDataStreamerReceiver<object, Guid, object>, IDisposable
{
private Guid _id;
public ValueTask<IList<object>?> ReceiveAsync(
IList<object> page,
Guid arg,
IDataStreamerReceiverContext context,
CancellationToken cancellationToken)
{
_id = arg;
DisposedJobStates[_id] = "Executed";
return ValueTask.FromResult<IList<object>?>(null);
}
public void Dispose() => DisposedJobStates[_id] += "Disposed";
}
private class MultiInterfaceJob : IComputeJob<object, Guid>, IComputeJob<int, string>
{
public ValueTask<Guid> ExecuteAsync(IJobExecutionContext context, object arg, CancellationToken cancellationToken) =>
ValueTask.FromResult(Guid.Empty);
public ValueTask<string> ExecuteAsync(IJobExecutionContext context, int arg, CancellationToken cancellationToken) =>
ValueTask.FromResult("x");
}
private class MultiInterfaceReceiver : IDataStreamerReceiver<int, int, int>, IDataStreamerReceiver<int, int, short>
{
ValueTask<IList<int>?> IDataStreamerReceiver<int, int, int>.ReceiveAsync(
IList<int> page, int arg, IDataStreamerReceiverContext context, CancellationToken cancellationToken) =>
throw new NotImplementedException();
ValueTask<IList<short>?> IDataStreamerReceiver<int, int, short>.ReceiveAsync(
IList<int> page, int arg, IDataStreamerReceiverContext context, CancellationToken cancellationToken) =>
throw new NotImplementedException();
}
}