code review change requests
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index f1c3117..a40a7eb 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -135,10 +135,7 @@
{
_queue.Dispose();
- await _connection.Send(new CommandCloseConsumer
- {
- ConsumerId = _id
- }, CancellationToken.None).ConfigureAwait(false);
+ await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None).ConfigureAwait(false);
}
catch
{
@@ -162,11 +159,7 @@
private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken)
{
- var ack = new CommandAck
- {
- Type = CommandAck.AckType.Individual,
- validation_error = CommandAck.ValidationError.ChecksumMismatch
- };
+ var ack = new CommandAck { Type = CommandAck.AckType.Individual, validation_error = CommandAck.ValidationError.ChecksumMismatch };
ack.MessageIds.Add(messagePackage.MessageId);
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index d6181e7..16542dd 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -32,16 +32,9 @@
public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
{
- _cachedMetadata = new MessageMetadata
- {
- ProducerName = name
- };
+ _cachedMetadata = new MessageMetadata { ProducerName = name };
- var commandSend = new CommandSend
- {
- ProducerId = id,
- NumMessages = 1
- };
+ var commandSend = new CommandSend { ProducerId = id, NumMessages = 1 };
_cachedSendPackage = new SendPackage(commandSend, _cachedMetadata);
@@ -54,10 +47,7 @@
{
try
{
- await _connection.Send(new CommandCloseProducer
- {
- ProducerId = _id
- }, CancellationToken.None).ConfigureAwait(false);
+ await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None).ConfigureAwait(false);
}
catch
{
@@ -65,19 +55,21 @@
}
}
- public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
_cachedSendPackage.Metadata = _cachedMetadata;
_cachedSendPackage.Payload = payload;
- return await SendPackage(true, cancellationToken).ConfigureAwait(false);
+
+ return SendPackage(true, cancellationToken);
}
- public async Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
metadata.ProducerName = _cachedMetadata.ProducerName;
_cachedSendPackage.Metadata = metadata;
_cachedSendPackage.Payload = payload;
- return await SendPackage(metadata.SequenceId == 0, cancellationToken).ConfigureAwait(false);
+
+ return SendPackage(metadata.SequenceId == 0, cancellationToken);
}
private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId, CancellationToken cancellationToken)
@@ -92,9 +84,7 @@
_cachedSendPackage.Metadata.SequenceId = _sequenceId.Current;
}
else
- {
_cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
- }
var response = await _connection.Send(_cachedSendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
@@ -108,7 +98,7 @@
{
// Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
if (autoAssignSequenceId)
- _cachedSendPackage.Metadata.SequenceId = 0;
+ _cachedSendPackage.Metadata.SequenceId = 0;
}
}
}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 387e87d..201bf01 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -142,18 +142,14 @@
$"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");
}
else
- {
throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
- }
+
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval);
var processManager = new ProcessManager(connectionPool);
-
- var exceptionHandlerPipeline = new ExceptionHandlerPipeline(new List<IHandleException>(_exceptionHandlers)
- {
- new DefaultExceptionHandler(_retryInterval)
- });
+ var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
+ var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline);
}
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index 66b10d9..46421b1 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -71,8 +71,10 @@
{
get
{
- foreach (var prop in Metadata.Properties)
+ for (var i = 0; i < Metadata.Properties.Count; i++)
{
+ var prop = Metadata.Properties[i];
+
if (prop.Key == key)
return prop.Value;
}
@@ -81,8 +83,10 @@
}
set
{
- foreach (var prop in Metadata.Properties)
+ for (var i = 0; i < Metadata.Properties.Count; i++)
{
+ var prop = Metadata.Properties[i];
+
if (prop.Key != key)
continue;
@@ -91,11 +95,7 @@
return;
}
- Metadata.Properties.Add(new KeyValue
- {
- Key = key,
- Value = value
- });
+ Metadata.Properties.Add(new KeyValue { Key = key, Value = value });
}
}