blob: 5c3f138f9179f3d3fc87330a10596c80775697a5 [file] [log] [blame]
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal
{
public sealed class ProducerStreamFactory : IProducerStreamFactory
{
private readonly ConnectionPool _connectionTool;
private readonly ProducerOptions _options;
private readonly IFaultStrategy _faultStrategy;
private readonly SequenceId _sequenceId;
public ProducerStreamFactory(ConnectionPool connectionManager, ProducerOptions options, IFaultStrategy faultStrategy)
{
_connectionTool = connectionManager;
_options = options;
_faultStrategy = faultStrategy;
_sequenceId = new SequenceId(options.InitialSequenceId);
}
public async Task<IProducerStream> CreateStream(IProducerProxy proxy, CancellationToken cancellationToken)
{
var commandProducer = new CommandProducer
{
ProducerName = _options.ProducerName,
Topic = _options.Topic
};
while (true)
{
try
{
var connection = await _connectionTool.FindConnectionForTopic(_options.Topic, cancellationToken);
var response = await connection.Send(commandProducer, proxy);
return new ProducerStream(response.ProducerId, response.ProducerName, _sequenceId, connection, _faultStrategy, proxy);
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
throw;
else
await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
}
catch (Exception exception)
{
switch (_faultStrategy.DetermineFaultAction(exception))
{
case FaultAction.Relookup:
case FaultAction.Retry:
await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
continue;
}
throw;
}
}
}
}
}