blob: 84f19fbe15c7b83375803a883ce0102c2617a358 [file]
#region License
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Gremlin.Net.Driver;
using Xunit;
namespace Gremlin.Net.UnitTest.Driver
{
public class ResultSetTests
{
/// <summary>
/// Creates a ResultSet backed by a channel. The background task writes
/// the provided items to the channel and then completes it.
/// </summary>
private static ResultSet<T> CreateResultSet<T>(IEnumerable<T> items)
{
var channel = Channel.CreateUnbounded<object>(
new UnboundedChannelOptions { SingleWriter = true });
var disposeCts = new CancellationTokenSource();
var backgroundTask = Task.Run(async () =>
{
foreach (var item in items)
{
await channel.Writer.WriteAsync(item!).ConfigureAwait(false);
}
channel.Writer.Complete();
});
return new ResultSet<T>(channel.Reader, disposeCts, backgroundTask);
}
/// <summary>
/// Creates a ResultSet whose background task completes the channel with an error.
/// </summary>
private static ResultSet<T> CreateFaultedResultSet<T>(Exception exception)
{
var channel = Channel.CreateUnbounded<object>(
new UnboundedChannelOptions { SingleWriter = true });
var disposeCts = new CancellationTokenSource();
var backgroundTask = Task.Run(() =>
{
channel.Writer.Complete(exception);
});
return new ResultSet<T>(channel.Reader, disposeCts, backgroundTask);
}
/// <summary>
/// Creates a ResultSet whose background task blocks until the dispose CTS is cancelled.
/// </summary>
private static (ResultSet<T> resultSet, TaskCompletionSource<bool> started) CreateBlockingResultSet<T>()
{
var channel = Channel.CreateUnbounded<object>(
new UnboundedChannelOptions { SingleWriter = true });
var disposeCts = new CancellationTokenSource();
var started = new TaskCompletionSource<bool>();
var backgroundTask = Task.Run(async () =>
{
started.SetResult(true);
try
{
// Block until cancelled
await Task.Delay(Timeout.Infinite, disposeCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Expected on dispose
}
finally
{
channel.Writer.Complete();
}
});
return (new ResultSet<T>(channel.Reader, disposeCts, backgroundTask), started);
}
[Fact]
public async Task AwaitForeachShouldYieldAllResults()
{
var expected = new List<int> { 1, 2, 3, 4, 5 };
await using var resultSet = CreateResultSet(expected);
var actual = new List<int>();
await foreach (var item in resultSet)
{
actual.Add(item);
}
Assert.Equal(expected, actual);
}
[Fact]
public async Task AwaitForeachShouldYieldStringResults()
{
var expected = new List<string> { "hello", "world", "gremlin" };
await using var resultSet = CreateResultSet(expected);
var actual = new List<string>();
await foreach (var item in resultSet)
{
actual.Add(item);
}
Assert.Equal(expected, actual);
}
[Fact]
public async Task AwaitForeachShouldHandleEmptyResultSet()
{
await using var resultSet = CreateResultSet(new List<int>());
var actual = new List<int>();
await foreach (var item in resultSet)
{
actual.Add(item);
}
Assert.Empty(actual);
}
[Fact]
public async Task ToListAsyncShouldReturnAllResultsInOrder()
{
var expected = new List<int> { 10, 20, 30, 40, 50 };
await using var resultSet = CreateResultSet(expected);
var actual = await resultSet.ToListAsync();
Assert.Equal(expected, actual);
}
[Fact]
public async Task ToListAsyncShouldReturnEmptyListForEmptyResultSet()
{
await using var resultSet = CreateResultSet(new List<string>());
var actual = await resultSet.ToListAsync();
Assert.Empty(actual);
}
[Fact]
public async Task DisposeAsyncShouldCancelBackgroundTask()
{
var (resultSet, started) = CreateBlockingResultSet<int>();
await started.Task;
var disposeTask = resultSet.DisposeAsync().AsTask();
var completed = await Task.WhenAny(disposeTask, Task.Delay(5000));
Assert.Same(disposeTask, completed);
}
[Fact]
public async Task ErrorInChannelShouldPropagateToConsumerDuringIteration()
{
var expectedException = new InvalidOperationException("deserialization failed");
await using var resultSet = CreateFaultedResultSet<int>(expectedException);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
{
await foreach (var _ in resultSet)
{
// Should not yield any items
}
});
Assert.Same(expectedException, ex);
}
[Fact]
public async Task ErrorInChannelShouldPropagateToToListAsync()
{
var expectedException = new InvalidOperationException("stream error");
await using var resultSet = CreateFaultedResultSet<string>(expectedException);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(
() => resultSet.ToListAsync());
Assert.Same(expectedException, ex);
}
[Fact]
public async Task SecondCallToGetAsyncEnumeratorShouldThrowInvalidOperationException()
{
await using var resultSet = CreateResultSet(new List<int> { 1, 2, 3 });
// First call should succeed
var enumerator = resultSet.GetAsyncEnumerator();
// Consume to avoid leaving the enumerator in a bad state
while (await enumerator.MoveNextAsync())
{
}
// Second call throws immediately — the guard check is eager (not deferred
// to MoveNextAsync) so callers get fast feedback.
Assert.Throws<InvalidOperationException>(
() => resultSet.GetAsyncEnumerator());
}
[Fact]
public async Task ToListAsyncAfterGetAsyncEnumeratorShouldThrowInvalidOperationException()
{
await using var resultSet = CreateResultSet(new List<int> { 1 });
// Consume via await foreach first
await foreach (var _ in resultSet)
{
}
// ToListAsync calls GetAsyncEnumerator internally, so it should throw
await Assert.ThrowsAsync<InvalidOperationException>(
() => resultSet.ToListAsync());
}
[Fact]
public void DisposeShouldNotThrowWhenCalledSynchronously()
{
var resultSet = CreateResultSet(new List<int> { 1, 2, 3 });
// Synchronous Dispose should not throw
resultSet.Dispose();
}
[Fact]
public async Task DoubleDisposeAsyncShouldNotThrow()
{
var resultSet = CreateResultSet(new List<int> { 1, 2, 3 });
await resultSet.DisposeAsync();
await resultSet.DisposeAsync();
// If we get here without throwing, double-dispose is safe
}
[Fact]
public void DoubleDisposeSyncShouldNotThrow()
{
var resultSet = CreateResultSet(new List<int> { 1, 2, 3 });
resultSet.Dispose();
resultSet.Dispose();
// If we get here without throwing, double sync dispose is safe
}
[Fact]
public async Task MixedDisposeAsyncThenSyncShouldNotThrow()
{
var resultSet = CreateResultSet(new List<int> { 1, 2, 3 });
await resultSet.DisposeAsync();
resultSet.Dispose();
// Mixed disposal order should not throw
}
[Fact]
public async Task CancellationDuringIterationShouldStopEnumeration()
{
// Create a channel that won't complete on its own — items trickle in slowly
var channel = Channel.CreateUnbounded<object>(
new UnboundedChannelOptions { SingleWriter = true });
var disposeCts = new CancellationTokenSource();
var backgroundTask = Task.Run(async () =>
{
for (var i = 0; i < 100; i++)
{
await channel.Writer.WriteAsync(i).ConfigureAwait(false);
try
{
await Task.Delay(50, disposeCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
}
channel.Writer.Complete();
});
var resultSet = new ResultSet<int>(channel.Reader, disposeCts, backgroundTask);
using var iterationCts = new CancellationTokenSource();
var collected = new List<int>();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
{
await foreach (var item in resultSet.WithCancellation(iterationCts.Token))
{
collected.Add(item);
if (collected.Count >= 2)
{
iterationCts.Cancel();
}
}
});
Assert.True(collected.Count >= 2, "Should have collected at least 2 items before cancellation");
await resultSet.DisposeAsync();
}
}
}