blob: 5d15b0edfd08e544635bd13e02db174ab38ddc9f [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Internal
{
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Internal.Extensions;
using Events;
using Microsoft.Extensions.ObjectPool;
using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
public sealed class SubProducer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
{
private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private IProducerChannel _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ProducerState> _state;
private readonly IProducerChannelFactory _factory;
private readonly ISchema<TMessage> _schema;
private readonly SequenceId _sequenceId;
private int _isDisposed;
public Uri ServiceUrl { get; }
public string Topic { get; }
public SubProducer(
Guid correlationId,
Uri serviceUrl,
string topic,
ulong initialSequenceId,
IRegisterEvent registerEvent,
IProducerChannel initialChannel,
IExecute executor,
IStateChanged<ProducerState> state,
IProducerChannelFactory factory,
ISchema<TMessage> schema)
{
var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
_messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
_correlationId = correlationId;
ServiceUrl = serviceUrl;
Topic = topic;
_sequenceId = new SequenceId(initialSequenceId);
_eventRegister = registerEvent;
_channel = initialChannel;
_executor = executor;
_state = state;
_factory = factory;
_schema = schema;
_isDisposed = 0;
_eventRegister.Register(new ProducerCreated(_correlationId));
}
public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
=> await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
=> await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public bool IsFinalState()
=> _state.IsFinalState();
public bool IsFinalState(ProducerState state)
=> _state.IsFinalState(state);
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
_eventRegister.Register(new ProducerDisposed(_correlationId));
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
=> await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
=> await Send(metadata, _schema.Encode(message), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var metadata = _messageMetadataPool.Get();
try
{
metadata.SequenceId = _sequenceId.FetchNext();
return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
}
finally
{
_messageMetadataPool.Return(metadata);
}
}
public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var autoAssignSequenceId = metadata.SequenceId == 0;
if (autoAssignSequenceId)
metadata.SequenceId = _sequenceId.FetchNext();
try
{
return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
}
finally
{
if (autoAssignSequenceId)
metadata.SequenceId = 0;
}
}
private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
return response.MessageId.ToMessageId();
}
public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
var oldChannel = _channel;
_channel = channel;
if (oldChannel is not null)
await oldChannel.DisposeAsync().ConfigureAwait(false);
}
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
throw new ProducerDisposedException(GetType().FullName!);
}
}
}