blob: 79c8533692bc2fa25c0258b544001469785eb9d2 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Internal
{
using System;
using System.Threading.Tasks;
using PulsarApi;
public sealed class RequestResponseHandler : IDisposable
{
private const string ConnectResponseIdentifier = "Connected";
private readonly Awaitor<string, BaseCommand> _responses;
private ulong _requestId;
public RequestResponseHandler()
{
_responses = new Awaitor<string, BaseCommand>();
_requestId = 1;
}
public void Dispose()
=> _responses.Dispose();
public Task<BaseCommand> Outgoing(BaseCommand command)
{
SetRequestId(command);
return _responses.CreateTask(GetResponseIdentifier(command));
}
public void Incoming(BaseCommand command)
{
var identifier = GetResponseIdentifier(command);
if (identifier != null)
_responses.SetResult(identifier, command);
}
private void SetRequestId(BaseCommand cmd)
{
switch (cmd.CommandType)
{
case BaseCommand.Type.Seek:
cmd.Seek.RequestId = _requestId++;
return;
case BaseCommand.Type.Lookup:
cmd.LookupTopic.RequestId = _requestId++;
return;
case BaseCommand.Type.Error:
cmd.Error.RequestId = _requestId++;
return;
case BaseCommand.Type.Producer:
cmd.Producer.RequestId = _requestId++;
return;
case BaseCommand.Type.CloseProducer:
cmd.CloseProducer.RequestId = _requestId++;
return;
case BaseCommand.Type.Subscribe:
cmd.Subscribe.RequestId = _requestId++;
return;
case BaseCommand.Type.Unsubscribe:
cmd.Unsubscribe.RequestId = _requestId++;
return;
case BaseCommand.Type.CloseConsumer:
cmd.CloseConsumer.RequestId = _requestId++;
return;
case BaseCommand.Type.GetLastMessageId:
cmd.GetLastMessageId.RequestId = _requestId++;
return;
}
}
private string GetResponseIdentifier(BaseCommand cmd)
{
switch (cmd.CommandType)
{
case BaseCommand.Type.Connect:
case BaseCommand.Type.Connected:
return ConnectResponseIdentifier;
case BaseCommand.Type.Send:
return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId;
case BaseCommand.Type.SendError:
return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId;
case BaseCommand.Type.SendReceipt:
return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId;
case BaseCommand.Type.Error:
return _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString();
case BaseCommand.Type.Producer:
return cmd.Producer.RequestId.ToString();
case BaseCommand.Type.ProducerSuccess:
return cmd.ProducerSuccess.RequestId.ToString();
case BaseCommand.Type.CloseProducer:
return cmd.CloseProducer.RequestId.ToString();
case BaseCommand.Type.Lookup:
return cmd.LookupTopic.RequestId.ToString();
case BaseCommand.Type.LookupResponse:
return cmd.LookupTopicResponse.RequestId.ToString();
case BaseCommand.Type.Unsubscribe:
return cmd.Unsubscribe.RequestId.ToString();
case BaseCommand.Type.Subscribe:
return cmd.Subscribe.RequestId.ToString();
case BaseCommand.Type.Success:
return cmd.Success.RequestId.ToString();
case BaseCommand.Type.Seek:
return cmd.Seek.RequestId.ToString();
case BaseCommand.Type.CloseConsumer:
return cmd.CloseConsumer.RequestId.ToString();
case BaseCommand.Type.GetLastMessageId:
return cmd.GetLastMessageId.RequestId.ToString();
case BaseCommand.Type.GetLastMessageIdResponse:
return cmd.GetLastMessageIdResponse.RequestId.ToString();
default:
throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type");
}
}
}
}