Refactoring partitioned producer. Still work in progress (need more resilience and testing)
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index 4c0d3d8..056471a 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -68,17 +68,11 @@
                 case ChannelUnsubscribed _:
                     ChannelState = ChannelState.Unsubscribed;
                     break;
-                default:
-                    HandleExtend(e);
-                    break;
             }
 
             CalculateState();
         }
 
         protected abstract void CalculateState();
-
-        protected virtual void HandleExtend(IEvent e)
-            => throw new NotImplementedException();
     }
 }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index f29d269..a783827 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -51,7 +51,6 @@
                 AsyncLockDisposedException _ => FaultAction.Retry,
                 PulsarStreamDisposedException _ => FaultAction.Retry,
                 AsyncQueueDisposedException _ => FaultAction.Retry,
-                LookupNotReadyException _ => FaultAction.Retry,
                 OperationCanceledException _ => cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry,
                 DotPulsarException _ => FaultAction.Rethrow,
                 SocketException socketException => socketException.SocketErrorCode switch
diff --git a/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs b/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
deleted file mode 100644
index 326641d..0000000
--- a/src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Events
-{
-    using Abstractions;
-    using System;
-
-    /// <summary>
-    /// Representation of the sub producer of a partitioned producer state change.
-    /// </summary>
-    public sealed class PartitionedSubProducerStateChanged : IEvent
-    {
-        public Guid CorrelationId { get; }
-        public ProducerState ProducerState { get; }
-
-        public PartitionedSubProducerStateChanged(Guid correlationId, ProducerState producerState)
-        {
-            CorrelationId = correlationId;
-            ProducerState = producerState;
-        }
-    }
-}
diff --git a/src/DotPulsar/Internal/Events/UpdatePartitions.cs b/src/DotPulsar/Internal/Events/UpdatePartitions.cs
deleted file mode 100644
index b9894b5..0000000
--- a/src/DotPulsar/Internal/Events/UpdatePartitions.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Events
-{
-    using Abstractions;
-    using System;
-
-    /// <summary>
-    /// Representation of the partitions count of the partitioned topic updating.
-    /// </summary>
-    public sealed class UpdatePartitions : IEvent
-    {
-        public Guid CorrelationId { get; }
-
-        public uint PartitionsCount { get; }
-
-        public UpdatePartitions(Guid correlationId, uint partitionsCount)
-        {
-            CorrelationId = correlationId;
-            PartitionsCount = partitionsCount;
-        }
-    }
-}
diff --git a/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
deleted file mode 100644
index 13fc562..0000000
--- a/src/DotPulsar/Internal/Exceptions/LookupNotReadyException.cs
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Exceptions
-{
-    using DotPulsar.Exceptions;
-
-    public sealed class LookupNotReadyException : DotPulsarException
-    {
-        public LookupNotReadyException() : base("The topic lookup operation is not ready yet") { }
-    }
-}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index d3c3495..7bce3be 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -16,76 +16,150 @@
 {
     using Abstractions;
     using DotPulsar.Abstractions;
-    using Events;
-    using Exceptions;
+    using DotPulsar.Extensions;
+    using DotPulsar.Internal.Extensions;
+    using DotPulsar.Internal.PulsarApi;
     using System;
     using System.Collections.Concurrent;
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
+    public sealed class Producer<TMessage> : IProducer<TMessage>
     {
-        private readonly Guid _correlationId;
-        private readonly IRegisterEvent _eventRegister;
-        private readonly IExecute _executor;
-        private readonly IStateChanged<ProducerState> _state;
-        private readonly PulsarClient _pulsarClient;
+        private readonly StateManager<ProducerState> _state;
+        private readonly IConnectionPool _connectionPool;
+        private readonly IHandleException _exceptionHandler;
+        private readonly ICompressorFactory? _compressorFactory;
         private readonly ProducerOptions<TMessage> _options;
+        private readonly ProcessManager _processManager;
         private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
         private readonly IMessageRouter _messageRouter;
-        private readonly CancellationTokenSource _cts = new();
-        private int _producersCount;
+        private readonly CancellationTokenSource _cts;
         private int _isDisposed;
+        private int _producerCount;
+
         public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Producer(
-            Guid correlationId,
             Uri serviceUrl,
-            string topic,
-            IRegisterEvent registerEvent,
-            IExecute executor,
-            IStateChanged<ProducerState> state,
             ProducerOptions<TMessage> options,
-            PulsarClient pulsarClient
-        )
+            ProcessManager processManager,
+            IHandleException exceptionHandler,
+            IConnectionPool connectionPool,
+            ICompressorFactory? compressorFactory)
         {
-            _correlationId = correlationId;
+            _state = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
             ServiceUrl = serviceUrl;
-            Topic = topic;
-            _eventRegister = registerEvent;
-            _executor = executor;
-            _state = state;
+            Topic = options.Topic;
             _isDisposed = 0;
             _options = options;
-            _pulsarClient = pulsarClient;
+            _exceptionHandler = exceptionHandler;
+            _connectionPool = connectionPool;
+            _compressorFactory = compressorFactory;
+            _processManager = processManager;
             _messageRouter = options.MessageRouter;
-
-            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(1, 31);
+            _cts = new CancellationTokenSource();
+            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>();
+            _ = Monitor();
         }
 
-        private void CreateSubProducers(int startIndex, int count)
+        private async Task Monitor()
         {
-            if (count == 0)
-            {
-                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId);
-                _producers[0] = producer;
-                return;
-            }
+            await Task.Yield();
 
-            for (var i = startIndex; i < count; ++i)
+            try
             {
-                var producer = _pulsarClient.NewSubProducer(Topic, _options, _executor, _correlationId, (uint) i);
-                _producers[i] = producer;
+                var numberOfPartitions = await GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+                var isPartitionedTopic = numberOfPartitions != 0;
+                var monitoringTasks = new Task<ProducerStateChanged>[isPartitionedTopic ? numberOfPartitions : 1];
+
+                var topic = Topic;
+
+                for (var partition = 0; partition < numberOfPartitions; ++partition)
+                {
+                    if (isPartitionedTopic)
+                        topic = $"{Topic}-partition-{partition}";
+
+                    var producer = CreateSubProducer(topic);
+                    _ = _producers.TryAdd(partition, producer);
+                    monitoringTasks[partition] = producer.StateChangedFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+                }
+
+                Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
+
+                var connectedProducers = 0;
+
+                while (true)
+                {
+                    await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
+
+                    for (var i = 0; i < monitoringTasks.Length; ++i)
+                    {
+                        var task = monitoringTasks[i];
+                        if (!task.IsCompleted)
+                            continue;
+
+                        var state = task.Result.ProducerState;
+                        switch (state)
+                        {
+                            case ProducerState.Connected:
+                                ++connectedProducers;
+                                break;
+                            case ProducerState.Disconnected:
+                                --connectedProducers;
+                                break;
+                            case ProducerState.Faulted:
+                                throw new Exception("SubProducer faulted");
+                        }
+
+                        monitoringTasks[i] = task.Result.Producer.StateChangedFrom(state, _cts.Token).AsTask();
+                    }
+
+                    if (connectedProducers == 0)
+                        _state.SetState(ProducerState.Disconnected);
+                    else if (connectedProducers == numberOfPartitions)
+                        _state.SetState(ProducerState.Connected);
+                    else
+                        _state.SetState(ProducerState.PartiallyConnected);
+                }
+            }
+            catch
+            {
+                if (!_cts.IsCancellationRequested)
+                    _state.SetState(ProducerState.Faulted);
             }
         }
 
-        private async void UpdatePartitions(CancellationToken cancellationToken)
+        private SubProducer<TMessage> CreateSubProducer(string topic)
         {
-            var partitionsCount = (int) await _pulsarClient.GetNumberOfPartitions(Topic, cancellationToken).ConfigureAwait(false);
-            _eventRegister.Register(new UpdatePartitions(_correlationId, (uint)partitionsCount));
-            CreateSubProducers(_producers.Count, partitionsCount);
-            _producersCount = partitionsCount;
+            var correlationId = Guid.NewGuid();
+            var executor = new Executor(correlationId, _processManager, _exceptionHandler);
+            var producerName = _options.ProducerName;
+            var schema = _options.Schema;
+            var initialSequenceId = _options.InitialSequenceId;
+            var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, _compressorFactory);
+            var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+            var initialChannel = new NotReadyChannel<TMessage>();
+            var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
+            var process = new ProducerProcess(correlationId, stateManager, producer);
+            _processManager.Add(process);
+            process.Start();
+            return producer;
+        }
+
+        private async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
+        {
+            var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+            var commandPartitionedMetadata = new CommandPartitionedTopicMetadata { Topic = topic };
+            var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+            if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+                response.PartitionMetadataResponse.Throw();
+
+            return response.PartitionMetadataResponse.Partitions;
         }
 
         public bool IsFinalState()
@@ -94,10 +168,10 @@
         public bool IsFinalState(ProducerState state)
             => _state.IsFinalState(state);
 
-        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default)
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
             => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default)
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
             => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
 
         public async ValueTask DisposeAsync()
@@ -113,27 +187,21 @@
                 await producer.DisposeAsync().ConfigureAwait(false);
             }
 
-            _eventRegister.Register(new ProducerDisposed(_correlationId));
+            _state.SetState(ProducerState.Closed);
         }
 
-        public async Task EstablishNewChannel(CancellationToken cancellationToken)
+        private async ValueTask<int> ChoosePartitions(DotPulsar.MessageMetadata? metadata, CancellationToken cancellationToken)
         {
-            await _executor.Execute(() => UpdatePartitions(cancellationToken), cancellationToken).ConfigureAwait(false);
+            if (_producerCount == 0)
+                await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
+
+            return _messageRouter.ChoosePartition(metadata, _producerCount);
         }
 
-        private int ChoosePartitions(MessageMetadata? metadata)
-        {
-            if (_producers.IsEmpty)
-            {
-                throw new LookupNotReadyException();
-            }
-            return _producersCount == 0 ? 0 : _messageRouter.ChoosePartition(metadata, _producersCount);
-        }
+        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+            => await _producers[await ChoosePartitions(null, cancellationToken).ConfigureAwait(false)].Send(message, cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default)
-            => await _executor.Execute(() => _producers[ChoosePartitions(null)].Send(message, cancellationToken), cancellationToken).ConfigureAwait(false);
-
-        public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default)
-            => await _executor.Execute(() => _producers[ChoosePartitions(metadata)].Send(message, cancellationToken), cancellationToken).ConfigureAwait(false);
+        public async ValueTask<MessageId> Send(DotPulsar.MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
+            => await _producers[await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false)].Send(message, cancellationToken).ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 36834df..7e0b37f 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -15,9 +15,7 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
-    using Events;
     using System;
-    using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class ProducerProcess : Process
@@ -25,101 +23,30 @@
         private readonly IStateManager<ProducerState> _stateManager;
         private readonly IEstablishNewChannel _producer;
 
-        // The following variables are only used when this is the process for parent producer.
-        private readonly IRegisterEvent _processManager;
-        private int _partitionsCount;
-        private int _connectedProducersCount;
-        private int _initialProducersCount;
-
-        // The following variables are only used for sub producer
-        private readonly Guid? _partitionedProducerId;
-
         public ProducerProcess(
             Guid correlationId,
             IStateManager<ProducerState> stateManager,
-            IEstablishNewChannel producer,
-            IRegisterEvent processManager,
-            Guid? partitionedProducerId = null) : base(correlationId)
+            IEstablishNewChannel producer) : base(correlationId)
         {
             _stateManager = stateManager;
             _producer = producer;
-            _processManager = processManager;
-            _partitionedProducerId = partitionedProducerId;
         }
 
         public override async ValueTask DisposeAsync()
         {
-            SetState(ProducerState.Closed);
+            _stateManager.SetState(ProducerState.Closed);
             CancellationTokenSource.Cancel();
-
             await _producer.DisposeAsync().ConfigureAwait(false);
         }
 
-        protected override void HandleExtend(IEvent e)
-        {
-            switch (e)
-            {
-                case PartitionedSubProducerStateChanged stateChanged:
-                    switch (stateChanged.ProducerState)
-                    {
-                        case ProducerState.Closed:
-                            _stateManager.SetState(ProducerState.Closed);
-                            break;
-                        case ProducerState.Connected:
-                            Interlocked.Increment(ref _connectedProducersCount);
-                            break;
-                        case ProducerState.Disconnected:
-                            // When the sub producer is initialized, the Disconnected event will be triggered.
-                            // So we need to first subtract from _initialProducersCount.
-                            if (_initialProducersCount == 0)
-                                Interlocked.Decrement(ref _connectedProducersCount);
-                            else
-                                Interlocked.Decrement(ref _initialProducersCount);
-                            break;
-                        case ProducerState.Faulted:
-                            _stateManager.SetState(ProducerState.Faulted);
-                            break;
-                    }
-
-                    break;
-                case UpdatePartitions updatePartitions:
-                    if (updatePartitions.PartitionsCount == 0)
-                    {
-                        Interlocked.Exchange(ref _initialProducersCount, 1);
-                        Interlocked.Exchange(ref _partitionsCount, 1);
-                    }
-                    else
-                    {
-                        Interlocked.Add(ref _initialProducersCount, (int) updatePartitions.PartitionsCount - _partitionsCount);
-                        Interlocked.Exchange(ref _partitionsCount, (int) updatePartitions.PartitionsCount);
-                    }
-
-                    break;
-            }
-        }
-
         protected override void CalculateState()
         {
             if (_stateManager.IsFinalState())
                 return;
 
-            if (!IsSubProducer()) // parent producer process
-            {
-                if (_connectedProducersCount <= 0)
-                    _stateManager.SetState(ProducerState.Disconnected);
-                else if (_connectedProducersCount == _partitionsCount)
-                    _stateManager.SetState(ProducerState.Connected);
-                else
-                    _stateManager.SetState(ProducerState.PartiallyConnected);
-
-                if (_partitionsCount == 0)
-                    _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
-                return;
-            }
-
             if (ExecutorState == ExecutorState.Faulted)
             {
-                SetState(ProducerState.Faulted);
+                _stateManager.SetState(ProducerState.Faulted);
                 return;
             }
 
@@ -127,27 +54,13 @@
             {
                 case ChannelState.ClosedByServer:
                 case ChannelState.Disconnected:
-                    SetState(ProducerState.Disconnected);
+                    _stateManager.SetState(ProducerState.Disconnected);
                     _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
                     return;
                 case ChannelState.Connected:
-                    SetState(ProducerState.Connected);
+                    _stateManager.SetState(ProducerState.Connected);
                     return;
             }
         }
-
-        /// <summary>
-        /// Check if this is the sub producer process of the partitioned producer.
-        /// </summary>
-        private bool IsSubProducer()
-            => _partitionedProducerId.HasValue;
-
-        private void SetState(ProducerState state)
-        {
-            _stateManager.SetState(state);
-
-            if (IsSubProducer())
-                _processManager.Register(new PartitionedSubProducerStateChanged(_partitionedProducerId!.Value, state));
-        }
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 453ba26..167bed8 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -20,7 +20,6 @@
     using Exceptions;
     using Internal;
     using Internal.Abstractions;
-    using Internal.Extensions;
     using System;
     using System.Linq;
     using System.Threading;
@@ -58,20 +57,6 @@
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
-        internal async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
-        {
-            var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
-            var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic };
-            var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
-
-            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
-
-            if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
-                response.PartitionMetadataResponse.Throw();
-
-            return response.PartitionMetadataResponse.Partitions;
-        }
-
         /// <summary>
         /// Create a producer.
         /// </summary>
@@ -79,34 +64,7 @@
         {
             ThrowIfDisposed();
 
-            var correlationId = Guid.NewGuid();
-            var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-
-            var producer = new Producer<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, executor, stateManager, options, this);
-
-            if (options.StateChangedHandler is not null)
-                _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
-            var process = new ProducerProcess(correlationId, stateManager, producer, _processManager);
-            _processManager.Add(process);
-            process.Start();
-            return producer;
-        }
-
-        /// <summary>
-        /// Create a producer internally.
-        /// This method is used to create internal producers for partitioned producer.
-        /// </summary>
-        internal SubProducer<TMessage> NewSubProducer<TMessage>(string topic, ProducerOptions<TMessage> options, IExecute executor, Guid partitionedProducerGuid,
-            uint? partitionIndex = null)
-        {
-            ThrowIfDisposed();
-
             ICompressorFactory? compressorFactory = null;
-
-            if (partitionIndex.HasValue)
-                topic = $"{topic}-partition-{partitionIndex}";
-
             if (options.CompressionType != CompressionType.None)
             {
                 var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
@@ -116,22 +74,11 @@
                     throw new CompressionException($"Support for {compressionType} compression was not found");
             }
 
-            var correlationId = Guid.NewGuid();
+            var producer = new Producer<TMessage>(ServiceUrl, options, _processManager, _exceptionHandler, _connectionPool, compressorFactory);
 
-            var producerName = options.ProducerName;
-            var schema = options.Schema;
-            var initialSequenceId = options.InitialSequenceId;
-
-            var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, compressorFactory);
-            var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var initialChannel = new NotReadyChannel<TMessage>();
-            var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
-
-            if (options.StateChangedHandler is not null && !partitionIndex.HasValue) // the StateChangeHandler of the sub producers in partitioned producers should be disabled.
+            if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
-            var process = new ProducerProcess(correlationId, stateManager, producer, _processManager, partitionedProducerGuid);
-            _processManager.Add(process);
-            process.Start();
+
             return producer;
         }
 
diff --git a/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs b/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
deleted file mode 100644
index 476a81a..0000000
--- a/tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Tests.Internal
-{
-    using Abstractions;
-    using DotPulsar.Internal;
-    using DotPulsar.Internal.Abstractions;
-    using DotPulsar.Internal.Events;
-    using NSubstitute;
-    using System;
-    using System.Collections.Concurrent;
-    using System.Collections.Generic;
-    using Xunit;
-
-    public class PartitionedProducerProcessTests
-    {
-        [Fact]
-        public void TestPartitionedProducerStateManage_WhenSubProducersStateChange_ThenPartitionedProducerStateChangeCorrectly()
-        {
-            var connectionPool = Substitute.For<IConnectionPool>();
-            var establishNewChannel = Substitute.For<IEstablishNewChannel>();
-            var producer = Substitute.For<IProducer>();
-
-            var processManager = new ProcessManager(connectionPool);
-
-            var producerGuids = new Dictionary<uint, Guid>(3);
-            var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3);
-            var partitionedProducerGuid = Guid.NewGuid();
-
-            for (uint i = 0; i < 3; i++)
-            {
-                var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-                var correlationId = Guid.NewGuid();
-                var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid);
-                producerGuids[i] = correlationId;
-                producersGroup[i] = producer;
-                processManager.Add(process);
-            }
-
-            var partitionedStateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-
-            var producerProcess = new ProducerProcess(partitionedProducerGuid, partitionedStateManager, establishNewChannel, new ProcessManager(connectionPool));
-            processManager.Add(producerProcess);
-            processManager.Register(new UpdatePartitions(partitionedProducerGuid, (uint) producersGroup.Count));
-
-            // Test initial channel
-            processManager.Register(new ChannelDisconnected(producerGuids[0]));
-            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
-            processManager.Register(new ChannelDisconnected(producerGuids[1]));
-            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
-            processManager.Register(new ChannelDisconnected(producerGuids[2]));
-            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
-
-            // Test connect
-            Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState);
-            processManager.Register(new ChannelConnected(producerGuids[0]));
-            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
-            processManager.Register(new ChannelConnected(producerGuids[1]));
-            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
-            processManager.Register(new ChannelConnected(producerGuids[2]));
-            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
-
-            // Test disconnect
-            processManager.Register(new ChannelDisconnected(producerGuids[1]));
-            Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState);
-
-            // Test reconnect
-            processManager.Register(new ChannelConnected(producerGuids[1]));
-            Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState);
-
-            // Test fault
-            processManager.Register(new ExecutorFaulted(producerGuids[1]));
-            Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState);
-        }
-
-        [Fact]
-        public void TestUpdatePartitions_WhenIncreasePartitions_ThenPartitionedProducerStateChangeCorrectly()
-        {
-            var connectionPool = Substitute.For<IConnectionPool>();
-            var processManager = new ProcessManager(connectionPool);
-            var establishNewChannel = Substitute.For<IEstablishNewChannel>();
-
-            var guid = Guid.NewGuid();
-            var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var process = new ProducerProcess(guid, stateManager, establishNewChannel, new ProcessManager(connectionPool));
-            processManager.Add(process);
-            processManager.Register(new UpdatePartitions(guid, 1));
-
-            Assert.Equal(ProducerState.Disconnected, stateManager.CurrentState);
-            processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
-            Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
-            processManager.Register(new UpdatePartitions(guid, 2));
-            Assert.Equal(ProducerState.PartiallyConnected, stateManager.CurrentState);
-            processManager.Register(new PartitionedSubProducerStateChanged(guid, ProducerState.Connected));
-            Assert.Equal(ProducerState.Connected, stateManager.CurrentState);
-        }
-    }
-}