blob: 5be266106348ebc35a8734376db2ed6b3e6c886f [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using DotPulsar.Internal;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Compression;
/// <summary>
/// Pulsar client for creating producers, consumers and readers.
/// </summary>
/// <remarks>
/// Every producer, consumer and reader created through the client is tracked until it reports its closed state,
/// so that disposing the client also disposes whatever is still tracked.
/// </remarks>
public sealed class PulsarClient : IPulsarClient
{
    // Guards _disposables; also used to order registration against disposal (see AddDisposable).
    private readonly object _lock = new();
    private readonly HashSet<IAsyncDisposable> _disposables;
    private readonly IConnectionPool _connectionPool;
    private readonly IHandleException _exceptionHandler;

    // 0 while the client is usable, 1 once DisposeAsync has been called.
    private int _isDisposed;

    /// <summary>
    /// The service URL given at construction and passed on to every producer, consumer and reader created by this client.
    /// </summary>
    public Uri ServiceUrl { get; }

    internal PulsarClient(
        IConnectionPool connectionPool,
        IHandleException exceptionHandler,
        Uri serviceUrl)
    {
        _disposables = [];
        _connectionPool = connectionPool;
        _exceptionHandler = exceptionHandler;
        ServiceUrl = serviceUrl;
        DotPulsarMeter.ClientCreated();
    }

    /// <summary>
    /// Get a builder that can be used to configure and build a PulsarClient instance.
    /// </summary>
    /// <returns>A new <see cref="IPulsarClientBuilder"/>.</returns>
    public static IPulsarClientBuilder Builder()
        => new PulsarClientBuilder();

    /// <summary>
    /// Create a producer.
    /// </summary>
    /// <param name="options">The options used to configure the producer.</param>
    /// <returns>The new producer, tracked by this client until it is closed.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    /// <exception cref="CompressionException">
    /// A compression type other than <see cref="CompressionType.None"/> was requested and no compressor factory for it was found.
    /// </exception>
    public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        ICompressorFactory? compressorFactory = null;

        if (options.CompressionType != CompressionType.None)
        {
            var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
            compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);

            if (compressorFactory is null)
                throw new CompressionException($"Support for {compressionType} compression was not found");
        }

        var producer = new Producer<TMessage>(ServiceUrl, options, _exceptionHandler, _connectionPool, compressorFactory);

        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);

        AddDisposable(producer);
        RemoveWhenClosed(producer, producer.StateChangedTo(ProducerState.Closed).AsTask());
        return producer;
    }

    /// <summary>
    /// Create a consumer.
    /// </summary>
    /// <param name="options">The options used to configure the consumer.</param>
    /// <returns>The new consumer, tracked by this client until it is closed.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        var consumer = new Consumer<TMessage>(ServiceUrl, options, _connectionPool, _exceptionHandler);

        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);

        AddDisposable(consumer);
        RemoveWhenClosed(consumer, consumer.StateChangedTo(ConsumerState.Closed).AsTask());
        return consumer;
    }

    /// <summary>
    /// Create a reader.
    /// </summary>
    /// <param name="options">The options used to configure the reader.</param>
    /// <returns>The new reader, tracked by this client until it is closed.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
    {
        ThrowIfDisposed();

        var reader = new Reader<TMessage>(ServiceUrl, options, _exceptionHandler, _connectionPool);

        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);

        AddDisposable(reader);
        RemoveWhenClosed(reader, reader.StateChangedTo(ReaderState.Closed).AsTask());
        return reader;
    }

    /// <summary>
    /// Dispose the client and all its producers, consumers and readers.
    /// </summary>
    /// <remarks>Only the first call does any work; later calls return immediately.</remarks>
    public async ValueTask DisposeAsync()
    {
        // The flag must be set BEFORE the lock is taken below: AddDisposable relies on that ordering.
        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
            return;

        IAsyncDisposable[] snapshot;

        lock (_lock)
        {
            // Copy and clear under the lock, dispose outside it, so the continuations calling
            // RemoveDisposable never have to wait for an in-flight disposal.
            snapshot = _disposables.ToArray();
            _disposables.Clear();
        }

        foreach (var item in snapshot)
            await item.DisposeAsync().ConfigureAwait(false);

        await _connectionPool.DisposeAsync().ConfigureAwait(false);
        DotPulsarMeter.ClientDisposed();
    }

    /// <summary>
    /// Start tracking <paramref name="disposable"/> so that it is disposed together with the client.
    /// </summary>
    /// <param name="disposable">The newly created producer, consumer or reader.</param>
    /// <exception cref="PulsarClientDisposedException">
    /// The client was disposed after the caller's initial check. The item is disposed in the background instead of being tracked.
    /// </exception>
    private void AddDisposable(IAsyncDisposable disposable)
    {
        bool tracked;

        lock (_lock)
        {
            // DisposeAsync sets the flag before it takes this lock for its snapshot, so checking the flag
            // here means the item either ends up in that snapshot or is rejected; it can no longer be
            // added to a set that nobody will ever dispose.
            tracked = Volatile.Read(ref _isDisposed) == 0;

            if (tracked)
                _disposables.Add(disposable);
        }

        if (tracked)
            return;

        // Lost the race against DisposeAsync: release the item (not awaited, this method is synchronous)
        // and report the disposed client to the caller.
        _ = disposable.DisposeAsync().AsTask();
        throw new PulsarClientDisposedException();
    }

    /// <summary>
    /// Stop tracking <paramref name="disposable"/> once <paramref name="closed"/> has completed.
    /// </summary>
    /// <param name="disposable">The tracked producer, consumer or reader.</param>
    /// <param name="closed">A task that completes when the item has reached its closed state.</param>
    private void RemoveWhenClosed(IAsyncDisposable disposable, Task closed)
    {
        // Name the scheduler explicitly: the default, TaskScheduler.Current, would tie this bookkeeping
        // to whatever scheduler the caller of Create* happened to be running on.
        _ = closed.ContinueWith(
            _ => RemoveDisposable(disposable),
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Stop tracking <paramref name="disposable"/>. Does nothing if it is not currently tracked.
    /// </summary>
    /// <param name="disposable">The item to stop tracking.</param>
    internal void RemoveDisposable(IAsyncDisposable disposable)
    {
        lock (_lock)
        {
            _disposables.Remove(disposable);
        }
    }

    /// <summary>
    /// Throw if <see cref="DisposeAsync"/> has already been called.
    /// </summary>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    private void ThrowIfDisposed()
    {
        if (Volatile.Read(ref _isDisposed) != 0)
            throw new PulsarClientDisposedException();
    }
}