| /* |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| namespace DotPulsar; |
| |
| using Abstractions; |
| using DotPulsar.Internal.Compression; |
| using DotPulsar.Internal.PulsarApi; |
| using Exceptions; |
| using Internal; |
| using Internal.Abstractions; |
| using System; |
| using System.Linq; |
| using System.Threading; |
| using System.Threading.Tasks; |
| |
/// <summary>
/// Pulsar client for creating producers, consumers and readers.
/// </summary>
public sealed class PulsarClient : IPulsarClient
{
    private readonly IConnectionPool _connectionPool;
    private readonly ProcessManager _processManager;
    private readonly IHandleException _exceptionHandler;

    // 0 while the client is usable, 1 once DisposeAsync has been entered.
    // Kept as an int so it can be swapped atomically with Interlocked.Exchange.
    private int _isDisposed;

    /// <summary>
    /// The service URL given to this client; it is handed to every producer, consumer and reader created here.
    /// </summary>
    public Uri ServiceUrl { get; }

    internal PulsarClient(
        IConnectionPool connectionPool,
        ProcessManager processManager,
        IHandleException exceptionHandler,
        Uri serviceUrl)
    {
        _connectionPool = connectionPool;
        _processManager = processManager;
        _exceptionHandler = exceptionHandler;
        ServiceUrl = serviceUrl;
        _isDisposed = 0;
        DotPulsarMeter.ClientCreated();
    }

    /// <summary>
    /// Get a builder that can be used to configure and build a PulsarClient instance.
    /// </summary>
    /// <returns>A new <see cref="PulsarClientBuilder"/>.</returns>
    public static IPulsarClientBuilder Builder()
        => new PulsarClientBuilder();

    /// <summary>
    /// Create a producer.
    /// </summary>
    /// <param name="options">The options used to configure the producer.</param>
    /// <returns>The newly created producer.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has already been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    /// <exception cref="CompressionException">
    /// A compression type other than <c>None</c> was requested but no matching compressor factory is available.
    /// </exception>
    public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        if (options is null)
            throw new ArgumentNullException(nameof(options));

        // Only look up a compressor when compression was actually requested.
        ICompressorFactory? compressorFactory = null;
        if (options.CompressionType != CompressionType.None)
        {
            var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
            compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);

            if (compressorFactory is null)
                throw new CompressionException($"Support for {compressionType} compression was not found");
        }

        var producer = new Producer<TMessage>(ServiceUrl, options, _processManager, _exceptionHandler, _connectionPool, compressorFactory);

        // The monitoring task is intentionally not awaited (fire-and-forget).
        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);

        return producer;
    }

    /// <summary>
    /// Create a consumer.
    /// </summary>
    /// <param name="options">The options used to configure the consumer and its subscription.</param>
    /// <returns>The newly created consumer, whose process has been registered with the process manager and started.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has already been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        if (options is null)
            throw new ArgumentNullException(nameof(options));

        // One correlation id ties together the channel factory, executor, consumer and process created below.
        var correlationId = Guid.NewGuid();

        // Fall back to a generated name when the caller did not supply one.
        var consumerName = options.ConsumerName ?? $"Consumer-{correlationId:N}";
        var subscribe = new CommandSubscribe
        {
            ConsumerName = consumerName,
            InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
            PriorityLevel = options.PriorityLevel,
            ReadCompacted = options.ReadCompacted,
            Subscription = options.SubscriptionName,
            Topic = options.Topic,
            Type = (CommandSubscribe.SubType) options.SubscriptionType
        };
        var messagePrefetchCount = options.MessagePrefetchCount;
        var messageFactory = new MessageFactory<TMessage>(options.Schema);
        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
        var decompressorFactories = CompressionFactories.DecompressorFactories();
        var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
        var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);

        // The consumer starts out with a placeholder channel.
        var initialChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
        var consumer = new Consumer<TMessage>(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, initialChannel, executor, stateManager, factory);

        // The monitoring task is intentionally not awaited (fire-and-forget).
        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);

        // The process is told whether this is a failover subscription.
        var process = new ConsumerProcess(correlationId, stateManager, consumer, options.SubscriptionType == SubscriptionType.Failover);
        _processManager.Add(process);
        process.Start();
        return consumer;
    }

    /// <summary>
    /// Create a reader.
    /// </summary>
    /// <param name="options">The options used to configure the reader.</param>
    /// <returns>The newly created reader, whose process has been registered with the process manager and started.</returns>
    /// <exception cref="PulsarClientDisposedException">The client has already been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
    {
        ThrowIfDisposed();

        if (options is null)
            throw new ArgumentNullException(nameof(options));

        // One correlation id ties together the channel factory, executor, reader and process created below.
        var correlationId = Guid.NewGuid();

        // A reader subscribes with a generated, non-durable subscription; the same
        // generated value doubles as the consumer name when no reader name was given.
        var subscription = $"Reader-{correlationId:N}";
        var subscribe = new CommandSubscribe
        {
            ConsumerName = options.ReaderName ?? subscription,
            Durable = false,
            ReadCompacted = options.ReadCompacted,
            StartMessageId = options.StartMessageId.ToMessageIdData(),
            Subscription = subscription,
            Topic = options.Topic
        };
        var messagePrefetchCount = options.MessagePrefetchCount;
        var messageFactory = new MessageFactory<TMessage>(options.Schema);
        var batchHandler = new BatchHandler<TMessage>(false, messageFactory);
        var decompressorFactories = CompressionFactories.DecompressorFactories();
        var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
        var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);

        // The reader starts out with a placeholder channel.
        var initialChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
        var reader = new Reader<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, initialChannel, executor, stateManager, factory);

        // The monitoring task is intentionally not awaited (fire-and-forget).
        if (options.StateChangedHandler is not null)
            _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);

        var process = new ReaderProcess(correlationId, stateManager, reader);
        _processManager.Add(process);
        process.Start();
        return reader;
    }

    /// <summary>
    /// Dispose the client and all its producers, consumers and readers.
    /// </summary>
    /// <remarks>
    /// Only the first call does any work; later calls return immediately.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        // Atomically flip the flag so that exactly one caller performs the disposal.
        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
            return;

        try
        {
            if (_processManager is IAsyncDisposable disposable)
                await disposable.DisposeAsync().ConfigureAwait(false);
        }
        finally
        {
            // The flag is already set, so a later call returns early and could never
            // report the disposal; do it here even when the process manager throws.
            DotPulsarMeter.ClientDisposed();
        }
    }

    /// <summary>
    /// Throws when <see cref="DisposeAsync"/> has already been called on this client.
    /// </summary>
    /// <exception cref="PulsarClientDisposedException">The client has already been disposed.</exception>
    private void ThrowIfDisposed()
    {
        if (_isDisposed != 0)
            throw new PulsarClientDisposedException();
    }
}