Make ready for 0.11.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aaf2c1e..0081ed9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,50 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [0.11.0] - 2021-02-21
+
+### Added
+
+- The Consumer and Reader now share the IReceive interface for receiving a single message: ValueTask\<Message\> Receive(CancellationToken cancellationToken)
+- The Consumer and Reader now share the ISeek interface for seeking on message-id and publish time
+- The Consumer and Reader now share the IGetLastMessageId interface for getting the last message-id on a topic
+- The Consumer, Reader, and Producer now share the IState interface adding 'OnStateChangeFrom' and 'OnStateChangeTo'
+- The PulsarClient, Consumer, Reader, and Producer now have the read-only property 'ServiceUrl'
+- The Consumer now has the read-only property 'SubscriptionName'
+- All message compression types are now supported (listed below). Please read this [compression how-to](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
+ - LZ4
+ - SNAPPY
+ - ZLIB
+ - ZSTD
+
+### Changed
+
+- MessageId.ToString() now returns a string matching that of other clients: "{LedgerId}:{EntryId}:{Partition}:{BatchIndex}"
+- The following methods are now extension methods and require a using directive for 'DotPulsar.Extensions'
+ - Producer.StateChangedTo(ProducerState state, CancellationToken cancellationToken)
+ - Producer.StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
+ - Producer.Send(byte[] data, CancellationToken cancellationToken)
+ - Producer.Send(ReadOnlyMemory\<byte\> data, CancellationToken cancellationToken)
+ - Producer.Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken)
+ - Producer.Send(MessageMetadata metadata, ReadOnlyMemory\<byte\> data, CancellationToken cancellationToken)
+ - Consumer.Acknowledge(Message message, CancellationToken cancellationToken)
+ - Consumer.AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
+ - Consumer.StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
+ - Consumer.StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
+ - Consumer.Messages(CancellationToken cancellationToken)
+ - Consumer.Seek(DateTime publishTime, CancellationToken cancellationToken)
+ - Consumer.Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+ - Reader.StateChangedTo(ReaderState state, CancellationToken cancellationToken)
+ - Reader.StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
+ - Reader.Messages(CancellationToken cancellationToken)
+ - Reader.Seek(DateTime publishTime, CancellationToken cancellationToken)
+ - Reader.Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+
+### Fixed
+
+- Previously, the Consumer and Reader would throw an ArgumentOutOfRangeException if they encountered a compressed message. Now they throw a CompressionException if the compression type is not supported
+- DotPulsarEventSource (performance counters) was only enabled for .NET Standard 2.1. Now it's enabled for all target frameworks except .NET Standard 2.0
+
## [0.10.1] - 2020-12-23
### Added
@@ -12,7 +56,7 @@
### Fixed
-- Do not throw exceptions when disposing consumers, readers or producers
+- Do not throw exceptions when disposing consumers, readers, or producers
## [0.10.0] - 2020-12-16
@@ -20,7 +64,7 @@
- Added performance improvements when receiving data
- Added the IHandleStateChanged\<TStateChanged\> interface for easier state monitoring
-- Added StateChangedHandler methods on ConsumerBuilder, ReaderBuilder and ProducerBuilder for easier state monitoring
+- Added StateChangedHandler methods on ConsumerBuilder, ReaderBuilder, and ProducerBuilder for easier state monitoring
- Added StateChangedHandler property on ConsumerOptions, ReaderOptions, and ProducerOptions for easier state monitoring
- Added four new DotPulsarExceptions: InvalidTransactionStatusException, NotAllowedException, TransactionConflictException and TransactionCoordinatorNotFoundException
- Added properties on Message to read EventTime and PublishTime as DateTime
diff --git a/README.md b/README.md
index c42e740..fe7afdc 100644
--- a/README.md
+++ b/README.md
@@ -2,11 +2,9 @@
![CI - Unit](https://github.com/apache/pulsar-dotpulsar/workflows/CI%20-%20Unit/badge.svg)
-.NET/C# client library for [Apache Pulsar](https://pulsar.apache.org/).
+The official .NET/C# client library for [Apache Pulsar](https://pulsar.apache.org/).
-DotPulsar is written entirely in C# and implements Apache Pulsar's [binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/). Other options was using the [C++ client library](https://pulsar.apache.org/docs/en/client-libraries-cpp/) (which is what the [Python client](https://pulsar.apache.org/docs/en/client-libraries-python/) and [Go client](https://pulsar.apache.org/docs/en/client-libraries-go/) do) or build on top of the [WebSocket API](https://pulsar.apache.org/docs/en/client-libraries-websocket/). We decided to implement the binary protocol to gain full control and maximize portability and performance.
-
-DotPulsar's API is strongly inspired by Apache Pulsar's official [Java client](https://pulsar.apache.org/docs/en/client-libraries-java/), but a 100% match is not a goal.
+DotPulsar is written entirely in C# and implements Apache Pulsar's [binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/).
## What's new?
@@ -52,48 +50,26 @@
- [X] TLS Authentication
- [X] JSON Web Token Authentication
- [X] Producer send with custom metadata
-- [X] Producer send with event time, sequence id and delayed message delivery
+- [X] Producer send with event time, sequence id, and delayed message delivery
- [X] Producer send with key and ordering key
- [X] Consumer subscription with initial position and priority level
-- [X] Consumer subscription types exclusive, shared, failover and key shared
+- [X] Consumer subscription types exclusive, shared, failover, and key shared
- [X] Consumer receive and single + cumulative acknowledge
-- [X] Consumer and Reader seek on message id and publish time
+- [X] Consumer and Reader seek on message-id and publish time
- [X] Consumer unsubscribe
- [X] Consume compacted topics
- [X] Reader API
- [X] Read/Consume/Acknowledge batched messages
- [X] Pulsar Proxy
+- [X] [LZ4 message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
+- [X] [ZLIB message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
+- [X] [ZSTD message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
+- [X] [SNAPPY message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression)
## Roadmap
Help prioritizing the roadmap is most welcome, so please reach out and tell us what you want and need.
-### 1.0.0
-
-We are feature complete for this release. We just need testing.
-
-- [X] Use IAsyncDisposable
-- [X] Use IAsyncEnumerable
-- [X] Use ValueTask instead of Task
-- [X] Make solution nullable
-- [X] Support .NET Standard 2.0 and 2.1
-
-### If requested by the community
-
-Let us know which features you need by creating an issue or by giving existing issues a "Thumbs up".
-
-* [Message encryption](https://github.com/apache/pulsar-dotpulsar/issues/8)
-* [Batching when producing](https://github.com/apache/pulsar-dotpulsar/issues/7)
-* [Schema](https://github.com/apache/pulsar-dotpulsar/issues/6)
-* [Partitioned topics](https://github.com/apache/pulsar-dotpulsar/issues/4)
-* [Multi-topic subscriptions](https://github.com/apache/pulsar-dotpulsar/issues/5)
-* Athenz Authentication
-* Kerberos Authentication
-* LZ4 message compression
-* ZLIB message compression
-* ZSTD message compression
-* SNAPPY message compression
-
## Join Our Community
Apache Pulsar has a [Slack instance](https://pulsar.apache.org/contact/) and there you'll find us in the #dev-dotpulsar channel. Just waiting for you to pop by :-)
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index b22d835..f89de85 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
- <Version>0.10.1</Version>
+ <Version>0.11.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>DanskeCommodities;dblank</Authors>
@@ -12,7 +12,7 @@
<PackageTags>Apache;Pulsar</PackageTags>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageReleaseNotes>Please refer to CHANGELOG.md for details</PackageReleaseNotes>
- <Description>.NET/C# client library for Apache Pulsar</Description>
+ <Description>The official .NET/C# client library for Apache Pulsar</Description>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<IncludeSymbols>true</IncludeSymbols>
@@ -30,15 +30,6 @@
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
<PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
- <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
- </ItemGroup>
-
- <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
- <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
- </ItemGroup>
-
- <ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
- <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
</ItemGroup>
</Project>
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 63b8d49..b61971c 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -15,10 +15,6 @@
namespace DotPulsar.Extensions
{
using DotPulsar.Abstractions;
- using DotPulsar.Internal;
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -40,76 +36,6 @@
=> await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
/// <summary>
- /// Process and auto-acknowledge a message.
- /// </summary>
- public static async ValueTask Process(this IConsumer consumer, Func<Message, CancellationToken, ValueTask> processor, CancellationToken cancellationToken = default) // TODO Allow user to set number of workers
- {
- const string operation = "process";
- var operationName = $"{consumer.Topic} {operation}";
-
- var tags = new List<KeyValuePair<string, object?>>
- {
- new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
- new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
- new KeyValuePair<string, object?>("messaging.operation", operation),
- new KeyValuePair<string, object?>("messaging.system", "pulsar"),
- new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
- new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName) // TODO Ask Pulsar community to define Pulsar specific tags
- };
-
- while (!cancellationToken.IsCancellationRequested)
- {
- var message = await consumer.Receive(cancellationToken);
-
- var activity = StartActivity(message, operationName, tags);
-
- if (activity is not null && activity.IsAllDataRequested)
- {
- activity.SetTag("messaging.message_id", message.MessageId.ToString());
- activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
- }
-
- try
- {
- await processor(message, cancellationToken);
- }
- catch
- {
- // Ignore
- }
-
- activity?.Dispose();
-
- await consumer.Acknowledge(message.MessageId, cancellationToken);
- }
- }
-
- private static Activity? StartActivity(Message message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
- {
- if (!DotPulsarActivitySource.ActivitySource.HasListeners())
- return null;
-
- var properties = message.Properties;
-
- if (properties.TryGetValue("traceparent", out var traceparent)) // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
- {
- var tracestate = properties.ContainsKey("tracestate") ? properties["tracestrate"] : null;
- if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
- return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
- }
-
- var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
-
- if (activity is not null && activity.IsAllDataRequested)
- {
- foreach (var tag in tags)
- activity.SetTag(tag.Key, tag.Value);
- }
-
- return activity;
- }
-
- /// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
/// <returns>
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
deleted file mode 100644
index 09c8464..0000000
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
-{
- using System.Diagnostics;
-
- public static class DotPulsarActivitySource
- {
- static DotPulsarActivitySource()
- {
- ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
- }
-
- public static ActivitySource ActivitySource { get; }
- }
-}