| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using Apache.Iggy.Configuration; |
| using Apache.Iggy.Encryption; |
| using Apache.Iggy.Enums; |
| using Apache.Iggy.Factory; |
| using Apache.Iggy.IggyClient; |
| using Microsoft.Extensions.Logging; |
| using Microsoft.Extensions.Logging.Abstractions; |
| using Partitioning = Apache.Iggy.Kinds.Partitioning; |
| |
| namespace Apache.Iggy.Publishers; |
| |
/// <summary>
/// Fluent builder for creating and configuring <see cref="IggyPublisher" /> instances.
/// Collects connection settings, partitioning, stream/topic auto-creation, encryption,
/// retry and background-sending options into an <see cref="IggyPublisherConfig" />,
/// validates them, and produces a publisher via <see cref="Build" />.
/// </summary>
public class IggyPublisherBuilder
{
    /// <summary>
    /// Handler that <see cref="Build" /> attaches to the publisher's <c>OnBackgroundError</c> event.
    /// Null when no handler has been registered.
    /// </summary>
    protected EventHandler<PublisherErrorEventArgs>? _onBackgroundError;

    /// <summary>
    /// Handler that <see cref="Build" /> attaches to the publisher's <c>OnMessageBatchFailed</c> event.
    /// Null when no handler has been registered.
    /// </summary>
    protected EventHandler<MessageBatchFailedEventArgs>? _onMessageBatchFailed;

    /// <summary>
    /// Gets or sets the publisher configuration.
    /// </summary>
    internal IggyPublisherConfig Config { get; set; } = new();

    /// <summary>
    /// Gets or sets the Iggy client instance to use.
    /// When <see cref="IggyPublisherConfig.CreateIggyClient" /> is true, <see cref="Build" /> replaces this value
    /// with a newly created client.
    /// </summary>
    internal IIggyClient? IggyClient { get; set; }

    /// <summary>
    /// Creates a new publisher builder using an existing Iggy client instance.
    /// </summary>
    /// <param name="iggyClient">The existing Iggy client to use.</param>
    /// <param name="streamId">The identifier of the target stream.</param>
    /// <param name="topicId">The identifier of the target topic.</param>
    /// <returns>A new instance of <see cref="IggyPublisherBuilder" />.</returns>
    public static IggyPublisherBuilder Create(IIggyClient iggyClient, Identifier streamId, Identifier topicId)
    {
        return new IggyPublisherBuilder
        {
            Config = new IggyPublisherConfig
            {
                CreateIggyClient = false,
                StreamId = streamId,
                TopicId = topicId
            },
            IggyClient = iggyClient
        };
    }

    /// <summary>
    /// Creates a new publisher builder that will create its own Iggy client.
    /// </summary>
    /// <param name="streamId">The identifier of the target stream.</param>
    /// <param name="topicId">The identifier of the target topic.</param>
    /// <returns>A new instance of <see cref="IggyPublisherBuilder" />.</returns>
    public static IggyPublisherBuilder Create(Identifier streamId, Identifier topicId)
    {
        return new IggyPublisherBuilder
        {
            Config = new IggyPublisherConfig
            {
                CreateIggyClient = true,
                StreamId = streamId,
                TopicId = topicId
            }
        };
    }

    /// <summary>
    /// Creates a new publisher builder using an existing configuration.
    /// The configuration instance is used as-is (not copied), so later builder calls mutate it.
    /// </summary>
    /// <param name="config">The configuration to use for the publisher.</param>
    /// <returns>A new instance of <see cref="IggyPublisherBuilder" />.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="config" /> is null.</exception>
    public static IggyPublisherBuilder Create(IggyPublisherConfig config)
    {
        // Fail at the call site instead of with a NullReferenceException in a later With*/Build call.
        return new IggyPublisherBuilder
        {
            Config = config ?? throw new ArgumentNullException(nameof(config))
        };
    }

    /// <summary>
    /// Configures the connection settings for the publisher's Iggy client.
    /// Only used when the builder creates its own client.
    /// </summary>
    /// <param name="protocol">The protocol to use (TCP, QUIC, or HTTP).</param>
    /// <param name="address">The server address to connect to (format depends on protocol).</param>
    /// <param name="login">The login username for authentication.</param>
    /// <param name="password">The password for authentication.</param>
    /// <param name="receiveBufferSize">The size of the receive buffer in bytes. Default is 4096.</param>
    /// <param name="sendBufferSize">The size of the send buffer in bytes. Default is 4096.</param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder WithConnection(Protocol protocol, string address, string login, string password,
        int receiveBufferSize = 4096, int sendBufferSize = 4096)
    {
        Config.Protocol = protocol;
        Config.Address = address;
        Config.Login = login;
        Config.Password = password;
        Config.ReceiveBufferSize = receiveBufferSize;
        Config.SendBufferSize = sendBufferSize;

        return this;
    }

    /// <summary>
    /// Configures the partitioning strategy for messages sent by the publisher.
    /// Determines how messages are distributed across topic partitions.
    /// </summary>
    /// <param name="partitioning">
    /// The partitioning configuration to use (e.g., balanced, partition-specific, or message-key
    /// based).
    /// </param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder WithPartitioning(Partitioning partitioning)
    {
        Config.Partitioning = partitioning;

        return this;
    }

    /// <summary>
    /// Enables automatic stream creation if the target stream does not exist.
    /// </summary>
    /// <param name="name">The name to use when creating the stream. Must not be blank.</param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder CreateStreamIfNotExists(string name)
    {
        Config.CreateStream = true;
        Config.StreamName = name;

        return this;
    }

    /// <summary>
    /// Enables automatic topic creation if the target topic does not exist.
    /// </summary>
    /// <param name="name">The name to use when creating the topic. Must not be blank.</param>
    /// <param name="topicPartitionsCount">The number of partitions for the topic. Must be greater than 0. Default is 1.</param>
    /// <param name="compressionAlgorithm">The compression algorithm to use for messages in the topic. Default is None.</param>
    /// <param name="replicationFactor">The replication factor for the topic. Null means server default.</param>
    /// <param name="messageExpiry">The message expiry time in seconds (0 for no expiry). Default is 0.</param>
    /// <param name="maxTopicSize">The maximum size of the topic in bytes (0 for unlimited). Default is 0.</param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder CreateTopicIfNotExists(string name, uint topicPartitionsCount = 1,
        CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, byte? replicationFactor = null,
        ulong messageExpiry = 0, ulong maxTopicSize = 0)
    {
        Config.CreateTopic = true;
        Config.TopicName = name;
        Config.TopicPartitionsCount = topicPartitionsCount;
        Config.TopicCompressionAlgorithm = compressionAlgorithm;
        Config.TopicReplicationFactor = replicationFactor;
        Config.TopicMessageExpiry = messageExpiry;
        Config.TopicMaxTopicSize = maxTopicSize;

        return this;
    }

    /// <summary>
    /// Configures message encryption using the specified encryptor.
    /// </summary>
    /// <param name="encryptor">The message encryptor to use for encrypting message payloads.</param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder WithEncryptor(IMessageEncryptor encryptor)
    {
        Config.MessageEncryptor = encryptor;

        return this;
    }

    /// <summary>
    /// Registers an event handler for background processing errors.
    /// Only invoked when background sending is enabled.
    /// Calling this method again replaces the previously registered handler.
    /// </summary>
    /// <param name="handler">The event handler to invoke when background errors occur.</param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder OnBackgroundError(EventHandler<PublisherErrorEventArgs> handler)
    {
        _onBackgroundError = handler;
        return this;
    }

    /// <summary>
    /// Registers an event handler for failed message batch sends.
    /// Invoked when a batch of messages fails to send after all retry attempts are exhausted.
    /// Calling this method again replaces the previously registered handler.
    /// </summary>
    /// <param name="handler">The event handler to invoke when message batches fail to send.</param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder OnMessageBatchFailed(EventHandler<MessageBatchFailedEventArgs> handler)
    {
        _onMessageBatchFailed = handler;
        return this;
    }

    /// <summary>
    /// Configures retry behavior for failed message sends.
    /// Uses exponential backoff with configurable parameters.
    /// </summary>
    /// <param name="enabled">Whether retry is enabled. Default is true.</param>
    /// <param name="maxAttempts">The maximum number of retry attempts. Default is 3.</param>
    /// <param name="initialDelay">The initial delay before the first retry. Default is 100ms.</param>
    /// <param name="maxDelay">The maximum delay between retries. Default is 10 seconds.</param>
    /// <param name="backoffMultiplier">
    /// The multiplier for exponential backoff. Must be a finite number greater than 0 when retry is enabled.
    /// Default is 2.0.
    /// </param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder WithRetry(bool enabled = true, int maxAttempts = 3,
        TimeSpan? initialDelay = null, TimeSpan? maxDelay = null, double backoffMultiplier = 2.0)
    {
        Config.EnableRetry = enabled;
        Config.MaxRetryAttempts = maxAttempts;
        Config.InitialRetryDelay = initialDelay ?? TimeSpan.FromMilliseconds(100);
        Config.MaxRetryDelay = maxDelay ?? TimeSpan.FromSeconds(10);
        Config.RetryBackoffMultiplier = backoffMultiplier;
        return this;
    }

    /// <summary>
    /// Configures background message sending for asynchronous, batched message delivery.
    /// When enabled, messages are queued and sent in batches for improved throughput.
    /// </summary>
    /// <param name="enabled">Whether background sending is enabled. Default is true.</param>
    /// <param name="queueCapacity">The maximum number of messages that can be queued. Default is 10,000.</param>
    /// <param name="batchSize">The number of messages to send in each batch. Default is 100.</param>
    /// <param name="flushInterval">The interval at which to flush pending messages. Default is 100ms.</param>
    /// <param name="disposalTimeout">
    /// The timeout to wait for the background processor to complete during disposal. Default is
    /// 5 seconds.
    /// </param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder WithBackgroundSending(bool enabled = true, int queueCapacity = 10000,
        int batchSize = 100, TimeSpan? flushInterval = null, TimeSpan? disposalTimeout = null)
    {
        Config.EnableBackgroundSending = enabled;
        Config.BackgroundQueueCapacity = queueCapacity;
        Config.BackgroundBatchSize = batchSize;
        Config.BackgroundFlushInterval = flushInterval ?? TimeSpan.FromMilliseconds(100);
        Config.BackgroundDisposalTimeout = disposalTimeout ?? TimeSpan.FromSeconds(5);
        return this;
    }

    /// <summary>
    /// Configures the logger factory for diagnostic logging.
    /// When no factory is configured, <see cref="Build" /> falls back to <see cref="NullLoggerFactory" />.
    /// </summary>
    /// <param name="loggerFactory">The logger factory to use for creating loggers.</param>
    /// <returns>The builder instance for method chaining.</returns>
    public IggyPublisherBuilder WithLogger(ILoggerFactory loggerFactory)
    {
        Config.LoggerFactory = loggerFactory;
        return this;
    }

    /// <summary>
    /// Builds and returns a configured <see cref="IggyPublisher" /> instance.
    /// Validates the configuration, creates the Iggy client when
    /// <see cref="IggyPublisherConfig.CreateIggyClient" /> is true, constructs the publisher and attaches
    /// any registered event handlers.
    /// </summary>
    /// <returns>A configured <see cref="IggyPublisher" /> instance.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the configuration is invalid, including when no Iggy client is available
    /// (none was supplied and <see cref="IggyPublisherConfig.CreateIggyClient" /> is false).
    /// </exception>
    public IggyPublisher Build()
    {
        Validate();

        if (Config.CreateIggyClient)
        {
            IggyClient = IggyClientFactory.CreateClient(new IggyClientConfigurator
            {
                Protocol = Config.Protocol,
                BaseAddress = Config.Address,
                ReceiveBufferSize = Config.ReceiveBufferSize,
                SendBufferSize = Config.SendBufferSize
            });
        }

        // Validate() is virtual, so an override may skip the client check; fail explicitly
        // rather than handing a null client to the publisher.
        var client = IggyClient ??
                     throw new InvalidOperationException("An Iggy client is required to build a publisher.");

        var publisher = new IggyPublisher(client, Config,
            Config.LoggerFactory?.CreateLogger<IggyPublisher>() ??
            NullLoggerFactory.Instance.CreateLogger<IggyPublisher>());

        if (_onBackgroundError != null)
        {
            publisher.OnBackgroundError += _onBackgroundError;
        }

        if (_onMessageBatchFailed != null)
        {
            publisher.OnMessageBatchFailed += _onMessageBatchFailed;
        }

        return publisher;
    }

    /// <summary>
    /// Validates the publisher configuration and throws if invalid.
    /// Checks, in order: client/connection settings, stream and topic auto-creation settings,
    /// buffer sizes, background-sending settings (only when enabled) and retry settings (only when enabled).
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown when the configuration is invalid.</exception>
    protected virtual void Validate()
    {
        if (Config.CreateIggyClient)
        {
            if (string.IsNullOrWhiteSpace(Config.Address))
            {
                throw new InvalidOperationException("Address must be provided when CreateIggyClient is true.");
            }

            if (string.IsNullOrWhiteSpace(Config.Login))
            {
                throw new InvalidOperationException("Login must be provided when CreateIggyClient is true.");
            }

            if (string.IsNullOrWhiteSpace(Config.Password))
            {
                throw new InvalidOperationException("Password must be provided when CreateIggyClient is true.");
            }
        }
        else
        {
            if (IggyClient == null)
            {
                throw new InvalidOperationException(
                    "IggyClient must be provided when CreateIggyClient is false.");
            }
        }

        if (Config.CreateStream && string.IsNullOrWhiteSpace(Config.StreamName))
        {
            throw new InvalidOperationException("StreamName must be provided when CreateStream is true.");
        }

        if (Config.CreateTopic)
        {
            if (string.IsNullOrWhiteSpace(Config.TopicName))
            {
                throw new InvalidOperationException("TopicName must be provided when CreateTopic is true.");
            }

            if (Config.TopicPartitionsCount == 0)
            {
                throw new InvalidOperationException("TopicPartitionsCount must be greater than 0.");
            }
        }

        if (Config.ReceiveBufferSize <= 0)
        {
            throw new InvalidOperationException("ReceiveBufferSize must be greater than 0.");
        }

        if (Config.SendBufferSize <= 0)
        {
            throw new InvalidOperationException("SendBufferSize must be greater than 0.");
        }

        if (Config.EnableBackgroundSending)
        {
            if (Config.BackgroundQueueCapacity <= 0)
            {
                throw new InvalidOperationException(
                    "BackgroundQueueCapacity must be greater than 0 when EnableBackgroundSending is true.");
            }

            if (Config.BackgroundBatchSize <= 0)
            {
                throw new InvalidOperationException(
                    "BackgroundBatchSize must be greater than 0 when EnableBackgroundSending is true.");
            }

            if (Config.BackgroundFlushInterval <= TimeSpan.Zero)
            {
                throw new InvalidOperationException(
                    "BackgroundFlushInterval must be greater than zero when EnableBackgroundSending is true.");
            }

            if (Config.BackgroundDisposalTimeout <= TimeSpan.Zero)
            {
                throw new InvalidOperationException(
                    "BackgroundDisposalTimeout must be greater than zero when EnableBackgroundSending is true.");
            }
        }

        if (Config.EnableRetry)
        {
            if (Config.MaxRetryAttempts <= 0)
            {
                throw new InvalidOperationException(
                    "MaxRetryAttempts must be greater than 0 when EnableRetry is true.");
            }

            if (Config.InitialRetryDelay <= TimeSpan.Zero)
            {
                throw new InvalidOperationException(
                    "InitialRetryDelay must be greater than zero when EnableRetry is true.");
            }

            if (Config.MaxRetryDelay <= TimeSpan.Zero)
            {
                throw new InvalidOperationException(
                    "MaxRetryDelay must be greater than zero when EnableRetry is true.");
            }

            if (Config.InitialRetryDelay > Config.MaxRetryDelay)
            {
                throw new InvalidOperationException(
                    "InitialRetryDelay must be less than or equal to MaxRetryDelay.");
            }

            // A NaN, infinite, zero or negative multiplier cannot yield a usable backoff delay,
            // so reject it here instead of letting it surface while a send is being retried.
            var backoffMultiplier = Config.RetryBackoffMultiplier;
            if (double.IsNaN(backoffMultiplier) || double.IsInfinity(backoffMultiplier) || backoffMultiplier <= 0)
            {
                throw new InvalidOperationException(
                    "RetryBackoffMultiplier must be a finite number greater than 0 when EnableRetry is true.");
            }
        }
    }
}