blob: 510b66f3cd42ad085fa521a5456a0ce5c5b78cb2 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Internal.Extensions;
using Microsoft.Extensions.ObjectPool;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.ExceptionServices;
public sealed class MessageProcessor<TMessage> : IDisposable
{
private readonly string _operationName;
private readonly KeyValuePair<string, object?>[] _activityTags;
private readonly KeyValuePair<string, object?>[] _meterTags;
private readonly IConsumer<TMessage> _consumer;
private readonly Func<IMessage<TMessage>, CancellationToken, ValueTask> _processor;
private readonly LinkedList<Task> _processorTasks;
private readonly ConcurrentQueue<ProcessInfo> _processingQueue;
private readonly SemaphoreSlim _receiveLock;
private readonly SemaphoreSlim _acknowledgeLock;
private readonly ObjectPool<ProcessInfo> _processInfoPool;
private readonly bool _linkTraces;
private readonly bool _ensureOrderedAcknowledgment;
private readonly int _maxDegreeOfParallelism;
private readonly int _maxMessagesPerTask;
private readonly TaskScheduler _taskScheduler;
public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options)
{
const string operation = "process";
_operationName = $"{consumer.Topic} {operation}";
_activityTags =
[
new("messaging.destination", consumer.Topic),
new("messaging.destination_kind", "topic"),
new("messaging.operation", operation),
new("messaging.system", "pulsar"),
new("messaging.url", consumer.ServiceUrl),
new("messaging.pulsar.subscription", consumer.SubscriptionName)
];
_meterTags =
[
new("topic", consumer.Topic),
new("subscription", consumer.SubscriptionName)
];
_consumer = consumer;
_processor = processor;
_processorTasks = new LinkedList<Task>();
_processingQueue = new ConcurrentQueue<ProcessInfo>();
_receiveLock = new SemaphoreSlim(1, 1);
_acknowledgeLock = new SemaphoreSlim(1, 1);
_processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>());
_linkTraces = options.LinkTraces;
_ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
_maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
_maxMessagesPerTask = options.MaxMessagesPerTask;
_taskScheduler = options.TaskScheduler;
}
public void Dispose()
{
_receiveLock.Dispose();
_acknowledgeLock.Dispose();
}
public async ValueTask Process(CancellationToken cancellationToken)
{
for (var i = 1; i < _maxDegreeOfParallelism; ++i)
{
StartNewProcessorTask(cancellationToken);
}
while (true)
{
StartNewProcessorTask(cancellationToken);
var completedTask = await Task.WhenAny(_processorTasks).ConfigureAwait(false);
if (completedTask.IsFaulted)
ExceptionDispatchInfo.Capture(completedTask.Exception!.InnerException!).Throw();
_processorTasks.Remove(completedTask);
cancellationToken.ThrowIfCancellationRequested();
}
}
private async ValueTask Processor(CancellationToken cancellationToken)
{
var messagesProcessed = 0;
var processInfo = new ProcessInfo();
var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgment && _maxDegreeOfParallelism > 1;
var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded;
while (!cancellationToken.IsCancellationRequested)
{
if (needToEnsureOrderedAcknowledgement)
{
processInfo = _processInfoPool.Get();
await _receiveLock.WaitAsync(cancellationToken).ConfigureAwait(false);
}
var message = await _consumer.Receive(cancellationToken).ConfigureAwait(false);
if (needToEnsureOrderedAcknowledgement)
{
processInfo.MessageId = message.MessageId;
processInfo.IsProcessed = false;
_processingQueue.Enqueue(processInfo);
_receiveLock.Release();
}
var activity = DotPulsarActivitySource.StartConsumerActivity(message, _operationName, _activityTags, _linkTraces);
if (activity is not null && activity.IsAllDataRequested)
{
activity.SetMessageId(message.MessageId);
activity.SetPayloadSize(message.Data.Length);
activity.SetStatus(ActivityStatusCode.Ok);
}
var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
try
{
await _processor(message, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
if (activity is not null && activity.IsAllDataRequested)
activity.AddException(exception);
}
if (startTimestamp != 0)
DotPulsarMeter.MessageProcessed(startTimestamp, _meterTags);
activity?.Dispose();
if (needToEnsureOrderedAcknowledgement)
{
await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
processInfo.IsProcessed = true;
await AcknowledgeProcessedMessages(cancellationToken).ConfigureAwait(false);
_acknowledgeLock.Release();
}
else
await _consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask)
return;
}
}
private async ValueTask AcknowledgeProcessedMessages(CancellationToken cancellationToken)
{
var messagesToAcknowledge = 0;
var messageId = MessageId.Earliest;
while (_processingQueue.TryPeek(out var processInfo))
{
if (!processInfo.IsProcessed)
break;
++messagesToAcknowledge;
if (_processingQueue.TryDequeue(out processInfo))
{
messageId = processInfo.MessageId;
_processInfoPool.Return(processInfo);
}
}
if (messagesToAcknowledge == 1)
await _consumer.Acknowledge(messageId, cancellationToken).ConfigureAwait(false);
else if (messagesToAcknowledge > 1)
await _consumer.AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false);
}
private void StartNewProcessorTask(CancellationToken cancellationToken)
{
var processorTask = Task.Factory.StartNew(
async () => await Processor(cancellationToken).ConfigureAwait(false),
cancellationToken,
TaskCreationOptions.DenyChildAttach,
_taskScheduler).Unwrap();
_processorTasks.AddLast(processorTask);
}
private sealed class ProcessInfo
{
public ProcessInfo()
{
MessageId = MessageId.Earliest;
IsProcessed = false;
}
public MessageId MessageId { get; set; }
public bool IsProcessed { get; set; }
}
}