blob: a4427e040b512ca5020e227fe846487b786706ea [file] [log] [blame]
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal
{
public sealed class Reader : IReader
{
private readonly Executor _executor;
private readonly IConsumerStreamFactory _streamFactory;
private readonly IFaultStrategy _faultStrategy;
private readonly StateManager<ReaderState> _stateManager;
private readonly CancellationTokenSource _connectTokenSource;
private readonly Task _connectTask;
private Action _throwIfClosedOrFaulted;
private IConsumerStream Stream { get; set; }
public Reader(IConsumerStreamFactory streamFactory, IFaultStrategy faultStrategy)
{
_executor = new Executor(ExecutorOnException);
_stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
_streamFactory = streamFactory;
_faultStrategy = faultStrategy;
_connectTokenSource = new CancellationTokenSource();
Stream = new NotReadyStream();
_connectTask = Connect(_connectTokenSource.Token);
_throwIfClosedOrFaulted = () => { };
}
public async Task<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
public async Task<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
public bool IsFinalState() => _stateManager.IsFinalState();
public bool IsFinalState(ReaderState state) => _stateManager.IsFinalState(state);
public void Dispose()
{
_executor.Dispose();
_connectTokenSource.Cancel();
_connectTask.Wait();
}
public async Task<Message> Receive(CancellationToken cancellationToken) => await _executor.Execute(() => Stream.Receive(cancellationToken), cancellationToken);
private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
{
_throwIfClosedOrFaulted();
switch (_faultStrategy.DetermineFaultAction(exception))
{
case FaultAction.Retry:
await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
break;
case FaultAction.Relookup:
await _stateManager.StateChangedTo(ReaderState.Connected, cancellationToken);
break;
case FaultAction.Fault:
HasFaulted(exception);
break;
}
_throwIfClosedOrFaulted();
}
private void HasFaulted(Exception exception)
{
_throwIfClosedOrFaulted = () => throw exception;
_stateManager.SetState(ReaderState.Faulted);
}
private void HasClosed()
{
_throwIfClosedOrFaulted = () => throw new ReaderClosedException();
_stateManager.SetState(ReaderState.Closed);
}
private async Task Connect(CancellationToken cancellationToken)
{
try
{
while (true)
{
using (var proxy = new ReaderProxy(_stateManager, new AsyncQueue<MessagePackage>()))
using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
{
proxy.Active();
await _stateManager.StateChangedFrom(ReaderState.Connected, cancellationToken);
if (_stateManager.IsFinalState())
return;
}
}
}
catch (OperationCanceledException)
{
HasClosed();
}
catch (Exception exception)
{
HasFaulted(exception);
}
}
}
}