blob: 3317313125aaeb7e5aa007331a8ab46a571cd37d [file] [log] [blame]
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal
{
public sealed class Consumer : IConsumer
{
private readonly Executor _executor;
private readonly CommandAck _cachedCommandAck;
private readonly IConsumerStreamFactory _streamFactory;
private readonly IFaultStrategy _faultStrategy;
private readonly bool _setProxyState;
private readonly StateManager<ConsumerState> _stateManager;
private readonly CancellationTokenSource _connectTokenSource;
private readonly Task _connectTask;
private Action _throwIfClosedOrFaulted;
private IConsumerStream Stream { get; set; }
public Consumer(IConsumerStreamFactory streamFactory, IFaultStrategy faultStrategy, bool setProxyState)
{
_executor = new Executor(ExecutorOnException);
_cachedCommandAck = new CommandAck();
_stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
_streamFactory = streamFactory;
_faultStrategy = faultStrategy;
_setProxyState = setProxyState;
_connectTokenSource = new CancellationTokenSource();
Stream = new NotReadyStream();
_connectTask = Connect(_connectTokenSource.Token);
_throwIfClosedOrFaulted = () => { };
}
public async Task<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
public async Task<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
public bool IsFinalState() => _stateManager.IsFinalState();
public bool IsFinalState(ConsumerState state) => _stateManager.IsFinalState(state);
public void Dispose()
{
_executor.Dispose();
_connectTokenSource.Cancel();
_connectTask.Wait();
}
public async Task<Message> Receive(CancellationToken cancellationToken) => await _executor.Execute(() => Stream.Receive(cancellationToken), cancellationToken);
public async Task Acknowledge(Message message, CancellationToken cancellationToken)
=> await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken);
public async Task Acknowledge(MessageId messageId, CancellationToken cancellationToken)
=> await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken);
public async Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
=> await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
public async Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
=> await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
public async Task Unsubscribe(CancellationToken cancellationToken)
{
_ = await _executor.Execute(() => Stream.Send(new CommandUnsubscribe()), cancellationToken);
HasClosed();
}
public async Task Seek(MessageId messageId, CancellationToken cancellationToken)
{
var seek = new CommandSeek { MessageId = messageId.Data };
_ = await _executor.Execute(() => Stream.Send(seek), cancellationToken);
return;
}
public async Task<MessageId> GetLastMessageId(CancellationToken cancellationToken)
{
var response = await _executor.Execute(() => Stream.Send(new CommandGetLastMessageId()), cancellationToken);
return new MessageId(response.LastMessageId);
}
private async Task Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
{
await _executor.Execute(() =>
{
_cachedCommandAck.Type = ackType;
_cachedCommandAck.MessageIds.Clear();
_cachedCommandAck.MessageIds.Add(messageIdData);
return Stream.Send(_cachedCommandAck);
}, cancellationToken);
}
private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
{
_throwIfClosedOrFaulted();
switch (_faultStrategy.DetermineFaultAction(exception))
{
case FaultAction.Retry:
await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
break;
case FaultAction.Relookup:
await _stateManager.StateChangedFrom(ConsumerState.Disconnected, cancellationToken);
break;
case FaultAction.Fault:
HasFaulted(exception);
break;
}
_throwIfClosedOrFaulted();
}
private void HasFaulted(Exception exception)
{
_throwIfClosedOrFaulted = () => throw exception;
_stateManager.SetState(ConsumerState.Faulted);
}
private void HasClosed()
{
_throwIfClosedOrFaulted = () => throw new ConsumerClosedException();
_stateManager.SetState(ConsumerState.Closed);
}
private async Task Connect(CancellationToken cancellationToken)
{
try
{
while (true)
{
using (var proxy = new ConsumerProxy(_stateManager, new AsyncQueue<MessagePackage>()))
using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
{
if (_setProxyState)
proxy.Active();
else
await _stateManager.StateChangedFrom(ConsumerState.Disconnected, cancellationToken);
await _stateManager.StateChangedTo(ConsumerState.Disconnected, cancellationToken);
if (_stateManager.IsFinalState())
return;
}
}
}
catch (OperationCanceledException)
{
HasClosed();
}
catch (Exception exception)
{
HasFaulted(exception);
}
}
}
}