fix: Replace Dictionary with ConcurrentDictionary in Consumer to fix KeyNotFoundException
- Replace Dictionary<string, SubConsumer<TMessage>> with ConcurrentDictionary<string, SubConsumer<TMessage>> for the _subConsumers field so that concurrent access to it is thread-safe
- Fix KeyNotFoundException that occurs in multi-threaded scenarios when accessing _subConsumers
- Add System.Collections.Concurrent using directive
- Keep the change minimal: only the using directive, the type of the _subConsumers field, and its initialization are modified
Fixes: KeyNotFoundException in Consumer.Acknowledge method when multiple threads access the dictionary concurrently
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index a89f997..b941f6b 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -18,6 +18,7 @@
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using DotPulsar.Internal.Abstractions;
+using System.Collections.Concurrent;
using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
@@ -32,7 +33,7 @@
private readonly Executor _executor;
private readonly SemaphoreSlim _semaphoreSlim;
private readonly AsyncLock _lock;
- private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
+ private readonly ConcurrentDictionary<string, SubConsumer<TMessage>> _subConsumers;
private readonly TaskCompletionSource<IMessage<TMessage>> _neverEndingTask;
private SubConsumer<TMessage>[] _receivers;
private Task<IMessage<TMessage>>[] _receiveTasks;
@@ -79,7 +80,7 @@
_exceptionHandler = exceptionHandler;
_allSubConsumersAreReady = false;
_isDisposed = 0;
- _subConsumers = [];
+ _subConsumers = new ConcurrentDictionary<string, SubConsumer<TMessage>>();
_singleSubConsumer = null;
_ = Setup();