IGNITE-21931 .NET: Refactor DataStreamer to use StreamerBatchSend (#3546)
* Refactor streamer to batch by partition instead of by node
* Use `StreamerBatchSend` instead of `TupleUpsertAll`
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 4849b0d..6dfe0f9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -115,7 +115,7 @@
public Dictionary<string, object?> LastSqlScriptProps { get; private set; } = new();
- public long UpsertAllRowCount { get; set; }
+ public long StreamerRowCount { get; set; }
public long DroppedConnectionCount { get; set; }
@@ -281,11 +281,6 @@
Thread.Sleep(MultiRowOperationDelayPerRow * count);
}
- if (opCode == ClientOp.TupleUpsertAll)
- {
- UpsertAllRowCount += count;
- }
-
Send(handler, requestId, new byte[] { 1, 0 }.AsMemory());
continue;
@@ -334,6 +329,13 @@
Thread.Sleep(HeartbeatDelay);
Send(handler, requestId, Array.Empty<byte>());
continue;
+
+ case ClientOp.StreamerBatchSend:
+ reader.Skip(4);
+ StreamerRowCount += reader.ReadInt32();
+
+ Send(handler, requestId, Array.Empty<byte>());
+ continue;
}
// Fake error message for any other op code.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index c17bfa4..c3c7059 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -112,7 +112,7 @@
public async Task TestDataStreamerReceivesPartitionAssignmentUpdates() =>
await TestClientReceivesPartitionAssignmentUpdates(
view => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
- ClientOp.TupleUpsertAll);
+ ClientOp.StreamerBatchSend);
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
@@ -138,7 +138,7 @@
await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), ClientOp.TupleReplaceExact, expectedNode);
await AssertOpOnNode(() => recordView.DeleteAsync(null, key), ClientOp.TupleDelete, expectedNode);
await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new IgniteTuple { ["ID"] = keyId + 1 } };
@@ -172,7 +172,7 @@
await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), ClientOp.TupleReplaceExact, expectedNode);
await AssertOpOnNode(() => recordView.DeleteAsync(null, key), ClientOp.TupleDelete, expectedNode);
await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, key - 1, key + 1 };
@@ -218,7 +218,7 @@
await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs), ClientOp.TupleUpsertAll, expectedNode);
await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys), ClientOp.TupleDeleteAll, expectedNode);
await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs), ClientOp.TupleDeleteAllExact, expectedNode);
- await AssertOpOnNode(() => kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(() => kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
}
[Test]
@@ -248,7 +248,7 @@
await AssertOpOnNode(() => kvView.ContainsAsync(null, key), ClientOp.TupleContainsKey, expectedNode);
await AssertOpOnNode(
() => kvView.StreamDataAsync(new[] { new KeyValuePair<int, int>(key, val) }.ToAsyncEnumerable()),
- ClientOp.TupleUpsertAll,
+ ClientOp.StreamerBatchSend,
expectedNode);
// Multi-key operations use the first key for colocation.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index d509c6f..15b9e6a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -149,7 +149,7 @@
public async Task TestRetryLimitExhausted()
{
using var server = new FakeServer(
- shouldDropConnection: ctx => ctx is { OpCode: ClientOp.TupleUpsertAll, RequestCount: > 7 });
+ shouldDropConnection: ctx => ctx is { OpCode: ClientOp.StreamerBatchSend, RequestCount: > 7 });
using var client = await server.ConnectClientAsync();
var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
@@ -157,7 +157,7 @@
var ex = Assert.ThrowsAsync<IgniteClientConnectionException>(
async () => await table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(10_000)));
- StringAssert.StartsWith("Operation TupleUpsertAll failed after 16 retries", ex!.Message);
+ StringAssert.StartsWith("Operation StreamerBatchSend failed after 16 retries", ex!.Message);
}
[Test]
@@ -167,7 +167,7 @@
int upsertIdx = 0;
using var server = new FakeServer(
- shouldDropConnection: ctx => ctx.OpCode == ClientOp.TupleUpsertAll && Interlocked.Increment(ref upsertIdx) % 2 == 1);
+ shouldDropConnection: ctx => ctx.OpCode == ClientOp.StreamerBatchSend && Interlocked.Increment(ref upsertIdx) % 2 == 1);
// Streamer has it's own retry policy, so we can disable retries on the client.
using var client = await server.ConnectClientAsync(new IgniteClientConfiguration
@@ -178,7 +178,7 @@
var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
await table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
- Assert.AreEqual(count, server.UpsertAllRowCount);
+ Assert.AreEqual(count, server.StreamerRowCount);
Assert.That(server.DroppedConnectionCount, Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.PageSize));
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 0c9c64e..22ac237 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -356,7 +356,7 @@
async IAsyncEnumerable<IIgniteTuple> GetData()
{
// First set of batches uses old schema.
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < 20; i++)
{
yield return GetTuple(i);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 43e7935..0a1d3ee 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -144,6 +144,11 @@
/// <summary>
/// Change compute job priority (<see cref="IJobExecution{T}.ChangePriorityAsync"/>).
/// </summary>
- ComputeChangePriority
+ ComputeChangePriority,
+
+ /// <summary>
+ /// Send data streamer batch (<see cref="IDataStreamerTarget{T}.StreamDataAsync"/>).
+ /// </summary>
+ StreamerBatchSend
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 80b3cec..7a1e804 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -128,6 +128,9 @@
ComputeCancel = 60,
/** Change compute job priority. */
- ComputeChangePriority = 61
+ ComputeChangePriority = 61,
+
+ /** Send streamer batch. */
+ StreamerBatchSend = 62
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index abab50c..0648e0f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -68,6 +68,7 @@
ClientOp.ClusterGetNodes => null,
ClientOp.PartitionAssignmentGet => null,
ClientOp.SqlParamMeta => null,
+ ClientOp.StreamerBatchSend => ClientOperationType.StreamerBatchSend,
// Do not return null from default arm intentionally so we don't forget to update this when new ClientOp values are added.
// ReSharper disable once PatternIsRedundant
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index ec11ab3..59655b0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -22,7 +22,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
-using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -54,20 +53,16 @@
/// Streams the data.
/// </summary>
/// <param name="data">Data.</param>
- /// <param name="sender">Batch sender.</param>
+ /// <param name="table">Table.</param>
/// <param name="writer">Item writer.</param>
- /// <param name="schemaProvider">Schema provider.</param>
- /// <param name="partitionAssignmentProvider">Partitioner.</param>
/// <param name="options">Options.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <typeparam name="T">Element type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
internal static async Task StreamDataAsync<T>(
IAsyncEnumerable<T> data,
- Func<PooledArrayBuffer, int, string, IRetryPolicy, Task> sender,
+ Table table,
RecordSerializer<T> writer,
- Func<int?, Task<Schema>> schemaProvider, // Not a ValueTask because Tasks are cached.
- Func<ValueTask<string?[]>> partitionAssignmentProvider,
DataStreamerOptions options,
CancellationToken cancellationToken)
{
@@ -90,12 +85,16 @@
// ConcurrentDictionary is not necessary because we consume the source sequentially.
// However, locking for batches is required due to auto-flush background task.
- var batches = new Dictionary<string, Batch<T>>();
+ var batches = new Dictionary<int, Batch<T>>();
var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };
- var schema = await schemaProvider(null).ConfigureAwait(false);
- var partitionAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
+ var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
+
+ var partitionAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
+ var partitionCount = partitionAssignment.Length; // Can't be changed.
+ Debug.Assert(partitionCount > 0, "partitionCount > 0");
var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
+
using var flushCts = new CancellationTokenSource();
try
@@ -116,7 +115,7 @@
if (lastPartitionsAssignmentCheck.Elapsed > PartitionAssignmentUpdateFrequency)
{
- var newAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
+ var newAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
if (newAssignment != partitionAssignment)
{
@@ -146,7 +145,7 @@
return;
- async ValueTask<(Batch<T> Batch, string Partition)> AddWithRetryUnmapped(T item)
+ async ValueTask<(Batch<T> Batch, int Partition)> AddWithRetryUnmapped(T item)
{
try
{
@@ -154,12 +153,12 @@
}
catch (Exception e) when (e.CausedByUnmappedColumns())
{
- schema = await schemaProvider(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+ schema = await table.GetSchemaAsync(Table.SchemaVersionForceLatest).ConfigureAwait(false);
return Add(item);
}
}
- (Batch<T> Batch, string Partition) Add(T item)
+ (Batch<T> Batch, int Partition) Add(T item)
{
var schema0 = schema;
var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Length, hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
@@ -174,7 +173,7 @@
}
}
- (Batch<T> Batch, string Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder, Schema schema0)
+ (Batch<T> Batch, int Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder, Schema schema0)
{
var columnCount = schema0.Columns.Length;
@@ -185,11 +184,8 @@
writer.Handler.Write(ref tupleBuilder, item, schema0, keyOnly: false, noValueSetRef);
- // ReSharper disable once AccessToModifiedClosure (reviewed)
- var partitionAssignment0 = partitionAssignment;
- var partition = partitionAssignment0[Math.Abs(tupleBuilder.GetHash() % partitionAssignment0.Length)] ?? string.Empty;
-
- var batch = GetOrCreateBatch(partition);
+ var partitionId = Math.Abs(tupleBuilder.GetHash() % partitionCount);
+ var batch = GetOrCreateBatch(partitionId);
lock (batch)
{
@@ -214,17 +210,17 @@
Metrics.StreamerItemsQueuedIncrement();
- return (batch, partition);
+ return (batch, partitionId);
}
- Batch<T> GetOrCreateBatch(string partition)
+ Batch<T> GetOrCreateBatch(int partitionId)
{
- ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
+ ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);
if (batchRef == null)
{
batchRef = new Batch<T>(options.PageSize, schema);
- InitBuffer(batchRef);
+ InitBuffer(batchRef, partitionId, schema);
Metrics.StreamerBatchesActiveIncrement();
}
@@ -232,7 +228,7 @@
return batchRef;
}
- async Task SendAsync(Batch<T> batch, string partition)
+ async Task SendAsync(Batch<T> batch, int partitionId)
{
var expectedSize = batch.Count;
@@ -253,12 +249,12 @@
buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
- batch.Task = SendAndDisposeBufAsync(buf, partition, batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
+ batch.Task = SendAndDisposeBufAsync(buf, partitionId, batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
batch.Items = ArrayPool<T>.Shared.Rent(options.PageSize);
batch.Count = 0;
batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync.
- InitBuffer(batch);
+ InitBuffer(batch, partitionId, schema);
batch.LastFlush = Stopwatch.GetTimestamp();
batch.Schema = schema;
batch.SchemaOutdated = false;
@@ -269,7 +265,7 @@
async Task SendAndDisposeBufAsync(
PooledArrayBuffer buf,
- string partition,
+ int partitionId,
Task oldTask,
T[] items,
int count,
@@ -280,10 +276,13 @@
if (batchSchemaOutdated)
{
// Schema update was detected while the batch was being filled.
- buf.Reset();
- writer.WriteMultiple(buf, null, schema, items.Take(count));
+ // Re-serialize the whole batch.
+ ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count), writer);
}
+ // ReSharper disable once AccessToModifiedClosure
+ var preferredNode = PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
+
try
{
int? schemaVersion = null;
@@ -296,17 +295,16 @@
// Might be updated by another batch.
if (schema.Version != schemaVersion)
{
- schema = await schemaProvider(schemaVersion).ConfigureAwait(false);
+ schema = await table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
}
// Serialize again with the new schema.
- buf.Reset();
- writer.WriteMultiple(buf, null, schema, items.Take(count));
+ ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count), writer);
}
// Wait for the previous batch for this node to preserve item order.
await oldTask.ConfigureAwait(false);
- await sender(buf, count, partition, retryPolicy).ConfigureAwait(false);
+ await SendBatchAsync(table, buf, count, preferredNode, retryPolicy).ConfigureAwait(false);
return;
}
@@ -349,19 +347,6 @@
}
}
- void InitBuffer(Batch<T> batch)
- {
- var buf = batch.Buffer;
-
- var w = buf.MessageWriter;
- w.Write(schema.TableId);
- w.WriteTx(null);
- w.Write(schema.Version);
-
- batch.CountPos = buf.Position;
- buf.Advance(5); // Reserve count.
- }
-
async Task Drain()
{
foreach (var (partition, batch) in batches)
@@ -376,6 +361,64 @@
}
}
+ private static void InitBuffer<T>(Batch<T> batch, int partitionId, Schema schema)
+ {
+ var buf = batch.Buffer;
+ WriteBatchHeader(buf, partitionId, schema);
+
+ batch.CountPos = buf.Position;
+ buf.Advance(5); // Reserve count.
+ }
+
+ private static void WriteBatchHeader(PooledArrayBuffer buf, int partitionId, Schema schema)
+ {
+ var w = buf.MessageWriter;
+ w.Write(schema.TableId);
+ w.Write(partitionId);
+ w.WriteNil(); // Deleted rows bit set.
+ w.Write(schema.Version);
+ }
+
+ private static void ReWriteBatch<T>(
+ PooledArrayBuffer buf,
+ int partitionId,
+ Schema schema,
+ ReadOnlySpan<T> items,
+ RecordSerializer<T> writer)
+ {
+ buf.Reset();
+ WriteBatchHeader(buf, partitionId, schema);
+
+ var w = buf.MessageWriter;
+ w.Write(items.Length);
+
+ foreach (var item in items)
+ {
+ writer.Handler.Write(ref w, schema, item, keyOnly: false, computeHash: false);
+ }
+ }
+
+ private static async Task SendBatchAsync(
+ Table table,
+ PooledArrayBuffer buf,
+ int count,
+ PreferredNode preferredNode,
+ IRetryPolicy retryPolicy)
+ {
+ var (resBuf, socket) = await table.Socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.StreamerBatchSend,
+ tx: null,
+ buf,
+ preferredNode,
+ retryPolicy)
+ .ConfigureAwait(false);
+
+ resBuf.Dispose();
+
+ Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
+ Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+ }
+
private sealed record Batch<T>
{
public Batch(int capacity, Schema schema)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 9f60de1..7e545ab 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -299,24 +299,8 @@
CancellationToken cancellationToken = default) =>
await DataStreamer.StreamDataAsync(
data,
- sender: async (batch, count, preferredNode, retryPolicy) =>
- {
- var (resBuf, socket) = await _table.Socket.DoOutInOpAndGetSocketAsync(
- ClientOp.TupleUpsertAll,
- tx: null,
- batch,
- PreferredNode.FromName(preferredNode),
- retryPolicy)
- .ConfigureAwait(false);
-
- resBuf.Dispose();
-
- Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
- Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
- },
+ _table,
writer: _ser,
- schemaProvider: _table.GetSchemaAsync,
- partitionAssignmentProvider: () => _table.GetPartitionAssignmentAsync(),
options ?? DataStreamerOptions.Default,
cancellationToken).ConfigureAwait(false);
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 41dcbcf..e364c23 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -61,6 +61,7 @@
ClientOperationType.ComputeCancel => false,
ClientOperationType.ComputeChangePriority => false,
ClientOperationType.ComputeGetStatus => true,
+ ClientOperationType.StreamerBatchSend => false,
var unsupported => throw new NotSupportedException("Unsupported operation type: " + unsupported)
};
}