Make ready for 1.0.0 and fix seek problem (broker is sending close command before success)
diff --git a/.asf.yaml b/.asf.yaml
index 319738d..d4007d6 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -18,7 +18,7 @@
#
github:
- description: "The official .NET/C# client library for Apache Pulsar"
+ description: "The official .NET client library for Apache Pulsar"
homepage: https://pulsar.apache.org/
labels:
- pulsar
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f6246fa..b4b558b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,14 +4,14 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [1.0.0] - 2021-03-17
### Added
- A number of resilience, correctness, and performance improvements
- The optional listener name can be set via the PulsarClientBuilder
- *Experimental*: Added an extension method for IConsumer that will 'Process' and auto-acknowledge messages while creating an Activity (useful for doing tracing)
-- Schemas with support for the following types
+- Schema support for the following types
- Boolean
- Bytes (using byte[] and ReadOnlySequence\<byte\>)
- String (UTF-8, UTF-16, and US-ASCII)
diff --git a/README.md b/README.md
index fe7afdc..a4a92ec 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
![CI - Unit](https://github.com/apache/pulsar-dotpulsar/workflows/CI%20-%20Unit/badge.svg)
-The official .NET/C# client library for [Apache Pulsar](https://pulsar.apache.org/).
+The official .NET client library for [Apache Pulsar](https://pulsar.apache.org/).
DotPulsar is written entirely in C# and implements Apache Pulsar's [binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/).
@@ -65,6 +65,14 @@
- [X] [ZLIB message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
- [X] [ZSTD message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
- [X] [SNAPPY message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
+- [X] Schemas
+ - Boolean
+ - Bytes (using byte[] and ReadOnlySequence\<byte\>)
+ - String (UTF-8, UTF-16, and US-ASCII)
+ - INT8, INT16, INT32, and INT64
+ - Float and Double
+ - Time (using TimeSpan)
+ - Timestamp and Date (using DateTime)
## Roadmap
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 3f0521a..88b5273 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
- <Version>0.11.0</Version>
+ <Version>1.0.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
@@ -13,7 +13,7 @@
<PackageIcon>PackageIcon.png</PackageIcon>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageReleaseNotes>Please refer to CHANGELOG.md for details</PackageReleaseNotes>
- <Description>The official .NET/C# client library for Apache Pulsar</Description>
+ <Description>The official .NET client library for Apache Pulsar</Description>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<IncludeSymbols>true</IncludeSymbols>
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 13f2121..f9a78a0 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -40,7 +40,7 @@
=> await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
/// <summary>
- /// Process and auto-acknowledge a message.
+ /// Process and auto-acknowledge a message. This is experimental.
/// </summary>
public static async ValueTask Process<TMessage>(
this IConsumer<TMessage> consumer,
diff --git a/src/DotPulsar/Internal/Abstractions/IRequest.cs b/src/DotPulsar/Internal/Abstractions/IRequest.cs
index 8ea7913..0841e26 100644
--- a/src/DotPulsar/Internal/Abstractions/IRequest.cs
+++ b/src/DotPulsar/Internal/Abstractions/IRequest.cs
@@ -14,11 +14,13 @@
namespace DotPulsar.Internal.Abstractions
{
+ using DotPulsar.Internal.PulsarApi;
using System;
public interface IRequest : IEquatable<IRequest>
{
bool SenderIsProducer(ulong producerId);
bool SenderIsConsumer(ulong consumerId);
+ bool IsCommandType(BaseCommand.Type commandType);
}
}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 5cbd632..9680d4c 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -108,7 +108,7 @@
public Task<BaseCommand> Outgoing(CommandSeek command)
{
command.RequestId = _requestId.FetchNext();
- return _requests.CreateTask(StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId));
+ return _requests.CreateTask(StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId, BaseCommand.Type.Seek));
}
public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
@@ -131,7 +131,12 @@
foreach (var request in requests)
{
if (request.SenderIsConsumer(command.ConsumerId))
- _requests.Cancel(request);
+ {
+ if (request.IsCommandType(BaseCommand.Type.Seek))
+ _requests.SetResult(request, new BaseCommand { CommandType = BaseCommand.Type.Success });
+ else
+ _requests.Cancel(request);
+ }
}
}
diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
index 5ab0e20..18f977c 100644
--- a/src/DotPulsar/Internal/Requests/ConnectRequest.cs
+++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal.Requests
{
using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.PulsarApi;
using System.Diagnostics.CodeAnalysis;
public struct ConnectRequest : IRequest
@@ -25,6 +26,9 @@
public bool SenderIsProducer(ulong producerId)
=> false;
+ public bool IsCommandType(BaseCommand.Type commandType)
+ => commandType == BaseCommand.Type.Connect;
+
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else
diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs b/src/DotPulsar/Internal/Requests/SendRequest.cs
index 36498b6..6023a88 100644
--- a/src/DotPulsar/Internal/Requests/SendRequest.cs
+++ b/src/DotPulsar/Internal/Requests/SendRequest.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal.Requests
{
using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.PulsarApi;
using System;
using System.Diagnostics.CodeAnalysis;
@@ -35,6 +36,9 @@
public bool SenderIsProducer(ulong producerId)
=> _producerId == producerId;
+ public bool IsCommandType(BaseCommand.Type commandType)
+ => commandType == BaseCommand.Type.Send;
+
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs b/src/DotPulsar/Internal/Requests/StandardRequest.cs
index 666b227..69191a2 100644
--- a/src/DotPulsar/Internal/Requests/StandardRequest.cs
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal.Requests
{
using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.PulsarApi;
using System;
using System.Diagnostics.CodeAnalysis;
@@ -23,22 +24,24 @@
private readonly ulong _requestId;
private readonly ulong? _consumerId;
private readonly ulong? _producerId;
+ private readonly BaseCommand.Type? _commandType;
- private StandardRequest(ulong requestId, ulong? consumerId, ulong? producerId)
+ private StandardRequest(ulong requestId, ulong? consumerId, ulong? producerId, BaseCommand.Type? commandType)
{
_requestId = requestId;
_consumerId = consumerId;
_producerId = producerId;
+ _commandType = commandType;
}
public static StandardRequest WithRequestId(ulong requestId)
- => new(requestId, null, null);
+ => new(requestId, null, null, null);
- public static StandardRequest WithConsumerId(ulong requestId, ulong consumerId)
- => new(requestId, consumerId, null);
+ public static StandardRequest WithConsumerId(ulong requestId, ulong consumerId, BaseCommand.Type? commandType = null)
+ => new(requestId, consumerId, null, commandType);
- public static StandardRequest WithProducerId(ulong requestId, ulong producerId)
- => new (requestId, null, producerId);
+ public static StandardRequest WithProducerId(ulong requestId, ulong producerId, BaseCommand.Type? commandType = null)
+ => new (requestId, null, producerId, commandType);
public bool SenderIsConsumer(ulong consumerId)
=> _consumerId.HasValue && _consumerId.Value == consumerId;
@@ -46,6 +49,9 @@
public bool SenderIsProducer(ulong producerId)
=> _producerId.HasValue && _producerId.Value == producerId;
+ public bool IsCommandType(BaseCommand.Type commandType)
+ => _commandType.HasValue && _commandType.Value == commandType;
+
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else