Revert "Move payload command component sizes into constants + make inactive connections closing interval configurable (#19)"
This reverts commit 21914ba7fe5d4e1891f54a8225f96eb338877ec4.
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 37e0e9f..3010b84 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -74,11 +74,6 @@
IPulsarClientBuilder VerifyCertificateName(bool verifyCertificateName);
/// <summary>
- /// The time to wait before checking for inactive connections that can be closed. The default is 60 seconds.
- /// </summary>
- IPulsarClientBuilder CloseInactiveConnectionsInterval(TimeSpan interval);
-
- /// <summary>
/// Create the client.
/// </summary>
IPulsarClient Build();
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index d3dff22..00b2907 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -36,7 +36,7 @@
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Task _closeInactiveConnections;
- public ConnectionPool(CommandConnect commandConnect, Uri serviceUrl, Connector connector, EncryptionPolicy encryptionPolicy, TimeSpan closeInactiveConnectionsInterval)
+ public ConnectionPool(CommandConnect commandConnect, Uri serviceUrl, Connector connector, EncryptionPolicy encryptionPolicy)
{
_lock = new AsyncLock();
_commandConnect = commandConnect;
@@ -45,7 +45,7 @@
_encryptionPolicy = encryptionPolicy;
_connections = new ConcurrentDictionary<Uri, Connection>();
_cancellationTokenSource = new CancellationTokenSource();
- _closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
+ _closeInactiveConnections = CloseInactiveConnections(TimeSpan.FromSeconds(60), _cancellationTokenSource.Token); //TODO Get '60' from configuration
}
public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index c1d5f1c..69b373a 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -28,8 +28,6 @@
DefaultPulsarPort = 6650;
DefaultPulsarSSLPort = 6651;
MagicNumber = new byte[] { 0x0e, 0x01 };
- MetadataSizeOffset = 6;
- MetadataOffset = 10;
}
public static string ClientVersion { get; }
@@ -39,7 +37,5 @@
public static int DefaultPulsarPort { get; }
public static int DefaultPulsarSSLPort { get; }
public static byte[] MagicNumber { get; }
- public static int MetadataSizeOffset { get; }
- public static int MetadataOffset { get; }
}
}
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 64b6c7a..d19f76d 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -103,6 +103,7 @@
PriorityLevel = _priorityLevel,
ReadCompacted = _readCompacted,
SubscriptionType = _subscriptionType
+
};
return _pulsarClient.CreateConsumer(options);
diff --git a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
index 5a94334..45afe67 100644
--- a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
@@ -19,13 +19,13 @@
public static class MessagePackageExtensions
{
public static uint GetMetadataSize(this MessagePackage package)
- => package.Data.ReadUInt32(Constants.MetadataSizeOffset, true);
-
- public static PulsarApi.MessageMetadata ExtractMetadata(this MessagePackage package, uint metadataSize)
- => Serializer.Deserialize<PulsarApi.MessageMetadata>(package.Data.Slice(Constants.MetadataOffset, metadataSize));
+ => package.Data.ReadUInt32(6, true); //TODO RK: 6 should be a constant
public static ReadOnlySequence<byte> ExtractData(this MessagePackage package, uint metadataSize)
- => package.Data.Slice(Constants.MetadataOffset + metadataSize);
+ => package.Data.Slice(10 + metadataSize); //TODO RK: 10 should be a constant
+
+ public static PulsarApi.MessageMetadata ExtractMetadata(this MessagePackage package, uint metadataSize)
+ => Serializer.Deserialize<PulsarApi.MessageMetadata>(package.Data.Slice(10, metadataSize)); //TODO RK: 10 should be a constant
public static bool IsValid(this MessagePackage package)
=> StartsWithMagicNumber(package.Data) && HasValidCheckSum(package.Data);
@@ -34,6 +34,6 @@
=> input.StartsWith(Constants.MagicNumber);
private static bool HasValidCheckSum(ReadOnlySequence<byte> input)
- => input.ReadUInt32(Constants.MagicNumber.Length, true) == Crc32C.Calculate(input.Slice(Constants.MetadataSizeOffset));
+ => input.ReadUInt32(2, true) == Crc32C.Calculate(input.Slice(6));
}
-}
+}
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index dc22d89..c2a61f5 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -34,7 +34,6 @@
private X509Certificate2Collection _clientCertificates;
private bool _verifyCertificateAuthority;
private bool _verifyCertificateName;
- private TimeSpan _closeInactiveConnectionsInterval;
public PulsarClientBuilder()
{
@@ -50,7 +49,6 @@
_clientCertificates = new X509Certificate2Collection();
_verifyCertificateAuthority = true;
_verifyCertificateName = false;
- _closeInactiveConnectionsInterval = TimeSpan.FromSeconds(60);
}
public IPulsarClientBuilder AuthenticateUsingClientCertificate(X509Certificate2 clientCertificate)
@@ -115,12 +113,6 @@
return this;
}
- public IPulsarClientBuilder CloseInactiveConnectionsInterval(TimeSpan interval)
- {
- _closeInactiveConnectionsInterval = interval;
- return this;
- }
-
public IPulsarClient Build()
{
var scheme = _serviceUrl.Scheme;
@@ -145,7 +137,7 @@
throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
- var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval);
+ var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value);
var exceptionHandlers = new List<IHandleException>(_exceptionHandlers)
{
new DefaultExceptionHandler(_retryInterval)
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index 53d0693..133169b 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -21,7 +21,7 @@
{
public sealed class Message
{
- private readonly List<Internal.PulsarApi.KeyValue> _keyValues;
+ private readonly List<Internal.PulsarApi.KeyValue> _keyVaues;
private IReadOnlyDictionary<string, string>? _properties;
internal Message(
@@ -42,7 +42,7 @@
Key = metadata.PartitionKey;
SequenceId = metadata.SequenceId;
OrderingKey = metadata.OrderingKey;
- _keyValues = metadata.Properties;
+ _keyVaues = metadata.Properties;
}
else
{
@@ -51,7 +51,7 @@
Key = singleMetadata.PartitionKey;
OrderingKey = singleMetadata.OrderingKey;
SequenceId = singleMetadata.SequenceId;
- _keyValues = singleMetadata.Properties;
+ _keyVaues = singleMetadata.Properties;
}
}
@@ -76,6 +76,6 @@
public ulong PublishTime { get; }
public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)PublishTime);
- public IReadOnlyDictionary<string, string> Properties => _properties ??= _keyValues.ToDictionary(p => p.Key, p => p.Value);
+ public IReadOnlyDictionary<string, string> Properties => _properties ??= _keyVaues.ToDictionary(p => p.Key, p => p.Value);
}
}