blob: 776c3dfef285fafc672e214c5a5cbca664f6aa33 [file] [log] [blame]
using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal
{
public sealed class ConsumerStream : IConsumerStream
{
private readonly ulong _id;
private readonly IDequeue<MessagePackage> _dequeue;
private readonly Connection _connection;
private readonly IFaultStrategy _faultStrategy;
private readonly IConsumerProxy _proxy;
private readonly CommandFlow _commandFlow;
private uint _sendWhenZero;
private bool _firstBatch;
public ConsumerStream(ulong id, uint messagePrefetchCount, IDequeue<MessagePackage> dequeue, Connection connection, IFaultStrategy faultStrategy, IConsumerProxy proxy)
{
_id = id;
_dequeue = dequeue;
_connection = connection;
_faultStrategy = faultStrategy;
_proxy = proxy;
_commandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount };
_sendWhenZero = 0;
_firstBatch = true;
}
public async ValueTask<Message> Receive(CancellationToken cancellationToken)
{
while (true)
{
if (_sendWhenZero == 0) //TODO should sending the flow command be handled on other thread and thereby not slow down the consumer?
{
await _connection.Send(_commandFlow);
if (_firstBatch)
{
_commandFlow.MessagePermits = (uint)Math.Ceiling(_commandFlow.MessagePermits * 0.5);
_firstBatch = false;
}
_sendWhenZero = _commandFlow.MessagePermits;
}
var messagePackage = await _dequeue.Dequeue(cancellationToken);
_sendWhenZero--;
try
{
return Serializer.Deserialize(messagePackage);
}
catch (ChecksumException)
{
var ack = new CommandAck
{
Type = CommandAck.AckType.Individual,
validation_error = CommandAck.ValidationError.ChecksumMismatch
};
ack.MessageIds.Add(messagePackage.Command.MessageId);
await Send(ack);
}
}
}
public async Task Send(CommandAck command)
{
try
{
command.ConsumerId = _id;
await _connection.Send(command);
}
catch (Exception exception)
{
OnException(exception);
throw;
}
}
public async Task<CommandSuccess> Send(CommandUnsubscribe command)
{
try
{
command.ConsumerId = _id;
var response = await _connection.Send(command);
response.Expect(BaseCommand.Type.Success);
return response.Success;
}
catch (Exception exception)
{
OnException(exception);
throw;
}
}
public async Task<CommandSuccess> Send(CommandSeek command)
{
try
{
command.ConsumerId = _id;
var response = await _connection.Send(command);
response.Expect(BaseCommand.Type.Success);
return response.Success;
}
catch (Exception exception)
{
OnException(exception);
throw;
}
}
public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command)
{
try
{
command.ConsumerId = _id;
var response = await _connection.Send(command);
response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
return response.GetLastMessageIdResponse;
}
catch (Exception exception)
{
OnException(exception);
throw;
}
}
public async ValueTask DisposeAsync()
{
try
{
await _connection.Send(new CommandCloseConsumer { ConsumerId = _id });
}
catch
{
// Ignore
}
}
private void OnException(Exception exception)
{
if (_faultStrategy.DetermineFaultAction(exception) == FaultAction.Relookup)
_proxy.Disconnected();
}
}
}