blob: 7fa2219c01bcbba12c993794ef1e50337573c744 [file] [log] [blame]
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal;
using DotPulsar.Internal.Abstractions;
using System;
using System.Collections.Generic;
namespace DotPulsar
{
public sealed class PulsarClient : IPulsarClient
{
private readonly object _lock;
private readonly IFaultStrategy _faultStrategy;
private readonly LinkedList<IDisposable> _disposabels;
private readonly ConnectionPool _connectionPool;
private bool _isClosed;
internal PulsarClient(ConnectionPool connectionPool)
{
_lock = new object();
_faultStrategy = new FaultStrategy(3000);
_disposabels = new LinkedList<IDisposable>();
_connectionPool = connectionPool;
_isClosed = false;
}
public static IPulsarClientBuilder Builder() => new PulsarClientBuilder();
public IProducer CreateProducer(ProducerOptions options)
{
lock (_lock)
{
ThrowIfClosed();
var producer = new Producer(new ProducerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
_disposabels.AddFirst(producer);
producer.StateChangedTo(ProducerState.Closed, default).ContinueWith(t => Remove(producer));
return producer;
}
}
public IConsumer CreateConsumer(ConsumerOptions options)
{
lock (_lock)
{
ThrowIfClosed();
var consumer = new Consumer(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy, options.SubscriptionType != SubscriptionType.Failover);
_disposabels.AddFirst(consumer);
consumer.StateChangedTo(ConsumerState.Closed, default).ContinueWith(t => Remove(consumer));
return consumer;
}
}
public IReader CreateReader(ReaderOptions options)
{
lock (_lock)
{
ThrowIfClosed();
var reader = new Reader(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
_disposabels.AddFirst(reader);
reader.StateChangedTo(ReaderState.Closed, default).ContinueWith(t => Remove(reader));
return reader;
}
}
public void Dispose() //While we wait for IAsyncDisposable
{
lock (_lock)
{
ThrowIfClosed();
_isClosed = true;
foreach (var disposable in _disposabels)
{
disposable.Dispose();
}
}
_connectionPool.Dispose();
}
private void ThrowIfClosed()
{
if (_isClosed)
throw new PulsarClientClosedException();
}
private void Remove(IDisposable disposable)
{
lock (_lock)
{
_disposabels.Remove(disposable);
}
}
}
}