blob: 2767568bf2c36f4d3906315cd5b8fbaef292161e [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar;
using Abstractions;
using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
using Exceptions;
using Internal;
using Internal.Abstractions;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Pulsar client for creating producers, consumers and readers.
/// </summary>
public sealed class PulsarClient : IPulsarClient
{
    private readonly IConnectionPool _connectionPool;
    private readonly ProcessManager _processManager;
    private readonly IHandleException _exceptionHandler;

    // 0 while the client is usable, 1 once DisposeAsync has been entered.
    // Written with Interlocked.Exchange and read with Volatile.Read.
    private int _isDisposed;

    /// <summary>
    /// The service URL that was handed to this client when it was constructed.
    /// </summary>
    public Uri ServiceUrl { get; }

    internal PulsarClient(
        IConnectionPool connectionPool,
        ProcessManager processManager,
        IHandleException exceptionHandler,
        Uri serviceUrl)
    {
        _connectionPool = connectionPool;
        _processManager = processManager;
        _exceptionHandler = exceptionHandler;
        ServiceUrl = serviceUrl;
        _isDisposed = 0;

        // Paired with DotPulsarMeter.ClientDisposed() in DisposeAsync.
        DotPulsarMeter.ClientCreated();
    }

    /// <summary>
    /// Get a builder that can be used to configure and build a PulsarClient instance.
    /// </summary>
    /// <returns>A new <see cref="IPulsarClientBuilder"/>.</returns>
    public static IPulsarClientBuilder Builder()
        => new PulsarClientBuilder();

    /// <summary>
    /// Create a producer.
    /// </summary>
    /// <typeparam name="TMessage">The type of the messages the producer sends.</typeparam>
    /// <param name="options">The options used to configure the producer.</param>
    /// <returns>The newly created producer.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    /// <exception cref="CompressionException">
    /// A compression type other than <see cref="CompressionType.None"/> was requested, but no matching compressor factory was found.
    /// </exception>
    public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        if (options is null)
            throw new ArgumentNullException(nameof(options));

        ICompressorFactory? compressorFactory = null;

        if (options.CompressionType != CompressionType.None)
        {
            // Map the public enum onto the protocol-level enum before looking up a factory for it.
            var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
            compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);

            if (compressorFactory is null)
                throw new CompressionException($"Support for {compressionType} compression was not found");
        }

        var producer = new Producer<TMessage>(ServiceUrl, options, _processManager, _exceptionHandler, _connectionPool, compressorFactory);

        // The monitoring task is intentionally not awaited; it runs alongside the producer.
        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);

        return producer;
    }

    /// <summary>
    /// Create a consumer.
    /// </summary>
    /// <typeparam name="TMessage">The type of the messages the consumer receives.</typeparam>
    /// <param name="options">The options used to configure the consumer.</param>
    /// <returns>The newly created consumer.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        if (options is null)
            throw new ArgumentNullException(nameof(options));

        var correlationId = Guid.NewGuid();

        // Fall back to a generated name when the caller did not supply one.
        var consumerName = options.ConsumerName ?? $"Consumer-{correlationId:N}";

        var subscribe = new CommandSubscribe
        {
            ConsumerName = consumerName,
            InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
            PriorityLevel = options.PriorityLevel,
            ReadCompacted = options.ReadCompacted,
            Subscription = options.SubscriptionName,
            Topic = options.Topic,
            Type = (CommandSubscribe.SubType) options.SubscriptionType
        };

        var messagePrefetchCount = options.MessagePrefetchCount;
        var messageFactory = new MessageFactory<TMessage>(options.Schema);
        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
        var decompressorFactories = CompressionFactories.DecompressorFactories();
        var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
        var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
        var initialChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
        var consumer = new Consumer<TMessage>(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, initialChannel, executor, stateManager, factory);

        // The monitoring task is intentionally not awaited; it runs alongside the consumer.
        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);

        // Register the process with the process manager before starting it.
        var process = new ConsumerProcess(correlationId, stateManager, consumer, options.SubscriptionType == SubscriptionType.Failover);
        _processManager.Add(process);
        process.Start();

        return consumer;
    }

    /// <summary>
    /// Create a reader.
    /// </summary>
    /// <typeparam name="TMessage">The type of the messages the reader receives.</typeparam>
    /// <param name="options">The options used to configure the reader.</param>
    /// <returns>The newly created reader.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
    {
        ThrowIfDisposed();

        if (options is null)
            throw new ArgumentNullException(nameof(options));

        var correlationId = Guid.NewGuid();

        // A reader subscribes under a generated, non-durable subscription name;
        // the same name doubles as the consumer name when no reader name is given.
        var subscription = $"Reader-{correlationId:N}";

        var subscribe = new CommandSubscribe
        {
            ConsumerName = options.ReaderName ?? subscription,
            Durable = false,
            ReadCompacted = options.ReadCompacted,
            StartMessageId = options.StartMessageId.ToMessageIdData(),
            Subscription = subscription,
            Topic = options.Topic
        };

        var messagePrefetchCount = options.MessagePrefetchCount;
        var messageFactory = new MessageFactory<TMessage>(options.Schema);
        var batchHandler = new BatchHandler<TMessage>(false, messageFactory);
        var decompressorFactories = CompressionFactories.DecompressorFactories();
        var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
        var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
        var initialChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
        var reader = new Reader<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, initialChannel, executor, stateManager, factory);

        // The monitoring task is intentionally not awaited; it runs alongside the reader.
        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);

        // Register the process with the process manager before starting it.
        var process = new ReaderProcess(correlationId, stateManager, reader);
        _processManager.Add(process);
        process.Start();

        return reader;
    }

    /// <summary>
    /// Dispose the client and all its producers, consumers and readers.
    /// </summary>
    /// <remarks>
    /// Only the first call does any work; later calls return immediately.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        // Atomically claim the right to dispose so concurrent or repeated calls are no-ops.
        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
            return;

        try
        {
            if (_processManager is IAsyncDisposable disposable)
                await disposable.DisposeAsync().ConfigureAwait(false);
        }
        finally
        {
            // The client is already marked as disposed and this path can never be retried,
            // so the counterpart of ClientCreated() must be recorded even when the disposal above throws.
            DotPulsarMeter.ClientDisposed();
        }
    }

    /// <summary>
    /// Throws if <see cref="DisposeAsync"/> has already been called on this client.
    /// </summary>
    /// <exception cref="PulsarClientDisposedException">The client has been disposed.</exception>
    private void ThrowIfDisposed()
    {
        // Volatile read pairs with the Interlocked.Exchange in DisposeAsync.
        if (Volatile.Read(ref _isDisposed) != 0)
            throw new PulsarClientDisposedException();
    }
}