blob: 8c9592137c05219d9f586976d15be19437862012 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using Apache.Iggy.Enums;
using Apache.Iggy.Exceptions;
using Apache.Iggy.IggyClient;
using Apache.Iggy.Messages;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Apache.Iggy.Publishers;
/// <summary>
/// High-level publisher for sending messages to Iggy streams and topics.
/// Supports background message batching, automatic retry, encryption, and stream/topic auto-creation.
/// </summary>
public partial class IggyPublisher : IAsyncDisposable
{
private readonly BackgroundMessageProcessor? _backgroundProcessor;
private readonly IIggyClient _client;
private readonly IggyPublisherConfig _config;
private readonly ILogger<IggyPublisher> _logger;
private bool _disposed;
private bool _isInitialized;
/// <summary>
/// Gets the identifier of the stream this publisher sends messages to.
/// </summary>
public Identifier StreamId => _config.StreamId;
/// <summary>
/// Gets the identifier of the topic this publisher sends messages to.
/// </summary>
public Identifier TopicId => _config.TopicId;
/// <summary>
/// Initializes a new instance of the <see cref="IggyPublisher" /> class.
/// </summary>
/// <param name="client">The Iggy client to use for communication.</param>
/// <param name="config">Publisher configuration settings.</param>
/// <param name="logger">Logger instance for diagnostic output.</param>
public IggyPublisher(IIggyClient client, IggyPublisherConfig config, ILogger<IggyPublisher> logger)
{
_client = client;
_config = config;
_logger = logger;
if (_config.EnableBackgroundSending)
{
LogInitializingBackgroundSending(_config.BackgroundQueueCapacity, _config.BackgroundBatchSize);
ILogger<BackgroundMessageProcessor> processorLogger
= _config.LoggerFactory?.CreateLogger<BackgroundMessageProcessor>()
?? NullLogger<BackgroundMessageProcessor>.Instance;
_backgroundProcessor = new BackgroundMessageProcessor(_client, _config, processorLogger);
}
}
/// <summary>
/// Disposes the publisher, stops the background processor if running,
/// and logs out and disposes the client if it was created by the publisher.
/// </summary>
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
LogDisposingPublisher();
if (_backgroundProcessor != null)
{
await _backgroundProcessor.DisposeAsync();
}
if (_config.CreateIggyClient && _isInitialized)
{
try
{
await _client.LogoutUser();
_client.Dispose();
}
catch (Exception e)
{
LogFailedToLogoutOrDispose(e);
}
}
_disposed = true;
LogPublisherDisposed();
}
/// <summary>
/// Fired when any error occurs in the background task
/// </summary>
public event EventHandler<PublisherErrorEventArgs>? OnBackgroundError
{
add
{
if (_backgroundProcessor != null)
{
_backgroundProcessor.OnBackgroundError += value;
}
}
remove
{
if (_backgroundProcessor != null)
{
_backgroundProcessor.OnBackgroundError -= value;
}
}
}
/// <summary>
/// Fired when a batch of messages fails to send
/// </summary>
public event EventHandler<MessageBatchFailedEventArgs>? OnMessageBatchFailed
{
add
{
if (_backgroundProcessor != null)
{
_backgroundProcessor.OnMessageBatchFailed += value;
}
}
remove
{
if (_backgroundProcessor != null)
{
_backgroundProcessor.OnMessageBatchFailed -= value;
}
}
}
/// <summary>
/// Initializes the publisher by authenticating, ensuring stream and topic exist,
/// and starting the background processor if enabled.
/// </summary>
/// <param name="ct">Cancellation token to cancel initialization.</param>
/// <exception cref="StreamNotFoundException">Thrown when the stream doesn't exist and auto-creation is disabled.</exception>
/// <exception cref="TopicNotFoundException">Thrown when the topic doesn't exist and auto-creation is disabled.</exception>
public async Task InitAsync(CancellationToken ct = default)
{
if (_isInitialized)
{
LogPublisherAlreadyInitialized();
return;
}
LogInitializingPublisher(_config.StreamId, _config.TopicId);
if (_config.CreateIggyClient)
{
await _client.LoginUser(_config.Login, _config.Password, ct);
LogUserLoggedIn(_config.Login);
}
await CreateStreamIfNeeded(ct);
await CreateTopicIfNeeded(ct);
if (_config.EnableBackgroundSending)
{
_backgroundProcessor?.Start();
LogBackgroundSendingStarted();
}
_isInitialized = true;
LogPublisherInitialized();
}
/// <summary>
/// Creates the stream if it doesn't exist and auto-creation is enabled in the configuration.
/// </summary>
/// <param name="ct">Cancellation token to cancel the operation.</param>
/// <exception cref="StreamNotFoundException">Thrown when the stream doesn't exist and auto-creation is disabled.</exception>
private async Task CreateStreamIfNeeded(CancellationToken ct)
{
if (await _client.GetStreamByIdAsync(_config.StreamId, ct) != null)
{
LogStreamAlreadyExists(_config.StreamId);
return;
}
if (!_config.CreateStream || string.IsNullOrEmpty(_config.StreamName))
{
LogStreamDoesNotExist(_config.StreamId);
throw new StreamNotFoundException(_config.StreamId);
}
LogCreatingStream(_config.StreamId, _config.StreamName);
if (_config.StreamId.Kind is IdKind.String)
{
await _client.CreateStreamAsync(_config.StreamId.GetString(), null, ct);
}
else
{
await _client.CreateStreamAsync(_config.StreamName, _config.StreamId.GetUInt32(), ct);
}
LogStreamCreated(_config.StreamId);
}
/// <summary>
/// Creates the topic if it doesn't exist and auto-creation is enabled in the configuration.
/// </summary>
/// <param name="ct">Cancellation token to cancel the operation.</param>
/// <exception cref="TopicNotFoundException">Thrown when the topic doesn't exist and auto-creation is disabled.</exception>
private async Task CreateTopicIfNeeded(CancellationToken ct)
{
if (await _client.GetTopicByIdAsync(_config.StreamId, _config.TopicId, ct) != null)
{
LogTopicAlreadyExists(_config.TopicId, _config.StreamId);
return;
}
if (!_config.CreateTopic || string.IsNullOrEmpty(_config.TopicName))
{
LogTopicDoesNotExist(_config.TopicId, _config.StreamId);
throw new TopicNotFoundException(_config.TopicId, _config.StreamId);
}
LogCreatingTopic(_config.TopicId, _config.TopicName, _config.StreamId);
if (_config.TopicId.Kind is IdKind.String)
{
await _client.CreateTopicAsync(_config.StreamId, _config.TopicId.GetString(),
_config.TopicPartitionsCount, _config.TopicCompressionAlgorithm, null,
_config.TopicReplicationFactor, _config.TopicMessageExpiry, _config.TopicMaxTopicSize, ct);
}
else
{
await _client.CreateTopicAsync(_config.StreamId, _config.TopicName, _config.TopicPartitionsCount,
_config.TopicCompressionAlgorithm, _config.TopicId.GetUInt32(), _config.TopicReplicationFactor,
_config.TopicMessageExpiry, _config.TopicMaxTopicSize, ct);
}
LogTopicCreated(_config.TopicId, _config.StreamId);
}
/// <summary>
/// Sends a collection of messages to the configured stream and topic.
/// If background sending is enabled, messages are queued for asynchronous processing.
/// Otherwise, messages are sent immediately.
/// </summary>
/// <param name="messages">The messages to send.</param>
/// <param name="ct">Cancellation token to cancel the send operation.</param>
/// <exception cref="PublisherNotInitializedException">Thrown when attempting to send before initialization.</exception>
public async Task SendMessages(IList<Message> messages, CancellationToken ct = default)
{
if (!_isInitialized)
{
LogSendBeforeInitialization();
throw new PublisherNotInitializedException();
}
if (messages.Count == 0)
{
return;
}
EncryptMessages(messages);
if (_config.EnableBackgroundSending && _backgroundProcessor != null)
{
LogQueuingMessages(messages.Count);
foreach (var message in messages)
{
await _backgroundProcessor.MessageWriter.WriteAsync(message, ct);
}
}
else
{
await _client.SendMessagesAsync(_config.StreamId, _config.TopicId, _config.Partitioning, messages, ct);
LogSuccessfullySentMessages(messages.Count);
}
}
/// <summary>
/// Waits until all queued messages have been sent by the background processor.
/// Only applicable when background sending is enabled. Returns immediately otherwise.
/// </summary>
/// <param name="ct">Cancellation token to cancel the wait operation.</param>
public async Task WaitUntilAllSends(CancellationToken ct = default)
{
if (!_config.EnableBackgroundSending || _backgroundProcessor == null)
{
return;
}
LogWaitingForPendingMessages();
while (_backgroundProcessor.MessageReader.Count > 0 ||
_backgroundProcessor.IsSending)
{
await Task.Delay(10, ct);
}
LogAllPendingMessagesSent();
}
/// <summary>
/// Encrypts all messages in the list using the configured message encryptor, if available.
/// Updates the payload length in the message header after encryption.
/// </summary>
/// <param name="messages">The messages to encrypt.</param>
private void EncryptMessages(IList<Message> messages)
{
if (_config.MessageEncryptor == null)
{
return;
}
foreach (var message in messages)
{
message.Payload = _config.MessageEncryptor.Encrypt(message.Payload);
message.Header.PayloadLength = message.Payload.Length;
}
}
}