blob: c461283e5e8b4c2e1a4cdf63a3a2685892c6dc57 [file] [log] [blame]
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System;
namespace DotPulsar.Internal
{
public sealed class ConsumerManager : IDisposable
{
private readonly IdLookup<IConsumerProxy> _proxies;
public ConsumerManager() => _proxies = new IdLookup<IConsumerProxy>();
public bool HasConsumers => !_proxies.IsEmpty();
public void Outgoing(CommandSubscribe subscribe, IConsumerProxy proxy) => subscribe.ConsumerId = _proxies.Add(proxy);
public void Dispose()
{
foreach (var id in _proxies.AllIds())
{
RemoveConsumer(id);
}
}
public void Incoming(MessagePackage package)
{
var consumerId = package.Command.ConsumerId;
var proxy = _proxies[consumerId];
proxy?.Enqueue(package);
}
public void Incoming(CommandCloseConsumer command) => RemoveConsumer(command.ConsumerId);
public void Incoming(CommandActiveConsumerChange command)
{
var proxy = _proxies[command.ConsumerId];
if (proxy == null) return;
if (command.IsActive)
proxy.Active();
else
proxy.Inactive();
}
public void Incoming(CommandReachedEndOfTopic command)
{
var proxy = _proxies[command.ConsumerId];
proxy?.ReachedEndOfTopic();
}
public void Remove(ulong consumerId) => _proxies.Remove(consumerId);
private void RemoveConsumer(ulong consumerId)
{
var proxy = _proxies[consumerId];
if (proxy == null) return;
proxy.Disconnected();
_proxies.Remove(consumerId);
}
}
}