IGNITE-21931 .NET: Refactor DataStreamer to use StreamerBatchSend (#3546)

* Refactor streamer to batch by partition instead of by node
* Use `StreamerBatchSend` instead of `TupleUpsertAll`
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 4849b0d..6dfe0f9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -115,7 +115,7 @@
 
         public Dictionary<string, object?> LastSqlScriptProps { get; private set; } = new();
 
-        public long UpsertAllRowCount { get; set; }
+        public long StreamerRowCount { get; set; }
 
         public long DroppedConnectionCount { get; set; }
 
@@ -281,11 +281,6 @@
                             Thread.Sleep(MultiRowOperationDelayPerRow * count);
                         }
 
-                        if (opCode == ClientOp.TupleUpsertAll)
-                        {
-                            UpsertAllRowCount += count;
-                        }
-
                         Send(handler, requestId, new byte[] { 1, 0 }.AsMemory());
                         continue;
 
@@ -334,6 +329,13 @@
                         Thread.Sleep(HeartbeatDelay);
                         Send(handler, requestId, Array.Empty<byte>());
                         continue;
+
+                    case ClientOp.StreamerBatchSend:
+                        reader.Skip(4);
+                        StreamerRowCount += reader.ReadInt32();
+
+                        Send(handler, requestId, Array.Empty<byte>());
+                        continue;
                 }
 
                 // Fake error message for any other op code.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index c17bfa4..c3c7059 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -112,7 +112,7 @@
     public async Task TestDataStreamerReceivesPartitionAssignmentUpdates() =>
         await TestClientReceivesPartitionAssignmentUpdates(
             view => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
-            ClientOp.TupleUpsertAll);
+            ClientOp.StreamerBatchSend);
 
     [Test]
     [TestCaseSource(nameof(KeyNodeCases))]
@@ -138,7 +138,7 @@
         await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), ClientOp.TupleReplaceExact, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteAsync(null, key), ClientOp.TupleDelete, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), ClientOp.TupleDeleteExact, expectedNode);
-        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
 
         // Multi-key operations use the first key for colocation.
         var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new IgniteTuple { ["ID"] = keyId + 1 } };
@@ -172,7 +172,7 @@
         await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), ClientOp.TupleReplaceExact, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteAsync(null, key), ClientOp.TupleDelete, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), ClientOp.TupleDeleteExact, expectedNode);
-        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
 
         // Multi-key operations use the first key for colocation.
         var keys = new[] { key, key - 1, key + 1 };
@@ -218,7 +218,7 @@
         await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs), ClientOp.TupleUpsertAll, expectedNode);
         await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys), ClientOp.TupleDeleteAll, expectedNode);
         await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs), ClientOp.TupleDeleteAllExact, expectedNode);
-        await AssertOpOnNode(() => kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+        await AssertOpOnNode(() => kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
     }
 
     [Test]
@@ -248,7 +248,7 @@
         await AssertOpOnNode(() => kvView.ContainsAsync(null, key), ClientOp.TupleContainsKey, expectedNode);
         await AssertOpOnNode(
             () => kvView.StreamDataAsync(new[] { new KeyValuePair<int, int>(key, val) }.ToAsyncEnumerable()),
-            ClientOp.TupleUpsertAll,
+            ClientOp.StreamerBatchSend,
             expectedNode);
 
         // Multi-key operations use the first key for colocation.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index d509c6f..15b9e6a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -149,7 +149,7 @@
     public async Task TestRetryLimitExhausted()
     {
         using var server = new FakeServer(
-            shouldDropConnection: ctx => ctx is { OpCode: ClientOp.TupleUpsertAll, RequestCount: > 7 });
+            shouldDropConnection: ctx => ctx is { OpCode: ClientOp.StreamerBatchSend, RequestCount: > 7 });
 
         using var client = await server.ConnectClientAsync();
         var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
@@ -157,7 +157,7 @@
         var ex = Assert.ThrowsAsync<IgniteClientConnectionException>(
             async () => await table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(10_000)));
 
-        StringAssert.StartsWith("Operation TupleUpsertAll failed after 16 retries", ex!.Message);
+        StringAssert.StartsWith("Operation StreamerBatchSend failed after 16 retries", ex!.Message);
     }
 
     [Test]
@@ -167,7 +167,7 @@
         int upsertIdx = 0;
 
         using var server = new FakeServer(
-            shouldDropConnection: ctx => ctx.OpCode == ClientOp.TupleUpsertAll && Interlocked.Increment(ref upsertIdx) % 2 == 1);
+            shouldDropConnection: ctx => ctx.OpCode == ClientOp.StreamerBatchSend && Interlocked.Increment(ref upsertIdx) % 2 == 1);
 
         // Streamer has it's own retry policy, so we can disable retries on the client.
         using var client = await server.ConnectClientAsync(new IgniteClientConfiguration
@@ -178,7 +178,7 @@
         var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
         await table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
 
-        Assert.AreEqual(count, server.UpsertAllRowCount);
+        Assert.AreEqual(count, server.StreamerRowCount);
         Assert.That(server.DroppedConnectionCount, Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.PageSize));
     }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 0c9c64e..22ac237 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -356,7 +356,7 @@
         async IAsyncEnumerable<IIgniteTuple> GetData()
         {
             // First set of batches uses old schema.
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < 20; i++)
             {
                 yield return GetTuple(i);
             }
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 43e7935..0a1d3ee 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -144,6 +144,11 @@
         /// <summary>
         /// Change compute job priority (<see cref="IJobExecution{T}.ChangePriorityAsync"/>).
         /// </summary>
-        ComputeChangePriority
+        ComputeChangePriority,
+
+        /// <summary>
+        /// Send data streamer batch (<see cref="IDataStreamerTarget{T}.StreamDataAsync"/>).
+        /// </summary>
+        StreamerBatchSend
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 80b3cec..7a1e804 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -128,6 +128,9 @@
         ComputeCancel = 60,
 
         /** Change compute job priority. */
-        ComputeChangePriority = 61
+        ComputeChangePriority = 61,
+
+        /** Send streamer batch. */
+        StreamerBatchSend = 62
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index abab50c..0648e0f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -68,6 +68,7 @@
                 ClientOp.ClusterGetNodes => null,
                 ClientOp.PartitionAssignmentGet => null,
                 ClientOp.SqlParamMeta => null,
+                ClientOp.StreamerBatchSend => ClientOperationType.StreamerBatchSend,
 
                 // Do not return null from default arm intentionally so we don't forget to update this when new ClientOp values are added.
                 // ReSharper disable once PatternIsRedundant
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index ec11ab3..59655b0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -22,7 +22,6 @@
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
-using System.Linq;
 using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
@@ -54,20 +53,16 @@
     /// Streams the data.
     /// </summary>
     /// <param name="data">Data.</param>
-    /// <param name="sender">Batch sender.</param>
+    /// <param name="table">Table.</param>
     /// <param name="writer">Item writer.</param>
-    /// <param name="schemaProvider">Schema provider.</param>
-    /// <param name="partitionAssignmentProvider">Partitioner.</param>
     /// <param name="options">Options.</param>
     /// <param name="cancellationToken">Cancellation token.</param>
     /// <typeparam name="T">Element type.</typeparam>
     /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
     internal static async Task StreamDataAsync<T>(
         IAsyncEnumerable<T> data,
-        Func<PooledArrayBuffer, int, string, IRetryPolicy, Task> sender,
+        Table table,
         RecordSerializer<T> writer,
-        Func<int?, Task<Schema>> schemaProvider, // Not a ValueTask because Tasks are cached.
-        Func<ValueTask<string?[]>> partitionAssignmentProvider,
         DataStreamerOptions options,
         CancellationToken cancellationToken)
     {
@@ -90,12 +85,16 @@
 
         // ConcurrentDictionary is not necessary because we consume the source sequentially.
         // However, locking for batches is required due to auto-flush background task.
-        var batches = new Dictionary<string, Batch<T>>();
+        var batches = new Dictionary<int, Batch<T>>();
         var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };
 
-        var schema = await schemaProvider(null).ConfigureAwait(false);
-        var partitionAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
+        var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
+
+        var partitionAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
+        var partitionCount = partitionAssignment.Length; // Can't be changed.
+        Debug.Assert(partitionCount > 0, "partitionCount > 0");
         var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
+
         using var flushCts = new CancellationTokenSource();
 
         try
@@ -116,7 +115,7 @@
 
                 if (lastPartitionsAssignmentCheck.Elapsed > PartitionAssignmentUpdateFrequency)
                 {
-                    var newAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
+                    var newAssignment = await table.GetPartitionAssignmentAsync().ConfigureAwait(false);
 
                     if (newAssignment != partitionAssignment)
                     {
@@ -146,7 +145,7 @@
 
         return;
 
-        async ValueTask<(Batch<T> Batch, string Partition)> AddWithRetryUnmapped(T item)
+        async ValueTask<(Batch<T> Batch, int Partition)> AddWithRetryUnmapped(T item)
         {
             try
             {
@@ -154,12 +153,12 @@
             }
             catch (Exception e) when (e.CausedByUnmappedColumns())
             {
-                schema = await schemaProvider(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+                schema = await table.GetSchemaAsync(Table.SchemaVersionForceLatest).ConfigureAwait(false);
                 return Add(item);
             }
         }
 
-        (Batch<T> Batch, string Partition) Add(T item)
+        (Batch<T> Batch, int Partition) Add(T item)
         {
             var schema0 = schema;
             var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Length, hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
@@ -174,7 +173,7 @@
             }
         }
 
-        (Batch<T> Batch, string Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder, Schema schema0)
+        (Batch<T> Batch, int Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder, Schema schema0)
         {
             var columnCount = schema0.Columns.Length;
 
@@ -185,11 +184,8 @@
 
             writer.Handler.Write(ref tupleBuilder, item, schema0, keyOnly: false, noValueSetRef);
 
-            // ReSharper disable once AccessToModifiedClosure (reviewed)
-            var partitionAssignment0 = partitionAssignment;
-            var partition = partitionAssignment0[Math.Abs(tupleBuilder.GetHash() % partitionAssignment0.Length)] ?? string.Empty;
-
-            var batch = GetOrCreateBatch(partition);
+            var partitionId = Math.Abs(tupleBuilder.GetHash() % partitionCount);
+            var batch = GetOrCreateBatch(partitionId);
 
             lock (batch)
             {
@@ -214,17 +210,17 @@
 
             Metrics.StreamerItemsQueuedIncrement();
 
-            return (batch, partition);
+            return (batch, partitionId);
         }
 
-        Batch<T> GetOrCreateBatch(string partition)
+        Batch<T> GetOrCreateBatch(int partitionId)
         {
-            ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
+            ref var batchRef = ref CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);
 
             if (batchRef == null)
             {
                 batchRef = new Batch<T>(options.PageSize, schema);
-                InitBuffer(batchRef);
+                InitBuffer(batchRef, partitionId, schema);
 
                 Metrics.StreamerBatchesActiveIncrement();
             }
@@ -232,7 +228,7 @@
             return batchRef;
         }
 
-        async Task SendAsync(Batch<T> batch, string partition)
+        async Task SendAsync(Batch<T> batch, int partitionId)
         {
             var expectedSize = batch.Count;
 
@@ -253,12 +249,12 @@
                 buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
                 buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
 
-                batch.Task = SendAndDisposeBufAsync(buf, partition, batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
+                batch.Task = SendAndDisposeBufAsync(buf, partitionId, batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
 
                 batch.Items = ArrayPool<T>.Shared.Rent(options.PageSize);
                 batch.Count = 0;
                 batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync.
-                InitBuffer(batch);
+                InitBuffer(batch, partitionId, schema);
                 batch.LastFlush = Stopwatch.GetTimestamp();
                 batch.Schema = schema;
                 batch.SchemaOutdated = false;
@@ -269,7 +265,7 @@
 
         async Task SendAndDisposeBufAsync(
             PooledArrayBuffer buf,
-            string partition,
+            int partitionId,
             Task oldTask,
             T[] items,
             int count,
@@ -280,10 +276,13 @@
             if (batchSchemaOutdated)
             {
                 // Schema update was detected while the batch was being filled.
-                buf.Reset();
-                writer.WriteMultiple(buf, null, schema, items.Take(count));
+                // Re-serialize the whole batch.
+                ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count), writer);
             }
 
+            // ReSharper disable once AccessToModifiedClosure
+            var preferredNode = PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
+
             try
             {
                 int? schemaVersion = null;
@@ -296,17 +295,16 @@
                             // Might be updated by another batch.
                             if (schema.Version != schemaVersion)
                             {
-                                schema = await schemaProvider(schemaVersion).ConfigureAwait(false);
+                                schema = await table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
                             }
 
                             // Serialize again with the new schema.
-                            buf.Reset();
-                            writer.WriteMultiple(buf, null, schema, items.Take(count));
+                            ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count), writer);
                         }
 
                         // Wait for the previous batch for this node to preserve item order.
                         await oldTask.ConfigureAwait(false);
-                        await sender(buf, count, partition, retryPolicy).ConfigureAwait(false);
+                        await SendBatchAsync(table, buf, count, preferredNode, retryPolicy).ConfigureAwait(false);
 
                         return;
                     }
@@ -349,19 +347,6 @@
             }
         }
 
-        void InitBuffer(Batch<T> batch)
-        {
-            var buf = batch.Buffer;
-
-            var w = buf.MessageWriter;
-            w.Write(schema.TableId);
-            w.WriteTx(null);
-            w.Write(schema.Version);
-
-            batch.CountPos = buf.Position;
-            buf.Advance(5); // Reserve count.
-        }
-
         async Task Drain()
         {
             foreach (var (partition, batch) in batches)
@@ -376,6 +361,64 @@
         }
     }
 
+    private static void InitBuffer<T>(Batch<T> batch, int partitionId, Schema schema)
+    {
+        var buf = batch.Buffer;
+        WriteBatchHeader(buf, partitionId, schema);
+
+        batch.CountPos = buf.Position;
+        buf.Advance(5); // Reserve count.
+    }
+
+    private static void WriteBatchHeader(PooledArrayBuffer buf, int partitionId, Schema schema)
+    {
+        var w = buf.MessageWriter;
+        w.Write(schema.TableId);
+        w.Write(partitionId);
+        w.WriteNil(); // Deleted rows bit set.
+        w.Write(schema.Version);
+    }
+
+    private static void ReWriteBatch<T>(
+        PooledArrayBuffer buf,
+        int partitionId,
+        Schema schema,
+        ReadOnlySpan<T> items,
+        RecordSerializer<T> writer)
+    {
+        buf.Reset();
+        WriteBatchHeader(buf, partitionId, schema);
+
+        var w = buf.MessageWriter;
+        w.Write(items.Length);
+
+        foreach (var item in items)
+        {
+            writer.Handler.Write(ref w, schema, item, keyOnly: false, computeHash: false);
+        }
+    }
+
+    private static async Task SendBatchAsync(
+        Table table,
+        PooledArrayBuffer buf,
+        int count,
+        PreferredNode preferredNode,
+        IRetryPolicy retryPolicy)
+    {
+        var (resBuf, socket) = await table.Socket.DoOutInOpAndGetSocketAsync(
+                ClientOp.StreamerBatchSend,
+                tx: null,
+                buf,
+                preferredNode,
+                retryPolicy)
+            .ConfigureAwait(false);
+
+        resBuf.Dispose();
+
+        Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
+        Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+    }
+
     private sealed record Batch<T>
     {
         public Batch(int capacity, Schema schema)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 9f60de1..7e545ab 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -299,24 +299,8 @@
             CancellationToken cancellationToken = default) =>
             await DataStreamer.StreamDataAsync(
                 data,
-                sender: async (batch, count, preferredNode, retryPolicy) =>
-                {
-                    var (resBuf, socket) = await _table.Socket.DoOutInOpAndGetSocketAsync(
-                            ClientOp.TupleUpsertAll,
-                            tx: null,
-                            batch,
-                            PreferredNode.FromName(preferredNode),
-                            retryPolicy)
-                        .ConfigureAwait(false);
-
-                    resBuf.Dispose();
-
-                    Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
-                    Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
-                },
+                _table,
                 writer: _ser,
-                schemaProvider: _table.GetSchemaAsync,
-                partitionAssignmentProvider: () => _table.GetPartitionAssignmentAsync(),
                 options ?? DataStreamerOptions.Default,
                 cancellationToken).ConfigureAwait(false);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 41dcbcf..e364c23 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -61,6 +61,7 @@
                 ClientOperationType.ComputeCancel => false,
                 ClientOperationType.ComputeChangePriority => false,
                 ClientOperationType.ComputeGetStatus => true,
+                ClientOperationType.StreamerBatchSend => false,
                 var unsupported => throw new NotSupportedException("Unsupported operation type: " + unsupported)
             };
         }