Updated NuGet package and implemented faster enum-based lookup. Twice as fast as switching on the enum.
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index d766c2d..3f0521a 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -22,7 +22,7 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.3" />
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.4" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.0.73" />
<PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 8ac06f5..8b24df8 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,6 +18,7 @@
using Exceptions;
using Extensions;
using PulsarApi;
+ using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
@@ -232,6 +233,19 @@
{
await Task.Yield();
+ var lookup = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
+
+ lookup.Set(BaseCommand.Type.CloseConsumer, cmd => _channelManager.Incoming(cmd.CloseConsumer));
+ lookup.Set(BaseCommand.Type.ActiveConsumerChange, cmd => _channelManager.Incoming(cmd.ActiveConsumerChange));
+ lookup.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => _channelManager.Incoming(cmd.ReachedEndOfTopic));
+ lookup.Set(BaseCommand.Type.Ping, cmd => _pingPongHandler.GotPing());
+ lookup.Set(BaseCommand.Type.CloseProducer, cmd =>
+ {
+ _channelManager.Incoming(cmd.CloseProducer);
+ _requestResponseHandler.Incoming(cmd.CloseProducer);
+ });
+
+
try
{
await foreach (var frame in _stream.Frames())
@@ -239,31 +253,10 @@
var commandSize = frame.ReadUInt32(0, true);
var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
- switch (command.CommandType)
- {
- case BaseCommand.Type.Message:
- _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
- break;
- case BaseCommand.Type.CloseConsumer:
- _channelManager.Incoming(command.CloseConsumer);
- break;
- case BaseCommand.Type.ActiveConsumerChange:
- _channelManager.Incoming(command.ActiveConsumerChange);
- break;
- case BaseCommand.Type.ReachedEndOfTopic:
- _channelManager.Incoming(command.ReachedEndOfTopic);
- break;
- case BaseCommand.Type.CloseProducer:
- _channelManager.Incoming(command.CloseProducer);
- _requestResponseHandler.Incoming(command.CloseProducer);
- break;
- case BaseCommand.Type.Ping:
- _pingPongHandler.GotPing();
- break;
- default:
- _requestResponseHandler.Incoming(command);
- break;
- }
+ if (command.CommandType == BaseCommand.Type.Message)
+ _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
+ else
+ lookup.Get(command.CommandType)(command);
}
}
catch
diff --git a/src/DotPulsar/Internal/EnumLookup.cs b/src/DotPulsar/Internal/EnumLookup.cs
new file mode 100644
index 0000000..5d7bc33
--- /dev/null
+++ b/src/DotPulsar/Internal/EnumLookup.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using System;
+ using System.Linq;
+ using System.Runtime.CompilerServices;
+
+ public sealed class EnumLookup<TKey, TValue> where TKey : Enum
+ {
+ private readonly TValue[] _values;
+
+ public EnumLookup(TValue defaultValue)
+ {
+ var max = Enum.GetValues(typeof(TKey)).Cast<int>().Max() + 1;
+ _values = new TValue[max];
+ for (var i = 0; i < max; ++i)
+ _values[i] = defaultValue;
+ }
+
+ public void Set(TKey key, TValue value)
+ => _values[Unsafe.As<TKey, int>(ref key)] = value;
+
+ public TValue Get(TKey key)
+ => _values[Unsafe.As<TKey, int>(ref key)];
+ }
+}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 5f9bc04..b3b7d0a 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -22,27 +22,64 @@
public sealed class RequestResponseHandler : IDisposable
{
- private readonly Awaiter<IRequest, BaseCommand> _requests;
private readonly RequestId _requestId;
+ private readonly Awaiter<IRequest, BaseCommand> _requests;
+ private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> _setRequestId;
+ private readonly EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>> _getResponseIdentifier;
public RequestResponseHandler()
{
- _requests = new Awaiter<IRequest, BaseCommand>();
_requestId = new RequestId();
+
+ _requests = new Awaiter<IRequest, BaseCommand>();
+
+ _setRequestId = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => { });
+ _setRequestId.Set(BaseCommand.Type.Seek, cmd => cmd.Seek.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.Error, cmd => cmd.Error.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.Producer, cmd => cmd.Producer.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.Lookup, cmd => cmd.LookupTopic.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.Subscribe, cmd => cmd.Subscribe.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.Unsubscribe, cmd => cmd.Unsubscribe.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.CloseConsumer, cmd => cmd.CloseConsumer.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.CloseProducer, cmd => cmd.CloseProducer.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.GetLastMessageId, cmd => cmd.GetLastMessageId.RequestId = _requestId.FetchNext());
+ _setRequestId.Set(BaseCommand.Type.GetOrCreateSchema, cmd => cmd.GetOrCreateSchema.RequestId = _requestId.FetchNext());
+
+ _getResponseIdentifier = new EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>>(cmd => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type"));
+ _getResponseIdentifier.Set(BaseCommand.Type.Connect, cmd => new ConnectRequest());
+ _getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd => new ConnectRequest());
+ _getResponseIdentifier.Set(BaseCommand.Type.Seek, cmd => new StandardRequest(cmd.Seek.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Send, cmd => new SendRequest(cmd.Send.ProducerId, cmd.Send.SequenceId));
+ _getResponseIdentifier.Set(BaseCommand.Type.SendError, cmd => new SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId));
+ _getResponseIdentifier.Set(BaseCommand.Type.SendReceipt, cmd => new SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Producer, cmd => new StandardRequest(cmd.Producer.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd => new StandardRequest(cmd.ProducerSuccess.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd => new StandardRequest(cmd.CloseConsumer.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => new StandardRequest(cmd.CloseProducer.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Lookup, cmd => new StandardRequest(cmd.LookupTopic.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => new StandardRequest(cmd.LookupTopicResponse.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Subscribe, cmd => new StandardRequest(cmd.Subscribe.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Unsubscribe, cmd => new StandardRequest(cmd.Unsubscribe.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageId, cmd => new StandardRequest(cmd.GetLastMessageId.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd => new StandardRequest(cmd.GetLastMessageIdResponse.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchema, cmd => new StandardRequest(cmd.GetOrCreateSchema.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd => new StandardRequest(cmd.GetOrCreateSchemaResponse.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => new StandardRequest(cmd.Success.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd => !_requestId.IsPastInitialId() ? new ConnectRequest() : new StandardRequest(cmd.Error.RequestId));
}
public void Dispose()
- => _requests.Dispose();
+ => _requests.Dispose();
public Task<BaseCommand> Outgoing(BaseCommand command)
{
- SetRequestId(command);
- return _requests.CreateTask(GetResponseIdentifier(command));
+ _setRequestId.Get(command.CommandType)(command);
+ return _requests.CreateTask(_getResponseIdentifier.Get(command.CommandType)(command));
}
public void Incoming(BaseCommand command)
{
- var identifier = GetResponseIdentifier(command);
+ var identifier = _getResponseIdentifier.Get(command.CommandType)(command);
if (identifier is not null)
_requests.SetResult(identifier, command);
@@ -57,68 +94,5 @@
_requests.Cancel(request);
}
}
-
- private void SetRequestId(BaseCommand cmd)
- {
- switch (cmd.CommandType)
- {
- case BaseCommand.Type.Seek:
- cmd.Seek.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.Lookup:
- cmd.LookupTopic.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.Error:
- cmd.Error.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.Producer:
- cmd.Producer.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.CloseProducer:
- cmd.CloseProducer.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.Subscribe:
- cmd.Subscribe.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.Unsubscribe:
- cmd.Unsubscribe.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.CloseConsumer:
- cmd.CloseConsumer.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.GetLastMessageId:
- cmd.GetLastMessageId.RequestId = _requestId.FetchNext();
- return;
- case BaseCommand.Type.GetOrCreateSchema:
- cmd.GetOrCreateSchema.RequestId = _requestId.FetchNext();
- return;
- }
- }
-
- private IRequest GetResponseIdentifier(BaseCommand cmd)
- => cmd.CommandType switch
- {
- BaseCommand.Type.Send => new SendRequest(cmd.Send.ProducerId, cmd.Send.SequenceId),
- BaseCommand.Type.SendReceipt => new SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId),
- BaseCommand.Type.SendError => new SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId),
- BaseCommand.Type.Connect => new ConnectRequest(),
- BaseCommand.Type.Connected => new ConnectRequest(),
- BaseCommand.Type.Error => !_requestId.IsPastInitialId() ? new ConnectRequest() : new StandardRequest(cmd.Error.RequestId),
- BaseCommand.Type.Producer => new StandardRequest(cmd.Producer.RequestId),
- BaseCommand.Type.ProducerSuccess => new StandardRequest(cmd.ProducerSuccess.RequestId),
- BaseCommand.Type.CloseProducer => new StandardRequest(cmd.CloseProducer.RequestId),
- BaseCommand.Type.Lookup => new StandardRequest(cmd.LookupTopic.RequestId),
- BaseCommand.Type.LookupResponse => new StandardRequest(cmd.LookupTopicResponse.RequestId),
- BaseCommand.Type.Unsubscribe => new StandardRequest(cmd.Unsubscribe.RequestId),
- BaseCommand.Type.Subscribe => new StandardRequest(cmd.Subscribe.RequestId),
- BaseCommand.Type.Success => new StandardRequest(cmd.Success.RequestId),
- BaseCommand.Type.Seek => new StandardRequest(cmd.Seek.RequestId),
- BaseCommand.Type.CloseConsumer => new StandardRequest(cmd.CloseConsumer.RequestId),
- BaseCommand.Type.GetLastMessageId => new StandardRequest(cmd.GetLastMessageId.RequestId),
- BaseCommand.Type.GetLastMessageIdResponse => new StandardRequest(cmd.GetLastMessageIdResponse.RequestId),
- BaseCommand.Type.GetOrCreateSchema => new StandardRequest(cmd.GetOrCreateSchema.RequestId),
- BaseCommand.Type.GetOrCreateSchemaResponse => new StandardRequest(cmd.GetOrCreateSchemaResponse.RequestId),
- _ => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type")
- };
}
}