blob: a74f015c3e9b648ead6a6c20499814575d45b1d9 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar
{
using Abstractions;
using DotPulsar.Internal.Compression;
using Exceptions;
using Internal;
using Internal.Abstractions;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Pulsar client for creating producers, consumers and readers.
/// </summary>
public sealed class PulsarClient : IPulsarClient
{
private readonly IConnectionPool _connectionPool;
private readonly ProcessManager _processManager;
private readonly IHandleException _exceptionHandler;
private int _isDisposed;
public Uri ServiceUrl { get; }
internal PulsarClient(
IConnectionPool connectionPool,
ProcessManager processManager,
IHandleException exceptionHandler,
Uri serviceUrl)
{
_connectionPool = connectionPool;
_processManager = processManager;
_exceptionHandler = exceptionHandler;
ServiceUrl = serviceUrl;
_isDisposed = 0;
DotPulsarEventSource.Log.ClientCreated();
}
/// <summary>
/// Get a builder that can be used to configure and build a PulsarClient instance.
/// </summary>
public static IPulsarClientBuilder Builder()
=> new PulsarClientBuilder();
/// <summary>
/// Create a producer.
/// </summary>
public IProducer CreateProducer(ProducerOptions options)
{
ThrowIfDisposed();
ICompressorFactory? compressorFactory = null;
if (options.CompressionType != CompressionType.None)
{
var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);
if (compressorFactory is null)
throw new CompressionException($"Support for {compressionType} compression was not found");
}
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, compressorFactory);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
var process = new ProducerProcess(correlationId, stateManager, factory, producer);
_processManager.Add(process);
process.Start();
return producer;
}
/// <summary>
/// Create a consumer.
/// </summary>
public IConsumer CreateConsumer(ConsumerOptions options)
{
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
return consumer;
}
/// <summary>
/// Create a reader.
/// </summary>
public IReader CreateReader(ReaderOptions options)
{
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
var process = new ReaderProcess(correlationId, stateManager, factory, reader);
_processManager.Add(process);
process.Start();
return reader;
}
/// <summary>
/// Dispose the client and all its producers, consumers and readers.
/// </summary>
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
if (_processManager is IAsyncDisposable disposable)
await disposable.DisposeAsync().ConfigureAwait(false);
DotPulsarEventSource.Log.ClientDisposed();
}
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
throw new PulsarClientDisposedException();
}
}
}