Made (partitioned) producer more resilient.
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 7bce3be..6079994 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -16,6 +16,7 @@
 {
     using Abstractions;
     using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
     using DotPulsar.Extensions;
     using DotPulsar.Internal.Extensions;
     using DotPulsar.Internal.PulsarApi;
@@ -24,7 +25,7 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Producer<TMessage> : IProducer<TMessage>
+    public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
     {
         private readonly StateManager<ProducerState> _state;
         private readonly IConnectionPool _connectionPool;
@@ -35,8 +36,10 @@
         private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
         private readonly IMessageRouter _messageRouter;
         private readonly CancellationTokenSource _cts;
+        private readonly IExecute _executor;
         private int _isDisposed;
         private int _producerCount;
+        private Exception? _throw;
 
         public Uri ServiceUrl { get; }
         public string Topic { get; }
@@ -60,74 +63,84 @@
             _processManager = processManager;
             _messageRouter = options.MessageRouter;
             _cts = new CancellationTokenSource();
+            _executor = new Executor(Guid.Empty, this, _exceptionHandler);
             _producers = new ConcurrentDictionary<int, IProducer<TMessage>>();
-            _ = Monitor();
+            _ = Setup();
         }
 
-        private async Task Monitor()
+        private async Task Setup()
         {
             await Task.Yield();
 
             try
             {
-                var numberOfPartitions = await GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
-                var isPartitionedTopic = numberOfPartitions != 0;
-                var monitoringTasks = new Task<ProducerStateChanged>[isPartitionedTopic ? numberOfPartitions : 1];
+                await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+            }
+            catch (Exception exception)
+            {
+                if (_cts.IsCancellationRequested)
+                    return;
 
-                var topic = Topic;
+                _throw = exception;
+                _state.SetState(ProducerState.Faulted);
+            }
+        }
 
-                for (var partition = 0; partition < numberOfPartitions; ++partition)
+        private async Task Monitor()
+        {
+            var numberOfPartitions = await GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+            var isPartitionedTopic = numberOfPartitions != 0;
+            var monitoringTasks = new Task<ProducerStateChanged>[isPartitionedTopic ? numberOfPartitions : 1];
+
+            var topic = Topic;
+
+            for (var partition = 0; partition < monitoringTasks.Length; ++partition)
+            {
+                if (isPartitionedTopic)
+                    topic = $"{Topic}-partition-{partition}";
+
+                var producer = CreateSubProducer(topic);
+                _ = _producers.TryAdd(partition, producer);
+                monitoringTasks[partition] = producer.StateChangedFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+            }
+
+            Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
+
+            var connectedProducers = 0;
+
+            while (true)
+            {
+                await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
+
+                for (var i = 0; i < monitoringTasks.Length; ++i)
                 {
-                    if (isPartitionedTopic)
-                        topic = $"{Topic}-partition-{partition}";
+                    var task = monitoringTasks[i];
+                    if (!task.IsCompleted)
+                        continue;
 
-                    var producer = CreateSubProducer(topic);
-                    _ = _producers.TryAdd(partition, producer);
-                    monitoringTasks[partition] = producer.StateChangedFrom(ProducerState.Disconnected, _cts.Token).AsTask();
-                }
-
-                Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
-
-                var connectedProducers = 0;
-
-                while (true)
-                {
-                    await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
-
-                    for (var i = 0; i < monitoringTasks.Length; ++i)
+                    var state = task.Result.ProducerState;
+                    switch (state)
                     {
-                        var task = monitoringTasks[i];
-                        if (!task.IsCompleted)
-                            continue;
-
-                        var state = task.Result.ProducerState;
-                        switch (state)
-                        {
-                            case ProducerState.Connected:
-                                ++connectedProducers;
-                                break;
-                            case ProducerState.Disconnected:
-                                --connectedProducers;
-                                break;
-                            case ProducerState.Faulted:
-                                throw new Exception("SubProducer faulted");
-                        }
-
-                        monitoringTasks[i] = task.Result.Producer.StateChangedFrom(state, _cts.Token).AsTask();
+                        case ProducerState.Connected:
+                            ++connectedProducers;
+                            break;
+                        case ProducerState.Disconnected:
+                            --connectedProducers;
+                            break;
+                        case ProducerState.Faulted:
+                            _state.SetState(ProducerState.Faulted);
+                            return;
                     }
 
-                    if (connectedProducers == 0)
-                        _state.SetState(ProducerState.Disconnected);
-                    else if (connectedProducers == numberOfPartitions)
-                        _state.SetState(ProducerState.Connected);
-                    else
-                        _state.SetState(ProducerState.PartiallyConnected);
+                    monitoringTasks[i] = task.Result.Producer.StateChangedFrom(state, _cts.Token).AsTask();
                 }
-            }
-            catch
-            {
-                if (!_cts.IsCancellationRequested)
-                    _state.SetState(ProducerState.Faulted);
+
+                if (connectedProducers == 0)
+                    _state.SetState(ProducerState.Disconnected);
+                else if (connectedProducers == monitoringTasks.Length)
+                    _state.SetState(ProducerState.Connected);
+                else
+                    _state.SetState(ProducerState.PartiallyConnected);
             }
         }
 
@@ -182,18 +195,24 @@
             _cts.Cancel();
             _cts.Dispose();
 
+            _state.SetState(ProducerState.Closed);
+
             foreach (var producer in _producers.Values)
             {
                 await producer.DisposeAsync().ConfigureAwait(false);
             }
-
-            _state.SetState(ProducerState.Closed);
         }
 
         private async ValueTask<int> ChoosePartitions(DotPulsar.MessageMetadata? metadata, CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             if (_producerCount == 0)
-                await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
+            {
+                _ = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
+                if (_throw is not null)
+                    throw _throw;
+            }
 
             return _messageRouter.ChoosePartition(metadata, _producerCount);
         }
@@ -203,5 +222,13 @@
 
         public async ValueTask<MessageId> Send(DotPulsar.MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
             => await _producers[await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false)].Send(message, cancellationToken).ConfigureAwait(false);
+
+        private void ThrowIfDisposed()
+        {
+            if (_isDisposed != 0)
+                throw new ProducerDisposedException(GetType().FullName!);
+        }
+
+        public void Register(IEvent @event) { }
     }
 }
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index 5d15b0e..78022d0 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -152,7 +152,7 @@
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ProducerDisposedException(GetType().FullName!);
+                throw new ProducerDisposedException(typeof(Producer<TMessage>).FullName!);
         }
     }
 }