namespace Apache.Ignite.Internal
using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Buffers;
using Ignite.Network;
using Microsoft.Extensions.Logging;
using Network;
using Proto;
using Proto.MsgPack;
/// <summary>
/// Wrapper over framework socket for Ignite thin client operations.
/// </summary>
// ReSharper disable SuggestBaseTypeForParameter (NetworkStream has more efficient read/write methods).
internal sealed partial class ClientSocket : IDisposable
/** General-purpose client type code. */
private const byte ClientType = 2;
/** Version 3.0.0. */
private static readonly ClientProtocolVersion Ver300 = new(3, 0, 0);
/** Current version. */
private static readonly ClientProtocolVersion CurrentProtocolVersion = Ver300;
/** Minimum supported heartbeat interval. */
private static readonly TimeSpan MinRecommendedHeartbeatInterval = TimeSpan.FromMilliseconds(500);
/** Socket id for debug logging. */
private static long _socketId;
/** Underlying stream. */
private readonly Stream _stream;
/** Current async operations, map from request id. */
private readonly ConcurrentDictionary<long, TaskCompletionSource<PooledBuffer>> _requests = new();
/** Current notification handlers, map from request id. */
private readonly ConcurrentDictionary<long, NotificationHandler> _notificationHandlers = new();
/** Requests can be sent by one thread at a time. */
Justification = "WaitHandle is not used in SemaphoreSlim, no need to dispose.")]
private readonly SemaphoreSlim _sendLock = new(initialCount: 1);
/** Cancellation token source that gets cancelled when this instance is disposed. */
Justification = "WaitHandle is not used in CancellationTokenSource, no need to dispose.")]
private readonly CancellationTokenSource _disposeTokenSource = new();
/** Dispose lock. */
private readonly object _disposeLock = new();
/** Heartbeat timer. */
private readonly Timer _heartbeatTimer;
/** Effective heartbeat interval. */
private readonly TimeSpan _heartbeatInterval;
/** Socket timeout for handshakes and heartbeats. */
private readonly TimeSpan _socketTimeout;
/** Operation timeout for user-initiated requests. */
private readonly TimeSpan _operationTimeout;
/** Logger. */
private readonly ILogger _logger;
/** Event listener. */
private readonly IClientSocketEventListener _listener;
/** Pre-allocated buffer for message size + op code + request id. To be used under <see cref="_sendLock"/>. */
private readonly byte[] _prefixBuffer = new byte[ProtoCommon.MessagePrefixSize];
/** Request id generator. */
private long _requestId;
/** Exception that caused this socket to close. */
private volatile Exception? _exception;
/// <summary>
/// Initializes a new instance of the <see cref="ClientSocket"/> class.
/// </summary>
/// <param name="stream">Network stream.</param>
/// <param name="configuration">Configuration.</param>
/// <param name="connectionContext">Connection context.</param>
/// <param name="listener">Event listener.</param>
/// <param name="logger">Logger.</param>
private ClientSocket(
Stream stream,
IgniteClientConfiguration configuration,
ConnectionContext connectionContext,
IClientSocketEventListener listener,
ILogger logger)
_stream = stream;
ConnectionContext = connectionContext;
_listener = listener;
_logger = logger;
_socketTimeout = configuration.SocketTimeout;
_operationTimeout = configuration.OperationTimeout;
MetricsContext = connectionContext.ClusterNode.MetricsContext ??
throw new InvalidOperationException("Metrics context is missing.");
_heartbeatInterval = GetHeartbeatInterval(configuration.HeartbeatInterval, connectionContext.IdleTimeout, _logger);
// ReSharper disable once AsyncVoidLambda (timer callback)
_heartbeatTimer = new Timer(
callback: async _ => await SendHeartbeatAsync().ConfigureAwait(false),
state: null,
dueTime: _heartbeatInterval,
period: TimeSpan.FromMilliseconds(-1));
// Because this call is not awaited, execution of the current method continues before the call is completed.
// Receive loop runs in the background and should not be awaited.
_ = RunReceiveLoop(_disposeTokenSource.Token);
/// <summary>
/// Gets a value indicating whether this socket is disposed.
/// </summary>
public bool IsDisposed => _disposeTokenSource.IsCancellationRequested;
/// <summary>
/// Gets the connection context.
/// </summary>
public ConnectionContext ConnectionContext { get; }
/// <summary>
/// Connects the socket to the specified endpoint and performs handshake.
/// </summary>
/// <param name="endPoint">Specific endpoint to connect to.</param>
/// <param name="configuration">Configuration.</param>
/// <param name="listener">Event listener.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
"CA2000:Dispose objects before losing scope",
Justification = "NetworkStream is returned from this method in the socket.")]
[SuppressMessage("Maintainability", "CA1508:Avoid dead conditional code", Justification = "False positive")]
[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Reviewed")]
public static async Task<ClientSocket> ConnectAsync(
SocketEndpoint endPoint,
IgniteClientConfiguration configuration,
IClientSocketEventListener listener)
using var cts = new CancellationTokenSource();
var logger = configuration.LoggerFactory.CreateLogger(typeof(ClientSocket).FullName! + "-" +
Interlocked.Increment(ref _socketId));
bool connected = false;
Socket? socket = null;
Stream? stream = null;
socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
NoDelay = true
await socket.ConnectAsync(endPoint.EndPoint, cts.Token)
.WaitAsync(configuration.SocketTimeout, cts.Token)
Metrics.ConnectionsEstablished.Add(1, endPoint.MetricsContext.Tags);
connected = true;
stream = new NetworkStream(socket, ownsSocket: true);
if (configuration.SslStreamFactory is { } sslStreamFactory &&
await sslStreamFactory.CreateAsync(stream, endPoint.Host, cts.Token)
.WaitAsync(configuration.SocketTimeout, cts.Token)
.ConfigureAwait(false) is { } sslStream)
stream = sslStream;
logger.LogSslConnectionEstablishedDebug(socket.RemoteEndPoint, sslStream.NegotiatedCipherSuite);
var context = await HandshakeAsync(stream, endPoint, configuration, listener, cts.Token)
.WaitAsync(configuration.SocketTimeout, cts.Token)
logger.LogHandshakeSucceededDebug(socket.RemoteEndPoint, context);
return new ClientSocket(stream, configuration, context, listener, logger);
catch (Exception ex)
if (stream != null)
await stream.DisposeAsync().ConfigureAwait(false);
catch (Exception disposeEx)
logger.LogFailedToDisposeSocketAfterFailedConnectionAttemptWarn(disposeEx, disposeEx.Message);
logger.LogConnectionFailedBeforeOrDuringHandshakeWarn(ex, endPoint.EndPoint, ex.Message);
if (ex.GetBaseException() is TimeoutException)
Metrics.HandshakesFailedTimeout.Add(1, endPoint.MetricsContext.Tags);
Metrics.HandshakesFailed.Add(1, endPoint.MetricsContext.Tags);
if (connected)
throw new IgniteClientConnectionException(
"Failed to connect to endpoint: " + endPoint.EndPoint,
/// <summary>
/// Performs an in-out operation.
/// </summary>
/// <param name="clientOp">Client op code.</param>
/// <param name="request">Request data.</param>
/// <param name="expectNotifications">Whether to expect notifications as a result of the operation.</param>
/// <returns>Response data.</returns>
public Task<PooledBuffer> DoOutInOpAsync(
ClientOp clientOp,
PooledArrayBuffer? request = null,
bool expectNotifications = false) =>
DoOutInOpAsyncInternal(clientOp, request, expectNotifications)
/// <inheritdoc/>
public void Dispose()
/// <summary>
/// Performs the handshake exchange.
/// </summary>
/// <param name="stream">Network stream.</param>
/// <param name="endPoint">Endpoint.</param>
/// <param name="configuration">Configuration.</param>
/// <param name="listener">Client socket event listener.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private static async Task<ConnectionContext> HandshakeAsync(
Stream stream,
SocketEndpoint endPoint,
IgniteClientConfiguration configuration,
IClientSocketEventListener listener,
CancellationToken cancellationToken)
await stream.WriteAsync(ProtoCommon.MagicBytes, cancellationToken).ConfigureAwait(false);
await WriteHandshakeAsync(stream, CurrentProtocolVersion, configuration, endPoint.MetricsContext, cancellationToken)
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
await CheckMagicBytesAsync(stream, endPoint.MetricsContext, cancellationToken).ConfigureAwait(false);
using var response = await ReadResponseAsync(stream, new byte[4], endPoint.MetricsContext, CancellationToken.None)
return ReadHandshakeResponse(response.GetReader(), endPoint, GetSslInfo(stream), listener);
private static async ValueTask CheckMagicBytesAsync(
Stream stream,
MetricsContext metricsContext,
CancellationToken cancellationToken)
var responseMagic = ByteArrayPool.Rent(ProtoCommon.MagicBytes.Length);
await ReceiveBytesAsync(stream, responseMagic, ProtoCommon.MagicBytes.Length, metricsContext, cancellationToken)
for (var i = 0; i < ProtoCommon.MagicBytes.Length; i++)
if (responseMagic[i] != ProtoCommon.MagicBytes[i])
throw new IgniteClientConnectionException(
"Invalid magic bytes returned from the server: " + BitConverter.ToString(responseMagic));
private static ConnectionContext ReadHandshakeResponse(
MsgPackReader reader,
SocketEndpoint endPoint,
ISslInfo? sslInfo,
IClientSocketEventListener listener)
var serverVer = new ClientProtocolVersion(reader.ReadInt16(), reader.ReadInt16(), reader.ReadInt16());
if (serverVer != CurrentProtocolVersion)
throw new IgniteClientConnectionException(ErrorGroups.Client.Protocol, "Unexpected server version: " + serverVer);
if (!reader.TryReadNil())
throw ReadError(ref reader);
var idleTimeoutMs = reader.ReadInt64();
var clusterNodeId = reader.ReadString();
var clusterNodeName = reader.ReadString();
var clusterId = reader.ReadGuid();
var clusterName = reader.ReadString();
var observableTimestamp = reader.ReadInt64();
// Cluster version.
reader.Skip(); // Major.
reader.Skip(); // Minor.
reader.Skip(); // Maintenance.
reader.Skip(); // Patch.
reader.Skip(); // Pre-release.
reader.Skip(); // Features.
reader.Skip(); // Extensions.
return new ConnectionContext(
new ClusterNode(clusterNodeId, clusterNodeName, endPoint.EndPoint, endPoint.MetricsContext),
private static IgniteException ReadError(ref MsgPackReader reader)
Guid traceId = reader.TryReadNil() ? Guid.NewGuid() : reader.ReadGuid();
int code = reader.TryReadNil() ? 65537 : reader.ReadInt32();
string className = reader.ReadString();
string? message = reader.ReadStringNullable();
string? javaStackTrace = reader.ReadStringNullable();
var ex = ExceptionMapper.GetException(traceId, code, className, message, javaStackTrace);
int extensionCount = reader.TryReadNil() ? 0 : reader.ReadInt32();
for (int i = 0; i < extensionCount; i++)
var key = reader.ReadString();
if (key == ErrorExtensions.ExpectedSchemaVersion)
ex.Data[key] = reader.ReadInt32();
reader.Skip(); // Unknown extension - ignore.
return ex;
private static async ValueTask<PooledBuffer> ReadResponseAsync(
Stream stream,
byte[] messageSizeBytes,
MetricsContext metricsContext,
CancellationToken cancellationToken)
var size = await ReadMessageSizeAsync(stream, messageSizeBytes, metricsContext, cancellationToken).ConfigureAwait(false);
var bytes = ByteArrayPool.Rent(size);
await ReceiveBytesAsync(stream, bytes, size, metricsContext, cancellationToken).ConfigureAwait(false);
return new PooledBuffer(bytes, 0, size);
catch (Exception)
private static async Task<int> ReadMessageSizeAsync(
Stream stream,
byte[] buffer,
MetricsContext metricsContext,
CancellationToken cancellationToken)
const int messageSizeByteCount = 4;
Debug.Assert(buffer.Length >= messageSizeByteCount, "buffer.Length >= messageSizeByteCount");
await ReceiveBytesAsync(stream, buffer, messageSizeByteCount, metricsContext, cancellationToken).ConfigureAwait(false);
return ReadMessageSize(buffer);
private static async Task ReceiveBytesAsync(
Stream stream,
byte[] buffer,
int size,
MetricsContext metricsContext,
CancellationToken cancellationToken)
int received = 0;
while (received < size)
var res = await stream.ReadAsync(buffer.AsMemory(received, size - received), cancellationToken).ConfigureAwait(false);
if (res == 0)
// Disconnected.
throw new IgniteClientConnectionException(
"Connection lost (failed to read data from socket)",
new SocketException((int) SocketError.ConnectionAborted));
received += res;
AddBytesReceived(res, metricsContext);
private static async ValueTask WriteHandshakeAsync(
Stream stream,
ClientProtocolVersion version,
IgniteClientConfiguration configuration,
MetricsContext metricsContext,
CancellationToken token)
using var bufferWriter = new PooledArrayBuffer(prefixSize: ProtoCommon.MessagePrefixSize);
WriteHandshake(bufferWriter.MessageWriter, version, configuration);
// Prepend size.
var buf = bufferWriter.GetWrittenMemory();
var size = buf.Length - ProtoCommon.MessagePrefixSize;
var resBuf = buf.Slice(ProtoCommon.MessagePrefixSize - 4);
WriteMessageSize(resBuf, size);
await stream.WriteAsync(resBuf, token).ConfigureAwait(false);
AddBytesSent(resBuf.Length + ProtoCommon.MagicBytes.Length, metricsContext);
private static void WriteHandshake(MsgPackWriter w, ClientProtocolVersion version, IgniteClientConfiguration configuration)
// Version.
w.Write(ClientType); // Client type: general purpose.
w.WriteBinaryHeader(0); // Features.
if (configuration.Authenticator != null)
w.Write(3); // Extensions.
w.Write(0); // Extensions.
private static void WriteMessageSize(Memory<byte> target, int size) =>
BinaryPrimitives.WriteInt32BigEndian(target.Span, size);
private static int ReadMessageSize(Span<byte> responseLenBytes) => BinaryPrimitives.ReadInt32BigEndian(responseLenBytes);
private static TimeSpan GetHeartbeatInterval(TimeSpan configuredInterval, TimeSpan serverIdleTimeout, ILogger logger)
if (configuredInterval <= TimeSpan.Zero)
throw new IgniteClientException(
$"{nameof(IgniteClientConfiguration)}.{nameof(IgniteClientConfiguration.HeartbeatInterval)} should be greater than zero.");
if (serverIdleTimeout <= TimeSpan.Zero)
return configuredInterval;
var recommendedHeartbeatInterval = serverIdleTimeout / 3;
if (recommendedHeartbeatInterval < MinRecommendedHeartbeatInterval)
recommendedHeartbeatInterval = MinRecommendedHeartbeatInterval;
if (configuredInterval < recommendedHeartbeatInterval)
logger.LogServerSideIdleTimeoutIgnoredInfo(serverIdleTimeout, configuredInterval);
return configuredInterval;
serverIdleTimeout, configuredInterval, recommendedHeartbeatInterval);
return recommendedHeartbeatInterval;
private static ISslInfo? GetSslInfo(Stream stream) =>
stream is SslStream sslStream
? new SslInfo(
: null;
private async Task<PooledBuffer> DoOutInOpAsyncInternal(
ClientOp clientOp,
PooledArrayBuffer? request = null,
bool expectNotifications = false)
var ex = _exception;
if (ex != null)
throw new IgniteClientConnectionException(
"Socket is closed due to an error, examine inner exception for details.",
if (_disposeTokenSource.IsCancellationRequested)
throw new IgniteClientConnectionException(
"Socket is disposed.",
new ObjectDisposedException(nameof(ClientSocket)));
var requestId = Interlocked.Increment(ref _requestId);
var taskCompletionSource = new TaskCompletionSource<PooledBuffer>();
_requests[requestId] = taskCompletionSource;
NotificationHandler? notificationHandler = null;
if (expectNotifications)
notificationHandler = new NotificationHandler();
_notificationHandlers[requestId] = notificationHandler;
await SendRequestAsync(request, clientOp, requestId).ConfigureAwait(false);
PooledBuffer resBuf = await taskCompletionSource.Task.ConfigureAwait(false);
resBuf.Metadata = notificationHandler;
return resBuf;
catch (Exception e)
if (_requests.TryRemove(requestId, out _))
_notificationHandlers.TryRemove(requestId, out _);
if (e is OperationCanceledException or ObjectDisposedException)
// Canceled task means Dispose was called.
throw new IgniteClientConnectionException(ErrorGroups.Client.Connection, "Connection closed.", e);
Justification = "Any exception during socket write should be handled to close the socket.")]
private async ValueTask SendRequestAsync(PooledArrayBuffer? request, ClientOp op, long requestId)
// Reset heartbeat timer - don't sent heartbeats when connection is active anyway.
_heartbeatTimer.Change(dueTime: _heartbeatInterval, period: TimeSpan.FromMilliseconds(-1));
_logger.LogSendingRequestTrace(requestId, op, ConnectionContext.ClusterNode.Address);
await _sendLock.WaitAsync(_disposeTokenSource.Token).ConfigureAwait(false);
var prefixMem = _prefixBuffer.AsMemory()[4..];
var prefixSize = MsgPackWriter.WriteUnsigned(prefixMem.Span, (ulong)op);
prefixSize += MsgPackWriter.WriteUnsigned(prefixMem[prefixSize..].Span, (ulong)requestId);
if (request != null)
var requestBuf = request.GetWrittenMemory();
WriteMessageSize(_prefixBuffer, prefixSize + requestBuf.Length - ProtoCommon.MessagePrefixSize);
var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
var requestBufStart = ProtoCommon.MessagePrefixSize - prefixBytes.Length;
var requestBufWithPrefix = requestBuf.Slice(requestBufStart);
// Copy prefix to request buf to avoid extra WriteAsync call for the prefix.
await _stream.WriteAsync(requestBufWithPrefix, _disposeTokenSource.Token).ConfigureAwait(false);
// Request without body, send only the prefix.
WriteMessageSize(_prefixBuffer, prefixSize);
var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
await _stream.WriteAsync(prefixBytes, _disposeTokenSource.Token).ConfigureAwait(false);
Metrics.RequestsSent.Add(1, MetricsContext.Tags);
catch (Exception e)
var message = "Exception while writing to socket, connection closed: " + e.Message;
_logger.LogSocketIoError(e, message);
var connEx = new IgniteClientConnectionException(ErrorGroups.Client.Connection, message, new SocketException());
throw connEx;
Justification = "Any exception in receive loop should be handled.")]
private async Task RunReceiveLoop(CancellationToken cancellationToken)
// Reuse the same array for all responses.
var messageSizeBytes = new byte[4];
while (!cancellationToken.IsCancellationRequested)
PooledBuffer response = await ReadResponseAsync(
_stream, messageSizeBytes, MetricsContext, cancellationToken).ConfigureAwait(false);
// Invoke response handler in another thread to continue the receive loop.
// Response buffer should be disposed by the task handler.
ThreadPool.QueueUserWorkItem<(ClientSocket Socket, PooledBuffer Buf)>(
callBack: static r => r.Socket.HandleResponse(r.Buf),
state: (this, response),
preferLocal: true);
catch (Exception e)
var message = "Exception while reading from socket, connection closed: " + e.Message;
_logger.LogSocketIoError(e, message);
Dispose(new IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Thread root.")]
private void HandleResponse(PooledBuffer response)
bool handled = false;
handled = HandleResponseInner(response);
catch (IgniteClientConnectionException e)
catch (Exception e)
var message = "Exception while handling response, connection closed: " + e.Message;
Dispose(new IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
if (!handled)
/// <summary>
/// Handles a server response.
/// </summary>
/// <param name="response">Response buffer.</param>
/// <returns>
/// A value indicating whether the response buffer was passed on to the final handler and does not need to be disposed.
/// </returns>
private bool HandleResponseInner(PooledBuffer response)
var reader = response.GetReader();
var requestId = reader.ReadInt64();
var flags = (ResponseFlags)reader.ReadInt32();
_logger.LogReceivedResponseTrace(requestId, flags, ConnectionContext.ClusterNode.Address);
HandlePartitionAssignmentChange(flags, ref reader);
HandleObservableTimestamp(ref reader);
var exception = flags.HasFlag(ResponseFlags.Error) ? ReadError(ref reader) : null;
response.Position += reader.Consumed;
if (flags.HasFlag(ResponseFlags.Notification))
return HandleNotification(requestId, exception, response);
if (!_requests.TryRemove(requestId, out var taskCompletionSource))
var message = $"Unexpected response ID ({requestId}) received from the server " +
$"[remoteAddress={ConnectionContext.ClusterNode.Address}], closing the socket.";
_logger.LogUnexpectedResponseIdError(null, message);
throw new IgniteClientConnectionException(ErrorGroups.Client.Protocol, message);
if (exception != null)
return false;
Metrics.RequestsCompleted.Add(1, MetricsContext.Tags);
return taskCompletionSource.TrySetResult(response);
/// <summary>
/// Handles a server notification.
/// </summary>
/// <param name="requestId">Request id.</param>
/// <param name="exception">Exception.</param>
/// <param name="response">Response buffer.</param>
/// <returns>
/// A value indicating whether the response buffer was passed on to the final handler and does not need to be disposed.
/// </returns>
private bool HandleNotification(long requestId, Exception? exception, PooledBuffer response)
if (!_notificationHandlers.TryRemove(requestId, out var notificationHandler))
var message = $"Unexpected notification ID ({requestId}) received from the server " +
$"[remoteAddress={ConnectionContext.ClusterNode.Address}], closing the socket.";
_logger.LogUnexpectedResponseIdError(null, message);
throw new IgniteClientConnectionException(ErrorGroups.Client.Protocol, message);
if (exception != null)
return false;
return notificationHandler.TrySetResult(response);
private void HandleObservableTimestamp(ref MsgPackReader reader)
var observableTimestamp = reader.ReadInt64();
private void HandlePartitionAssignmentChange(ResponseFlags flags, ref MsgPackReader reader)
if (flags.HasFlag(ResponseFlags.PartitionAssignmentChanged))
long timestamp = reader.ReadInt64();
_logger.LogPartitionAssignmentChangeNotificationInfo(ConnectionContext.ClusterNode.Address, timestamp);
/// <summary>
/// Sends heartbeat message.
/// </summary>
Justification = "Any heartbeat exception should cause this instance to be disposed with an error.")]
private async Task SendHeartbeatAsync()
using var buf = await DoOutInOpAsync(ClientOp.Heartbeat).WaitAsync(_socketTimeout).ConfigureAwait(false);
catch (Exception e)
var message = "Heartbeat failed: " + e.Message;
_logger.LogHeartbeatError(e, message);
Dispose(new IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
/// <summary>
/// Disposes this socket and completes active requests with the specified exception.
/// </summary>
/// <param name="ex">Exception that caused this socket to close. Null when socket is closed by the user.</param>
private void Dispose(Exception? ex)
lock (_disposeLock)
if (_disposeTokenSource.IsCancellationRequested)
if (ex != null)
_logger.LogConnectionClosedWithErrorWarn(ex, ConnectionContext.ClusterNode.Address, ex.Message);
Metrics.ConnectionsLost.Add(1, MetricsContext.Tags);
if (ex.GetBaseException() is TimeoutException)
Metrics.ConnectionsLostTimeout.Add(1, MetricsContext.Tags);
_exception = ex;
ex ??= new IgniteClientConnectionException(ErrorGroups.Client.Connection, "Connection closed.");
while (!_requests.IsEmpty)
foreach (var reqId in _requests.Keys.ToArray())
if (_requests.TryRemove(reqId, out var req))
while (!_notificationHandlers.IsEmpty)
foreach (var reqId in _notificationHandlers.Keys.ToArray())
if (_notificationHandlers.TryRemove(reqId, out var notificationHandler))