Updating the changelog
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0081ed9..6efd1ae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,40 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Added
+
+- A number of resilience, correctness, and performance improvements
+- Experimental: Added an extension method for IConsumer that will 'Process' and auto-acknowledge messages while creating an Activity (useful for doing tracing)
+- Schemas with support for the following types
+ - Boolean
+ - Bytes (using byte[] and ReadOnlySequence\<byte\>)
+ - String (UTF-8, UTF-16, and US-ASCII)
+ - INT8, INT16, INT32, and INT64
+ - Float and Double
+ - Time (using TimeSpan)
+ - Timestamp and Date (using DateTime)
+
+### Changed
+
+- Breaking: Building a producer will now create an IProducer\<T\>
+ The non-generic IProducer interface is still there, but messages can only be sent (ISend) with IProducer\<T\>
+- Breaking: Building a reader or consumer will now create an IConsumer\<T\> or IReader\<T\>
+ The non-genric IReader and IConsumer are still there, but messages can only be consumed/read (IReceive) with IConsumer\<T\> and IReader\<T\>
+- Breaking: Receiving a message with now return an IMessage\<T\> instead of the Message class (which is now internal)
+ The non-generic IMessage can be used if 'Value()' (decoding the 'Data' bytes) is not used (when just handling raw messages)
+- Breaking: The message builder is now generic
+- Setting an Action and Func StateChangedHandler on the ConsumerBuilder, ReaderBuilder, and ProducerBuilder are now extension methods
+- Setting an Action and Func ExceptionHandler on the PulsarClientBuilder are now extension methods
+
+### Fixed
+
+- When the broker sends a CommandClose[Producer/Consumer] all in-flight (and following) requests to the broker are ignored.
+ Even though we reconnect the consumer, reader, or producer the tasks for the in-flight requests will hang as long as the connection is kept alive by other producers/consumers/readers.
+ This situation is now handled and the requests will be sent again on the new connection.
+ As a client library, we can only handle this (unexpected) behavior for requests that get a response. For "fire and forget" requests, like "Flow", this remains a problem and should be solved broker/server-side by sending a response for all requests in all situations.
+
## [0.11.0] - 2021-02-21
### Added
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index 2cb3097..c736ffa 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -28,19 +28,19 @@
/// Get a builder that can be used to configure and build a Producer instance.
/// </summary>
public static IProducerBuilder<ReadOnlySequence<byte>> NewProducer(this IPulsarClient pulsarClient)
- => new ProducerBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
+ => new ProducerBuilder<ReadOnlySequence<byte>>(pulsarClient, Schema.ByteSequence);
/// <summary>
/// Get a builder that can be used to configure and build a Consumer instance.
/// </summary>
public static IConsumerBuilder<ReadOnlySequence<byte>> NewConsumer(this IPulsarClient pulsarClient)
- => new ConsumerBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
+ => new ConsumerBuilder<ReadOnlySequence<byte>>(pulsarClient, Schema.ByteSequence);
/// <summary>
/// Get a builder that can be used to configure and build a Reader instance.
/// </summary>
public static IReaderBuilder<ReadOnlySequence<byte>> NewReader(this IPulsarClient pulsarClient)
- => new ReaderBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
+ => new ReaderBuilder<ReadOnlySequence<byte>>(pulsarClient, Schema.ByteSequence);
/// <summary>
/// Get a builder that can be used to configure and build a Producer instance.
diff --git a/src/DotPulsar/Schema.cs b/src/DotPulsar/Schema.cs
index 6aff024..6116631 100644
--- a/src/DotPulsar/Schema.cs
+++ b/src/DotPulsar/Schema.cs
@@ -23,7 +23,8 @@
{
static Schema()
{
- Bytes = new ByteArraySchema();
+ ByteSequence = new ByteSequenceSchema();
+ ByteArray = new ByteArraySchema();
String = StringSchema.UTF8;
Boolean = new BooleanSchema();
Int8 = new ByteSchema();
@@ -37,9 +38,14 @@
}
/// <summary>
+ /// Raw bytes schema using ReadOnlySequence of bytes.
+ /// </summary>
+ public static ByteSequenceSchema ByteSequence { get; }
+
+ /// <summary>
/// Raw bytes schema using byte[].
/// </summary>
- public static ByteArraySchema Bytes { get; }
+ public static ByteArraySchema ByteArray { get; }
/// <summary>
/// UTF-8 schema.
diff --git a/tests/DotPulsar.StressTests/ConnectionTests.cs b/tests/DotPulsar.StressTests/ConnectionTests.cs
index da6e663..19bdbb3 100644
--- a/tests/DotPulsar.StressTests/ConnectionTests.cs
+++ b/tests/DotPulsar.StressTests/ConnectionTests.cs
@@ -48,7 +48,7 @@
await using var client = builder.Build();
- await using var producer = client.NewProducer(Schema.Bytes)
+ await using var producer = client.NewProducer(Schema.ByteArray)
.ProducerName($"producer-{testRunId}")
.Topic(topic)
.Create();
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs b/tests/DotPulsar.StressTests/ConsumerTests.cs
index df51edb..684205a 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -48,14 +48,14 @@
.ServiceUrl(new Uri("pulsar://localhost:54545"))
.Build();
- await using var consumer = client.NewConsumer(Schema.Bytes)
+ await using var consumer = client.NewConsumer(Schema.ByteArray)
.ConsumerName($"consumer-{testRunId}")
.InitialPosition(SubscriptionInitialPosition.Earliest)
.SubscriptionName($"subscription-{testRunId}")
.Topic(topic)
.Create();
- await using var producer = client.NewProducer(Schema.Bytes)
+ await using var producer = client.NewProducer(Schema.ByteArray)
.ProducerName($"producer-{testRunId}")
.Topic(topic)
.Create();