blob: ed3fa51282897fbbdef0d77ddb4c021860ed6a8d [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Internal
{
using Abstractions;
using Extensions;
using PulsarApi;
using System;
using System.Buffers;
using System.Threading.Tasks;
/// <summary>
/// Keeps track of the registered consumer and producer channels, passes outgoing commands on to the
/// request/response handler and dispatches incoming commands to the channel they address.
/// </summary>
public sealed class ChannelManager : IDisposable
{
    private readonly RequestResponseHandler _requestResponseHandler;
    private readonly IdLookup<IChannel> _consumerChannels;
    private readonly IdLookup<IChannel> _producerChannels;
    private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> _incoming;

    /// <summary>
    /// Creates a manager without any channels and registers the handlers for incoming commands.
    /// </summary>
    public ChannelManager()
    {
        _requestResponseHandler = new RequestResponseHandler();
        _consumerChannels = new IdLookup<IChannel>();
        _producerChannels = new IdLookup<IChannel>();

        // NOTE(review): the delegate given to the constructor forwards to the request/response handler;
        // presumably EnumLookup uses it for every command type without a dedicated handler - confirm.
        _incoming = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
        _incoming.Set(BaseCommand.Type.CloseConsumer, cmd => Incoming(cmd.CloseConsumer));
        _incoming.Set(BaseCommand.Type.CloseProducer, cmd => Incoming(cmd.CloseProducer));
        _incoming.Set(BaseCommand.Type.ActiveConsumerChange, cmd => Incoming(cmd.ActiveConsumerChange));
        _incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => Incoming(cmd.ReachedEndOfTopic));
    }

    /// <summary>
    /// Tells whether at least one consumer or producer channel is currently registered.
    /// </summary>
    /// <returns><c>true</c> when either lookup is non-empty; otherwise <c>false</c>.</returns>
    public bool HasChannels()
        => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();

    /// <summary>
    /// Registers <paramref name="channel"/> as a producer channel, stamps the assigned id on
    /// <paramref name="command"/> and sends the command.
    /// </summary>
    /// <param name="command">The producer command; its <c>ProducerId</c> is overwritten.</param>
    /// <param name="channel">The channel to register and to notify once the producer is created.</param>
    /// <returns>
    /// A task producing the assigned id and the producer name taken from the reply. It faults with whatever
    /// <c>Error.Throw()</c> raises when the reply is an error, and is cancelled when the request itself does
    /// not run to completion.
    /// </returns>
    public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel channel)
    {
        var producerId = _producerChannels.Add(channel);
        command.ProducerId = producerId;

        // OnlyOnRanToCompletion: if the request is cancelled or faults the continuation is cancelled
        // without running, so the channel stays registered until it is removed elsewhere or Dispose runs.
        return _requestResponseHandler.Outgoing(command).ContinueWith(task =>
        {
            var reply = task.Result;

            if (reply.CommandType == BaseCommand.Type.Error)
            {
                // The request was rejected, so the registration made above must not linger.
                _ = _producerChannels.Remove(producerId);
                reply.Error.Throw();
            }

            channel.Connected();
            return new ProducerResponse(producerId, reply.ProducerSuccess.ProducerName);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    /// <summary>
    /// Registers <paramref name="channel"/> as a consumer channel, stamps the assigned id on
    /// <paramref name="command"/> and sends the command.
    /// </summary>
    /// <param name="command">The subscribe command; its <c>ConsumerId</c> is overwritten.</param>
    /// <param name="channel">The channel to register and to notify once the subscription is created.</param>
    /// <returns>
    /// A task producing the assigned consumer id. It faults with whatever <c>Error.Throw()</c> raises when
    /// the reply is an error, and is cancelled when the request itself does not run to completion.
    /// </returns>
    public Task<SubscribeResponse> Outgoing(CommandSubscribe command, IChannel channel)
    {
        var consumerId = _consumerChannels.Add(channel);
        command.ConsumerId = consumerId;

        // See the producer overload: a cancelled or faulted request leaves the channel registered.
        return _requestResponseHandler.Outgoing(command).ContinueWith(task =>
        {
            var reply = task.Result;

            if (reply.CommandType == BaseCommand.Type.Error)
            {
                _ = _consumerChannels.Remove(consumerId);
                reply.Error.Throw();
            }

            channel.Connected();
            return new SubscribeResponse(consumerId);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    /// <summary>
    /// Sends a close-consumer command while holding the consumer channel's sender lock and removes the
    /// channel once the reply reports success.
    /// </summary>
    /// <param name="command">The command identifying the consumer to close.</param>
    /// <returns>The task of the underlying request.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown synchronously when no consumer channel is registered under the command's id.
    /// </exception>
    public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
    {
        var consumerId = command.ConsumerId;
        Task<BaseCommand> response;

        using (TakeSenderLock(_consumerChannels, consumerId))
        {
            response = _requestResponseHandler.Outgoing(command);
        }

        // Fire-and-forget bookkeeping; the caller awaits the request task itself.
        _ = response.ContinueWith(task =>
        {
            if (task.Result.CommandType == BaseCommand.Type.Success)
                _ = _consumerChannels.Remove(consumerId);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return response;
    }

    /// <summary>
    /// Sends a close-producer command while holding the producer channel's sender lock and removes the
    /// channel once the reply reports success.
    /// </summary>
    /// <param name="command">The command identifying the producer to close.</param>
    /// <returns>The task of the underlying request.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown synchronously when no producer channel is registered under the command's id.
    /// </exception>
    public Task<BaseCommand> Outgoing(CommandCloseProducer command)
    {
        var producerId = command.ProducerId;
        Task<BaseCommand> response;

        using (TakeSenderLock(_producerChannels, producerId))
        {
            response = _requestResponseHandler.Outgoing(command);
        }

        // Fire-and-forget bookkeeping; the caller awaits the request task itself.
        _ = response.ContinueWith(task =>
        {
            if (task.Result.CommandType == BaseCommand.Type.Success)
                _ = _producerChannels.Remove(producerId);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return response;
    }

    /// <summary>
    /// Sends an unsubscribe command while holding the consumer channel's sender lock. Once the reply
    /// reports success the channel is removed and, if it was still registered, told it is unsubscribed.
    /// </summary>
    /// <param name="command">The command identifying the consumer to unsubscribe.</param>
    /// <returns>The task of the underlying request.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown synchronously when no consumer channel is registered under the command's id.
    /// </exception>
    public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
    {
        var consumerId = command.ConsumerId;
        Task<BaseCommand> response;

        using (TakeSenderLock(_consumerChannels, consumerId))
        {
            response = _requestResponseHandler.Outgoing(command);
        }

        // Fire-and-forget bookkeeping; the caller awaits the request task itself.
        _ = response.ContinueWith(task =>
        {
            if (task.Result.CommandType == BaseCommand.Type.Success)
                _consumerChannels.Remove(consumerId)?.Unsubscribed();
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return response;
    }

    /// <summary>
    /// Sends a message command while holding the producer channel's sender lock.
    /// </summary>
    /// <param name="command">The send command; its <c>ProducerId</c> selects the channel.</param>
    /// <returns>The task of the underlying request.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown synchronously when no producer channel is registered under the command's id.
    /// </exception>
    public Task<BaseCommand> Outgoing(CommandSend command)
    {
        using (TakeSenderLock(_producerChannels, command.ProducerId))
        {
            return _requestResponseHandler.Outgoing(command);
        }
    }

    /// <summary>Passes the command straight on to the request/response handler.</summary>
    /// <param name="command">The command to send.</param>
    /// <returns>The task of the underlying request.</returns>
    public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
        => _requestResponseHandler.Outgoing(command);

    /// <summary>Passes the command straight on to the request/response handler.</summary>
    /// <param name="command">The command to send.</param>
    /// <returns>The task of the underlying request.</returns>
    public Task<BaseCommand> Outgoing(CommandConnect command)
        => _requestResponseHandler.Outgoing(command);

    /// <summary>Passes the command straight on to the request/response handler.</summary>
    /// <param name="command">The command to send.</param>
    /// <returns>The task of the underlying request.</returns>
    public Task<BaseCommand> Outgoing(CommandLookupTopic command)
        => _requestResponseHandler.Outgoing(command);

    /// <summary>
    /// Sends a seek command while holding the consumer channel's sender lock.
    /// </summary>
    /// <param name="command">The seek command; its <c>ConsumerId</c> selects the channel.</param>
    /// <returns>The task of the underlying request.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown synchronously when no consumer channel is registered under the command's id.
    /// </exception>
    public Task<BaseCommand> Outgoing(CommandSeek command)
    {
        using (TakeSenderLock(_consumerChannels, command.ConsumerId))
        {
            return _requestResponseHandler.Outgoing(command);
        }
    }

    /// <summary>
    /// Sends a get-last-message-id command while holding the consumer channel's sender lock.
    /// </summary>
    /// <param name="command">The command; its <c>ConsumerId</c> selects the channel.</param>
    /// <returns>The task of the underlying request.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown synchronously when no consumer channel is registered under the command's id.
    /// </exception>
    public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
    {
        using (TakeSenderLock(_consumerChannels, command.ConsumerId))
        {
            return _requestResponseHandler.Outgoing(command);
        }
    }

    /// <summary>
    /// Looks up the handler registered for the command's type and invokes it with the command.
    /// </summary>
    /// <param name="command">The received command.</param>
    public void Incoming(BaseCommand command)
    {
        var handler = _incoming.Get(command.CommandType);
        handler(command);
    }

    /// <summary>
    /// Hands a received message to the consumer channel it addresses; does nothing when no channel is
    /// registered under that consumer id.
    /// </summary>
    /// <param name="command">The message command carrying the consumer id, message id and redelivery count.</param>
    /// <param name="data">The bytes that accompany the command.</param>
    public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
        => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, command.RedeliveryCount, data));

    /// <summary>
    /// Disposes the request/response handler, then removes every consumer and producer channel and
    /// notifies each of them that it is disconnected.
    /// </summary>
    public void Dispose()
    {
        _requestResponseHandler.Dispose();

        foreach (var channel in _consumerChannels.RemoveAll())
            channel.Disconnected();

        foreach (var channel in _producerChannels.RemoveAll())
            channel.Disconnected();
    }

    /// <summary>
    /// Notifies the addressed consumer channel, if registered, that the end of the topic was reached.
    /// </summary>
    private void Incoming(CommandReachedEndOfTopic command)
        => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();

    /// <summary>
    /// Handles a server-initiated consumer close: unregisters the channel, forwards the command to the
    /// request/response handler and notifies the channel. Ignored when the channel is not registered.
    /// </summary>
    private void Incoming(CommandCloseConsumer command)
    {
        // Remove is used as the existence test so that lookup and removal are a single operation;
        // a channel that another path has already removed is therefore not notified a second time.
        var channel = _consumerChannels.Remove(command.ConsumerId);
        if (channel is null)
            return;

        _requestResponseHandler.Incoming(command);
        channel.ClosedByServer();
    }

    /// <summary>
    /// Handles a server-initiated producer close: unregisters the channel, forwards the command to the
    /// request/response handler and notifies the channel. Ignored when the channel is not registered.
    /// </summary>
    private void Incoming(CommandCloseProducer command)
    {
        // Same single-step remove as for consumers.
        var channel = _producerChannels.Remove(command.ProducerId);
        if (channel is null)
            return;

        _requestResponseHandler.Incoming(command);
        channel.ClosedByServer();
    }

    /// <summary>
    /// Tells the addressed consumer channel, if registered, whether it was activated or deactivated.
    /// </summary>
    private void Incoming(CommandActiveConsumerChange command)
    {
        var channel = _consumerChannels[command.ConsumerId];
        if (channel is null)
            return;

        if (command.IsActive)
            channel.Activated();
        else
            channel.Deactivated();
    }

    /// <summary>
    /// Takes the sender lock of the channel registered under <paramref name="id"/> in
    /// <paramref name="channels"/>.
    /// </summary>
    /// <param name="channels">The consumer or producer lookup to search.</param>
    /// <param name="id">The consumer or producer id.</param>
    /// <returns>The handle returned by the channel's <c>SenderLock()</c>; dispose it to release the lock.</returns>
    /// <exception cref="OperationCanceledException">No channel is registered under the id.</exception>
    private static IDisposable TakeSenderLock(IdLookup<IChannel> channels, ulong id)
    {
        var channel = channels[id];
        if (channel is null)
            throw new OperationCanceledException();

        return channel.SenderLock();
    }
}
}