Made (partitioned) producer more resilient.
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 7bce3be..6079994 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -16,6 +16,7 @@
{
using Abstractions;
using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
@@ -24,7 +25,7 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class Producer<TMessage> : IProducer<TMessage>
+ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
{
private readonly StateManager<ProducerState> _state;
private readonly IConnectionPool _connectionPool;
@@ -35,8 +36,10 @@
private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
private readonly IMessageRouter _messageRouter;
private readonly CancellationTokenSource _cts;
+ private readonly IExecute _executor;
private int _isDisposed;
private int _producerCount;
+ private Exception? _throw;
public Uri ServiceUrl { get; }
public string Topic { get; }
@@ -60,74 +63,84 @@
_processManager = processManager;
_messageRouter = options.MessageRouter;
_cts = new CancellationTokenSource();
+ _executor = new Executor(Guid.Empty, this, _exceptionHandler);
_producers = new ConcurrentDictionary<int, IProducer<TMessage>>();
- _ = Monitor();
+ _ = Setup();
}
- private async Task Monitor()
+ private async Task Setup()
{
await Task.Yield();
try
{
- var numberOfPartitions = await GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
- var isPartitionedTopic = numberOfPartitions != 0;
- var monitoringTasks = new Task<ProducerStateChanged>[isPartitionedTopic ? numberOfPartitions : 1];
+ await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+ }
+ catch (Exception exception)
+ {
+ if (_cts.IsCancellationRequested)
+ return;
- var topic = Topic;
+ _throw = exception;
+ _state.SetState(ProducerState.Faulted);
+ }
+ }
- for (var partition = 0; partition < numberOfPartitions; ++partition)
+ private async Task Monitor()
+ {
+ var numberOfPartitions = await GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+ var isPartitionedTopic = numberOfPartitions != 0;
+ var monitoringTasks = new Task<ProducerStateChanged>[isPartitionedTopic ? numberOfPartitions : 1];
+
+ var topic = Topic;
+
+ for (var partition = 0; partition < monitoringTasks.Length; ++partition)
+ {
+ if (isPartitionedTopic)
+ topic = $"{Topic}-partition-{partition}";
+
+ var producer = CreateSubProducer(topic);
+ _ = _producers.TryAdd(partition, producer);
+ monitoringTasks[partition] = producer.StateChangedFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+ }
+
+ Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
+
+ var connectedProducers = 0;
+
+ while (true)
+ {
+ await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
+
+ for (var i = 0; i < monitoringTasks.Length; ++i)
{
- if (isPartitionedTopic)
- topic = $"{Topic}-partition-{partition}";
+ var task = monitoringTasks[i];
+ if (!task.IsCompleted)
+ continue;
- var producer = CreateSubProducer(topic);
- _ = _producers.TryAdd(partition, producer);
- monitoringTasks[partition] = producer.StateChangedFrom(ProducerState.Disconnected, _cts.Token).AsTask();
- }
-
- Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
-
- var connectedProducers = 0;
-
- while (true)
- {
- await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
-
- for (var i = 0; i < monitoringTasks.Length; ++i)
+ var state = task.Result.ProducerState;
+ switch (state)
{
- var task = monitoringTasks[i];
- if (!task.IsCompleted)
- continue;
-
- var state = task.Result.ProducerState;
- switch (state)
- {
- case ProducerState.Connected:
- ++connectedProducers;
- break;
- case ProducerState.Disconnected:
- --connectedProducers;
- break;
- case ProducerState.Faulted:
- throw new Exception("SubProducer faulted");
- }
-
- monitoringTasks[i] = task.Result.Producer.StateChangedFrom(state, _cts.Token).AsTask();
+ case ProducerState.Connected:
+ ++connectedProducers;
+ break;
+ case ProducerState.Disconnected:
+ --connectedProducers;
+ break;
+ case ProducerState.Faulted:
+ _state.SetState(ProducerState.Faulted);
+ return;
}
- if (connectedProducers == 0)
- _state.SetState(ProducerState.Disconnected);
- else if (connectedProducers == numberOfPartitions)
- _state.SetState(ProducerState.Connected);
- else
- _state.SetState(ProducerState.PartiallyConnected);
+ monitoringTasks[i] = task.Result.Producer.StateChangedFrom(state, _cts.Token).AsTask();
}
- }
- catch
- {
- if (!_cts.IsCancellationRequested)
- _state.SetState(ProducerState.Faulted);
+
+ if (connectedProducers == 0)
+ _state.SetState(ProducerState.Disconnected);
+ else if (connectedProducers == monitoringTasks.Length)
+ _state.SetState(ProducerState.Connected);
+ else
+ _state.SetState(ProducerState.PartiallyConnected);
}
}
@@ -182,18 +195,24 @@
_cts.Cancel();
_cts.Dispose();
+ _state.SetState(ProducerState.Closed);
+
foreach (var producer in _producers.Values)
{
await producer.DisposeAsync().ConfigureAwait(false);
}
-
- _state.SetState(ProducerState.Closed);
}
private async ValueTask<int> ChoosePartitions(DotPulsar.MessageMetadata? metadata, CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
if (_producerCount == 0)
- await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
+ {
+ _ = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
+ if (_throw is not null)
+ throw _throw;
+ }
return _messageRouter.ChoosePartition(metadata, _producerCount);
}
@@ -203,5 +222,13 @@
public async ValueTask<MessageId> Send(DotPulsar.MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
=> await _producers[await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false)].Send(message, cancellationToken).ConfigureAwait(false);
+
+ private void ThrowIfDisposed()
+ {
+ if (_isDisposed != 0)
+ throw new ProducerDisposedException(GetType().FullName!);
+ }
+
+ public void Register(IEvent @event) { }
}
}
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index 5d15b0e..78022d0 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -152,7 +152,7 @@
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ProducerDisposedException(GetType().FullName!);
+ throw new ProducerDisposedException(typeof(Producer<TMessage>).FullName!);
}
}
}