Updating NuGet packages. (Re)Adding tracing. Major refactorings to support schemas.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 1570efe..60d3eb1 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -18,8 +18,6 @@
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using System;
- using System.Buffers;
- using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -39,7 +37,7 @@
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- await using var consumer = client.NewConsumer()
+ await using var consumer = client.NewConsumer(Schema.String)
.StateChangedHandler(Monitor)
.SubscriptionName("MySubscription")
.Topic(myTopic)
@@ -50,14 +48,13 @@
await ConsumeMessages(consumer, cts.Token);
}
- private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
+ private static async Task ConsumeMessages(IConsumer<string> consumer, CancellationToken cancellationToken)
{
try
{
await foreach (var message in consumer.Messages(cancellationToken))
{
- var data = Encoding.UTF8.GetString(message.Data.ToArray());
- Console.WriteLine("Received: " + data);
+ Console.WriteLine("Received: " + message.Value);
await consumer.Acknowledge(message, cancellationToken);
}
}
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index a69bc38..5ea9a51 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -18,7 +18,6 @@
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using System;
- using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -38,7 +37,7 @@
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- await using var producer = client.NewProducer()
+ await using var producer = client.NewProducer(Schema.String)
.StateChangedHandler(Monitor)
.Topic(myTopic)
.Create();
@@ -48,7 +47,7 @@
await ProduceMessages(producer, cts.Token);
}
- private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
+ private static async Task ProduceMessages(IProducer<string> producer, CancellationToken cancellationToken)
{
var delay = TimeSpan.FromSeconds(5);
@@ -57,8 +56,7 @@
while (!cancellationToken.IsCancellationRequested)
{
var data = DateTime.UtcNow.ToLongTimeString();
- var bytes = Encoding.UTF8.GetBytes(data);
- _ = await producer.Send(bytes, cancellationToken);
+ _ = await producer.Send(data, cancellationToken);
Console.WriteLine("Sent: " + data);
await Task.Delay(delay, cancellationToken);
}
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 8a2f0fe..475fccd 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -18,8 +18,6 @@
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using System;
- using System.Buffers;
- using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -39,7 +37,7 @@
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- await using var reader = client.NewReader()
+ await using var reader = client.NewReader(Schema.String)
.StartMessageId(MessageId.Earliest)
.StateChangedHandler(Monitor)
.Topic(myTopic)
@@ -50,14 +48,13 @@
await ReadMessages(reader, cts.Token);
}
- private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
+ private static async Task ReadMessages(IReader<string> reader, CancellationToken cancellationToken)
{
try
{
await foreach (var message in reader.Messages(cancellationToken))
{
- var data = Encoding.UTF8.GetString(message.Data.ToArray());
- Console.WriteLine("Received: " + data);
+ Console.WriteLine("Received: " + message.Value);
}
}
catch (OperationCanceledException) { }
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 976f54b..6996825 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,7 +22,7 @@
/// <summary>
/// A consumer abstraction.
/// </summary>
- public interface IConsumer : IGetLastMessageId, IReceive, ISeek, IState<ConsumerState>, IAsyncDisposable
+ public interface IConsumer : IGetLastMessageId, ISeek, IState<ConsumerState>, IAsyncDisposable
{
/// <summary>
/// Acknowledge the consumption of a single message using the MessageId.
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index 4de94f1..8134eca 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -14,73 +14,59 @@
namespace DotPulsar.Abstractions
{
- using System;
- using System.Threading;
- using System.Threading.Tasks;
-
/// <summary>
/// A consumer building abstraction.
/// </summary>
- public interface IConsumerBuilder
+ public interface IConsumerBuilder<TMessage>
{
/// <summary>
/// Set the consumer name. This is optional.
/// </summary>
- IConsumerBuilder ConsumerName(string name);
+ IConsumerBuilder<TMessage> ConsumerName(string name);
/// <summary>
/// Set initial position for the subscription. The default is 'Latest'.
/// </summary>
- IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition);
+ IConsumerBuilder<TMessage> InitialPosition(SubscriptionInitialPosition initialPosition);
/// <summary>
/// Number of messages that will be prefetched. The default is 1000.
/// </summary>
- IConsumerBuilder MessagePrefetchCount(uint count);
+ IConsumerBuilder<TMessage> MessagePrefetchCount(uint count);
/// <summary>
/// Set the priority level for the shared subscription consumer. The default is 0.
/// </summary>
- IConsumerBuilder PriorityLevel(int priorityLevel);
+ IConsumerBuilder<TMessage> PriorityLevel(int priorityLevel);
/// <summary>
/// Whether to read from the compacted topic. The default is 'false'.
/// </summary>
- IConsumerBuilder ReadCompacted(bool readCompacted);
+ IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted);
/// <summary>
/// Register a state changed handler.
/// </summary>
- IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler);
-
- /// <summary>
- /// Register a state changed handler.
- /// </summary>
- IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Register a state changed handler.
- /// </summary>
- IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+ IConsumerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler);
/// <summary>
/// Set the subscription name for this consumer. This is required.
/// </summary>
- IConsumerBuilder SubscriptionName(string name);
+ IConsumerBuilder<TMessage> SubscriptionName(string name);
/// <summary>
/// Set the subscription type for this consumer. The default is 'Exclusive'.
/// </summary>
- IConsumerBuilder SubscriptionType(SubscriptionType type);
+ IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type);
/// <summary>
/// Set the topic for this consumer. This is required.
/// </summary>
- IConsumerBuilder Topic(string topic);
+ IConsumerBuilder<TMessage> Topic(string topic);
/// <summary>
/// Create the consumer.
/// </summary>
- IConsumer Create();
+ IConsumer<TMessage> Create();
}
}
diff --git a/src/DotPulsar/Abstractions/IConsumerOfT.cs b/src/DotPulsar/Abstractions/IConsumerOfT.cs
new file mode 100644
index 0000000..0affa98
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IConsumerOfT.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A generic consumer abstraction.
+ /// </summary>
+ public interface IConsumer<TMessage> : IConsumer, IReceive<IMessage<TMessage>> { }
+}
diff --git a/src/DotPulsar/Abstractions/IMessage.cs b/src/DotPulsar/Abstractions/IMessage.cs
new file mode 100644
index 0000000..c10204c
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IMessage.cs
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ using System;
+ using System.Buffers;
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// A message abstraction.
+ /// </summary>
+ public interface IMessage
+ {
+ /// <summary>
+ /// The id of the message.
+ /// </summary>
+ MessageId MessageId { get; }
+
+ /// <summary>
+ /// The raw payload of the message.
+ /// </summary>
+ ReadOnlySequence<byte> Data { get; }
+
+ /// <summary>
+ /// The name of the producer who produced the message.
+ /// </summary>
+ string ProducerName { get; }
+
+ /// <summary>
+ /// The schema version of the message.
+ /// </summary>
+ byte[]? SchemaVersion { get; }
+
+ /// <summary>
+ /// The sequence id of the message.
+ /// </summary>
+ ulong SequenceId { get; }
+
+ /// <summary>
+ /// The redelivery count (maintained by the broker) of the message.
+ /// </summary>
+ uint RedeliveryCount { get; }
+
+ /// <summary>
+ /// Check whether the message has an event time.
+ /// </summary>
+ bool HasEventTime { get; }
+
+ /// <summary>
+ /// The event time of the message as unix time in milliseconds.
+ /// </summary>
+ ulong EventTime { get; }
+
+ /// <summary>
+ /// The event time of the message as an UTC DateTime.
+ /// </summary>
+ public DateTime EventTimeAsDateTime { get; }
+
+ /// <summary>
+ /// The event time of the message as a DateTimeOffset with an offset of 0.
+ /// </summary>
+ public DateTimeOffset EventTimeAsDateTimeOffset { get; }
+
+ /// <summary>
+ /// Check whether the key been base64 encoded.
+ /// </summary>
+ bool HasBase64EncodedKey { get; }
+
+ /// <summary>
+ /// Check whether the message has a key.
+ /// </summary>
+ bool HasKey { get; }
+
+ /// <summary>
+ /// The key as a string.
+ /// </summary>
+ string? Key { get; }
+
+ /// <summary>
+ /// The key as bytes.
+ /// </summary>
+ byte[]? KeyBytes { get; }
+
+ /// <summary>
+ /// Check whether the message has an ordering key.
+ /// </summary>
+ bool HasOrderingKey { get; }
+
+ /// <summary>
+ /// The ordering key of the message.
+ /// </summary>
+ byte[]? OrderingKey { get; }
+
+ /// <summary>
+ /// The publish time of the message as unix time in milliseconds.
+ /// </summary>
+ ulong PublishTime { get; }
+
+ /// <summary>
+ /// The publish time of the message as an UTC DateTime.
+ /// </summary>
+ public DateTime PublishTimeAsDateTime { get; }
+
+ /// <summary>
+ /// The publish time of the message as a DateTimeOffset with an offset of 0.
+ /// </summary>
+ public DateTimeOffset PublishTimeAsDateTimeOffset { get; }
+
+ /// <summary>
+ /// The properties of the message.
+ /// </summary>
+ public IReadOnlyDictionary<string, string> Properties { get; }
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
index 1720a4e..32c9d94 100644
--- a/src/DotPulsar/Abstractions/IMessageBuilder.cs
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -21,67 +21,72 @@
/// <summary>
/// A message building abstraction.
/// </summary>
- public interface IMessageBuilder
+ public interface IMessageBuilder<TMessage>
{
/// <summary>
/// Timestamp as unix time in milliseconds indicating when the message should be delivered to consumers.
/// </summary>
- IMessageBuilder DeliverAt(long timestamp);
+ IMessageBuilder<TMessage> DeliverAt(long timestamp);
/// <summary>
/// Timestamp as UTC DateTime indicating when the message should be delivered to consumers.
/// </summary>
- IMessageBuilder DeliverAt(DateTime timestamp);
+ IMessageBuilder<TMessage> DeliverAt(DateTime timestamp);
/// <summary>
/// Timestamp as DateTimeOffset indicating when the message should be delivered to consumers.
/// </summary>
- IMessageBuilder DeliverAt(DateTimeOffset timestamp);
+ IMessageBuilder<TMessage> DeliverAt(DateTimeOffset timestamp);
/// <summary>
/// The event time of the message as unix time in milliseconds.
/// </summary>
- IMessageBuilder EventTime(ulong eventTime);
+ IMessageBuilder<TMessage> EventTime(ulong eventTime);
/// <summary>
/// The event time of the message as an UTC DateTime.
/// </summary>
- IMessageBuilder EventTime(DateTime eventTime);
+ IMessageBuilder<TMessage> EventTime(DateTime eventTime);
/// <summary>
/// The event time of the message as a DateTimeOffset.
/// </summary>
- IMessageBuilder EventTime(DateTimeOffset eventTime);
+ IMessageBuilder<TMessage> EventTime(DateTimeOffset eventTime);
/// <summary>
/// Set the key of the message for routing policy.
/// </summary>
- IMessageBuilder Key(string key);
+ IMessageBuilder<TMessage> Key(string key);
/// <summary>
/// Set the key of the message for routing policy.
/// </summary>
- IMessageBuilder KeyBytes(byte[] key);
+ IMessageBuilder<TMessage> KeyBytes(byte[] key);
/// <summary>
/// Set the ordering key of the message for message dispatch in SubscriptionType.KeyShared mode.
/// The partition key will be used if the ordering key is not specified.
/// </summary>
- IMessageBuilder OrderingKey(byte[] key);
+ IMessageBuilder<TMessage> OrderingKey(byte[] key);
/// <summary>
/// Add/Set a property key/value on the message.
/// </summary>
- IMessageBuilder Property(string key, string value);
+ IMessageBuilder<TMessage> Property(string key, string value);
+
+ /// <summary>
+ /// Set the schema version of the message.
+ /// </summary>
+ IMessageBuilder<TMessage> SchemaVersion(byte[] schemaVersion);
/// <summary>
/// Set the sequence id of the message.
/// </summary>
- IMessageBuilder SequenceId(ulong sequenceId);
+ IMessageBuilder<TMessage> SequenceId(ulong sequenceId);
/// <summary>
- /// Set the consumer name.
+ /// Sends a message.
/// </summary>
- ValueTask<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
+ ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/IMessageOfT.cs b/src/DotPulsar/Abstractions/IMessageOfT.cs
new file mode 100644
index 0000000..6b90ae5
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IMessageOfT.cs
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A generic message abstraction.
+ /// </summary>
+ public interface IMessage<TValue> : IMessage
+ {
+ /// <summary>
+ /// The value of the message.
+ /// </summary>
+ public TValue Value { get; }
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index baf3ca4..ac13630 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -19,7 +19,7 @@
/// <summary>
/// A producer abstraction.
/// </summary>
- public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
+ public interface IProducer : IState<ProducerState>, IAsyncDisposable
{
/// <summary>
/// The producer's service url.
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 930d2ca..17788a5 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -14,53 +14,39 @@
namespace DotPulsar.Abstractions
{
- using System;
- using System.Threading;
- using System.Threading.Tasks;
-
/// <summary>
/// A producer building abstraction.
/// </summary>
- public interface IProducerBuilder
+ public interface IProducerBuilder<TMessage>
{
/// <summary>
/// Set the compression type. The default is 'None'.
/// </summary>
- IProducerBuilder CompressionType(CompressionType compressionType);
+ IProducerBuilder<TMessage> CompressionType(CompressionType compressionType);
/// <summary>
/// Set the initial sequence id. The default is 0.
/// </summary>
- IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+ IProducerBuilder<TMessage> InitialSequenceId(ulong initialSequenceId);
/// <summary>
/// Set the producer name. This is optional.
/// </summary>
- IProducerBuilder ProducerName(string name);
+ IProducerBuilder<TMessage> ProducerName(string name);
/// <summary>
/// Register a state changed handler.
/// </summary>
- IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler);
-
- /// <summary>
- /// Register a state changed handler.
- /// </summary>
- IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Register a state changed handler.
- /// </summary>
- IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+ IProducerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler);
/// <summary>
/// Set the topic for this producer. This is required.
/// </summary>
- IProducerBuilder Topic(string topic);
+ IProducerBuilder<TMessage> Topic(string topic);
/// <summary>
/// Create the producer.
/// </summary>
- IProducer Create();
+ IProducer<TMessage> Create();
}
}
diff --git a/src/DotPulsar/Abstractions/IProducerOfT.cs b/src/DotPulsar/Abstractions/IProducerOfT.cs
new file mode 100644
index 0000000..da42458
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IProducerOfT.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A generic producer abstraction.
+ /// </summary>
+ public interface IProducer<TMessage> : IProducer, ISend<TMessage> { }
+}
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
index 7189257..64f6add 100644
--- a/src/DotPulsar/Abstractions/IPulsarClient.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -24,17 +24,17 @@
/// <summary>
/// Create a producer.
/// </summary>
- IProducer CreateProducer(ProducerOptions options);
+ IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options);
/// <summary>
/// Create a consumer.
/// </summary>
- IConsumer CreateConsumer(ConsumerOptions options);
+ IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options);
/// <summary>
/// Create a reader.
/// </summary>
- IReader CreateReader(ReaderOptions options);
+ IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options);
/// <summary>
/// The client's service url.
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 484a196..d632aae 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -16,7 +16,6 @@
{
using System;
using System.Security.Cryptography.X509Certificates;
- using System.Threading.Tasks;
/// <summary>
/// A pulsar client building abstraction.
@@ -44,16 +43,6 @@
IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler);
/// <summary>
- /// Register a custom exception handler that will be invoked before the default exception handler.
- /// </summary>
- IPulsarClientBuilder ExceptionHandler(Action<ExceptionContext> exceptionHandler);
-
- /// <summary>
- /// Register a custom exception handler that will be invoked before the default exception handler.
- /// </summary>
- IPulsarClientBuilder ExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler);
-
- /// <summary>
/// The time to wait before retrying an operation or a reconnect. The default is 3 seconds.
/// </summary>
IPulsarClientBuilder RetryInterval(TimeSpan interval);
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 7962d54..3e438ac 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -19,7 +19,7 @@
/// <summary>
/// A reader abstraction.
/// </summary>
- public interface IReader : IGetLastMessageId, IReceive, ISeek, IState<ReaderState>, IAsyncDisposable
+ public interface IReader : IGetLastMessageId, ISeek, IState<ReaderState>, IAsyncDisposable
{
/// <summary>
/// The reader's service url.
diff --git a/src/DotPulsar/Abstractions/IReaderBuilder.cs b/src/DotPulsar/Abstractions/IReaderBuilder.cs
index 3e97705..18235a6 100644
--- a/src/DotPulsar/Abstractions/IReaderBuilder.cs
+++ b/src/DotPulsar/Abstractions/IReaderBuilder.cs
@@ -14,58 +14,44 @@
namespace DotPulsar.Abstractions
{
- using System;
- using System.Threading;
- using System.Threading.Tasks;
-
/// <summary>
/// A reader building abstraction.
/// </summary>
- public interface IReaderBuilder
+ public interface IReaderBuilder<TMessage>
{
/// <summary>
/// Number of messages that will be prefetched. The default is 1000.
/// </summary>
- IReaderBuilder MessagePrefetchCount(uint count);
+ IReaderBuilder<TMessage> MessagePrefetchCount(uint count);
/// <summary>
/// Whether to read from the compacted topic. The default is 'false'.
/// </summary>
- IReaderBuilder ReadCompacted(bool readCompacted);
+ IReaderBuilder<TMessage> ReadCompacted(bool readCompacted);
/// <summary>
/// Set the reader name. This is optional.
/// </summary>
- IReaderBuilder ReaderName(string name);
+ IReaderBuilder<TMessage> ReaderName(string name);
/// <summary>
/// The initial reader position is set to the specified message id. This is required.
/// </summary>
- IReaderBuilder StartMessageId(MessageId messageId);
+ IReaderBuilder<TMessage> StartMessageId(MessageId messageId);
/// <summary>
/// Register a state changed handler.
/// </summary>
- IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler);
-
- /// <summary>
- /// Register a state changed handler.
- /// </summary>
- IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Register a state changed handler.
- /// </summary>
- IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+ IReaderBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler);
/// <summary>
/// Set the topic for this reader. This is required.
/// </summary>
- IReaderBuilder Topic(string topic);
+ IReaderBuilder<TMessage> Topic(string topic);
/// <summary>
/// Create the reader.
/// </summary>
- IReader Create();
+ IReader<TMessage> Create();
}
}
diff --git a/src/DotPulsar/Abstractions/IReaderOfT.cs b/src/DotPulsar/Abstractions/IReaderOfT.cs
new file mode 100644
index 0000000..35f26c9
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IReaderOfT.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A generic reader abstraction.
+ /// </summary>
+ public interface IReader<TMessage> : IReader, IReceive<IMessage<TMessage>> { }
+}
diff --git a/src/DotPulsar/Abstractions/IReceive.cs b/src/DotPulsar/Abstractions/IReceive.cs
index 2ff275c..994638a 100644
--- a/src/DotPulsar/Abstractions/IReceive.cs
+++ b/src/DotPulsar/Abstractions/IReceive.cs
@@ -20,11 +20,11 @@
/// <summary>
/// An abstraction for receiving a single message.
/// </summary>
- public interface IReceive
+ public interface IReceive<TMessage>
{
/// <summary>
/// Receive a single message.
/// </summary>
- ValueTask<Message> Receive(CancellationToken cancellationToken = default);
+ ValueTask<TMessage> Receive(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/ISchema.cs b/src/DotPulsar/Abstractions/ISchema.cs
new file mode 100644
index 0000000..698cad7
--- /dev/null
+++ b/src/DotPulsar/Abstractions/ISchema.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ using System.Buffers;
+
+ /// <summary>
+ /// A schema abstraction.
+ /// </summary>
+ public interface ISchema<T>
+ {
+ /// <summary>
+ /// Decode the raw bytes.
+ /// </summary>
+ public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null);
+
+ /// <summary>
+ /// Encode the message.
+ /// </summary>
+ public ReadOnlySequence<byte> Encode(T message);
+
+ /// <summary>
+ /// The schema info.
+ /// </summary>
+ public SchemaInfo SchemaInfo { get; }
+ }
+}
diff --git a/src/DotPulsar/Abstractions/ISend.cs b/src/DotPulsar/Abstractions/ISend.cs
index 2948c34..642f61d 100644
--- a/src/DotPulsar/Abstractions/ISend.cs
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -14,23 +14,22 @@
namespace DotPulsar.Abstractions
{
- using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// An abstraction for sending a message.
/// </summary>
- public interface ISend
+ public interface ISend<TMessage>
{
/// <summary>
/// Sends a message.
/// </summary>
- ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+ ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default);
/// <summary>
/// Sends a message with metadata.
/// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+ ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 8496487..754f73a 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -19,7 +19,7 @@
/// <summary>
/// The consumer building options.
/// </summary>
- public sealed class ConsumerOptions
+ public sealed class ConsumerOptions<TMessage>
{
/// <summary>
/// The default initial position.
@@ -49,7 +49,7 @@
/// <summary>
/// Initializes a new instance using the specified subscription name and topic.
/// </summary>
- public ConsumerOptions(string subscriptionName, string topic)
+ public ConsumerOptions(string subscriptionName, string topic, ISchema<TMessage> schema)
{
InitialPosition = DefaultInitialPosition;
PriorityLevel = DefaultPriorityLevel;
@@ -58,6 +58,7 @@
SubscriptionType = DefaultSubscriptionType;
SubscriptionName = subscriptionName;
Topic = topic;
+ Schema = schema;
}
/// <summary>
@@ -86,6 +87,11 @@
public bool ReadCompacted { get; set; }
/// <summary>
+ /// Set the schema. This is required.
+ /// </summary>
+ public ISchema<TMessage> Schema { get; set; }
+
+ /// <summary>
/// Register a state changed handler. This is optional.
/// </summary>
public IHandleStateChanged<ConsumerStateChanged>? StateChangedHandler { get; set; }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 934b5a4..d766c2d 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -31,6 +31,15 @@
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
<PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
+ <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+ </ItemGroup>
+
+ <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
+ <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+ </ItemGroup>
+
+ <ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
+ <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
</ItemGroup>
<ItemGroup>
diff --git a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
index 98ad756..1ebdf11 100644
--- a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
@@ -1,10 +1,23 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
{
- using Internal;
using System;
public sealed class ConsumerDisposedException : ObjectDisposedException
{
- public ConsumerDisposedException() : base(typeof(Consumer).FullName) { }
+ public ConsumerDisposedException(string objectName) : base(objectName) { }
}
}
diff --git a/src/DotPulsar/Exceptions/ProducerDisposedException.cs b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
index eabfa73..f07fb1b 100644
--- a/src/DotPulsar/Exceptions/ProducerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
@@ -1,10 +1,23 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
{
- using Internal;
using System;
public sealed class ProducerDisposedException : ObjectDisposedException
{
- public ProducerDisposedException() : base(typeof(Producer).FullName) { }
+ public ProducerDisposedException(string objectName) : base(objectName) { }
}
}
diff --git a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
index 1066730..3b3ceb6 100644
--- a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
+++ b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
@@ -1,4 +1,18 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
{
using System;
diff --git a/src/DotPulsar/Exceptions/ReaderDisposedException.cs b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
index df01535..1ab4b80 100644
--- a/src/DotPulsar/Exceptions/ReaderDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
@@ -1,10 +1,23 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
{
- using Internal;
using System;
public sealed class ReaderDisposedException : ObjectDisposedException
{
- public ReaderDisposedException() : base(typeof(Reader).FullName) { }
+ public ReaderDisposedException(string objectName) : base(objectName) { }
}
}
diff --git a/src/DotPulsar/Exceptions/SchemaSerializationException.cs b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
new file mode 100644
index 0000000..900f8b1
--- /dev/null
+++ b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+ public sealed class SchemaSerializationException : DotPulsarException
+ {
+ public SchemaSerializationException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
new file mode 100644
index 0000000..5aa1e40
--- /dev/null
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Internal;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ public static class ConsumerBuilderExtensions
+ {
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IConsumerBuilder<TMessage> builder,
+ Action<ConsumerStateChanged, CancellationToken> handler,
+ CancellationToken cancellationToken = default)
+ {
+ builder.StateChangedHandler(new ActionStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IConsumerBuilder<TMessage> builder,
+ Func<ConsumerStateChanged, CancellationToken, ValueTask> handler,
+ CancellationToken cancellationToken = default)
+ {
+ builder.StateChangedHandler(new FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken));
+ return builder;
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index b61971c..13f2121 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -15,6 +15,10 @@
namespace DotPulsar.Extensions
{
using DotPulsar.Abstractions;
+ using DotPulsar.Internal;
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -26,16 +30,105 @@
/// <summary>
/// Acknowledge the consumption of a single message.
/// </summary>
- public static async ValueTask Acknowledge(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+ public static async ValueTask Acknowledge(this IConsumer consumer, IMessage message, CancellationToken cancellationToken = default)
=> await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
/// <summary>
/// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
/// </summary>
- public static async ValueTask AcknowledgeCumulative(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+ public static async ValueTask AcknowledgeCumulative(this IConsumer consumer, IMessage message, CancellationToken cancellationToken = default)
=> await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
/// <summary>
+ /// Process and auto-acknowledge a message.
+ /// </summary>
+ public static async ValueTask Process<TMessage>(
+ this IConsumer<TMessage> consumer,
+ Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+ CancellationToken cancellationToken = default)
+ {
+ const string operation = "process";
+ var operationName = $"{consumer.Topic} {operation}";
+
+ var tags = new List<KeyValuePair<string, object?>>
+ {
+ new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
+ new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+ new KeyValuePair<string, object?>("messaging.operation", operation),
+ new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+ new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
+ new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
+ };
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
+
+ var activity = StartActivity(message, operationName, tags);
+
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ activity.SetTag("messaging.message_id", message.MessageId.ToString());
+ activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
+ activity.SetTag("otel.status_code", "OK");
+ }
+
+ try
+ {
+ await processor(message, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception exception)
+ {
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ activity.SetTag("otel.status_code", "ERROR");
+
+ var exceptionTags = new ActivityTagsCollection
+ {
+ { "exception.type", exception.GetType().FullName },
+ { "exception.stacktrace", exception.ToString() }
+ };
+
+ if (!string.IsNullOrWhiteSpace(exception.Message))
+ exceptionTags.Add("exception.message", exception.Message);
+
+ var activityEvent = new ActivityEvent("exception", default, exceptionTags);
+ activity.AddEvent(activityEvent);
+ }
+ }
+
+ activity?.Dispose();
+
+ await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ private static Activity? StartActivity(IMessage message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
+ {
+ if (!DotPulsarActivitySource.ActivitySource.HasListeners())
+ return null;
+
+ var properties = message.Properties;
+
+ if (properties.TryGetValue("traceparent", out var traceparent)) // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
+ {
+ var tracestate = properties.ContainsKey("tracestate") ? properties["tracestrate"] : null;
+ if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
+ return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
+ }
+
+ var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
+
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ foreach (var tag in tags)
+ activity.SetTag(tag.Key, tag.Value);
+ }
+
+ return activity;
+ }
+
+ /// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
/// <returns>
diff --git a/src/DotPulsar/Extensions/MessageBuilderExtensions.cs b/src/DotPulsar/Extensions/MessageBuilderExtensions.cs
new file mode 100644
index 0000000..fe87ead
--- /dev/null
+++ b/src/DotPulsar/Extensions/MessageBuilderExtensions.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ public static class MessageBuilderExtensions
+ {
+ /// <summary>
+ /// Sends a message.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this IMessageBuilder<ReadOnlySequence<byte>> builder, byte[] payload, CancellationToken cancellationToken = default)
+ => await builder.Send(new ReadOnlySequence<byte>(payload), cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this IMessageBuilder<ReadOnlySequence<byte>> builder, ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default)
+ => await builder.Send(new ReadOnlySequence<byte>(payload), cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
new file mode 100644
index 0000000..7c4a42c
--- /dev/null
+++ b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Internal;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ public static class ProducerBuilderExtensions
+ {
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IProducerBuilder<TMessage> builder,
+ Action<ProducerStateChanged, CancellationToken> handler,
+ CancellationToken cancellationToken = default)
+ {
+ builder.StateChangedHandler(new ActionStateChangedHandler<ProducerStateChanged>(handler, cancellationToken));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IProducerBuilder<TMessage> builder,
+ Func<ProducerStateChanged, CancellationToken, ValueTask> handler,
+ CancellationToken cancellationToken = default)
+ {
+ builder.StateChangedHandler(new FuncStateChangedHandler<ProducerStateChanged>(handler, cancellationToken));
+ return builder;
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
index 8f02262..d292468 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -27,8 +27,8 @@
/// <summary>
/// Get a builder that can be used to configure and build a Message.
/// </summary>
- public static IMessageBuilder NewMessage(this IProducer producer)
- => new MessageBuilder(producer);
+ public static IMessageBuilder<TMessage> NewMessage<TMessage>(this IProducer<TMessage> producer)
+ => new MessageBuilder<TMessage>(producer);
/// <summary>
/// Wait for the state to change to a specific state.
diff --git a/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs b/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs
new file mode 100644
index 0000000..2ec6c8d
--- /dev/null
+++ b/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Internal;
+ using System;
+ using System.Threading.Tasks;
+
+ public static class PulsarClientBuilderExtensions
+ {
+ /// <summary>
+ /// Register a custom exception handler that will be invoked before the default exception handler.
+ /// </summary>
+ public static IPulsarClientBuilder ExceptionHandler(this IPulsarClientBuilder builder, Action<ExceptionContext> exceptionHandler)
+ {
+ builder.ExceptionHandler(new ActionExceptionHandler(exceptionHandler));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a custom exception handler that will be invoked before the default exception handler.
+ /// </summary>
+ public static IPulsarClientBuilder ExceptionHandler(this IPulsarClientBuilder builder, Func<ExceptionContext, ValueTask> exceptionHandler)
+ {
+ builder.ExceptionHandler(new FuncExceptionHandler(exceptionHandler));
+ return builder;
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index 574bce1..2cb3097 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -15,7 +15,9 @@
namespace DotPulsar.Extensions
{
using Abstractions;
+ using DotPulsar.Schemas;
using Internal;
+ using System.Buffers;
/// <summary>
/// Extensions for IPulsarClient.
@@ -25,19 +27,37 @@
/// <summary>
/// Get a builder that can be used to configure and build a Producer instance.
/// </summary>
- public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient)
- => new ProducerBuilder(pulsarClient);
+ public static IProducerBuilder<ReadOnlySequence<byte>> NewProducer(this IPulsarClient pulsarClient)
+ => new ProducerBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
/// <summary>
/// Get a builder that can be used to configure and build a Consumer instance.
/// </summary>
- public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient)
- => new ConsumerBuilder(pulsarClient);
+ public static IConsumerBuilder<ReadOnlySequence<byte>> NewConsumer(this IPulsarClient pulsarClient)
+ => new ConsumerBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
/// <summary>
/// Get a builder that can be used to configure and build a Reader instance.
/// </summary>
- public static IReaderBuilder NewReader(this IPulsarClient pulsarClient)
- => new ReaderBuilder(pulsarClient);
+ public static IReaderBuilder<ReadOnlySequence<byte>> NewReader(this IPulsarClient pulsarClient)
+ => new ReaderBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
+
+ /// <summary>
+ /// Get a builder that can be used to configure and build a Producer instance.
+ /// </summary>
+ public static IProducerBuilder<TMessage> NewProducer<TMessage>(this IPulsarClient pulsarClient, ISchema<TMessage> schema)
+ => new ProducerBuilder<TMessage>(pulsarClient, schema);
+
+ /// <summary>
+ /// Get a builder that can be used to configure and build a Consumer instance.
+ /// </summary>
+ public static IConsumerBuilder<TMessage> NewConsumer<TMessage>(this IPulsarClient pulsarClient, ISchema<TMessage> schema)
+ => new ConsumerBuilder<TMessage>(pulsarClient, schema);
+
+ /// <summary>
+ /// Get a builder that can be used to configure and build a Reader instance.
+ /// </summary>
+ public static IReaderBuilder<TMessage> NewReader<TMessage>(this IPulsarClient pulsarClient, ISchema<TMessage> schema)
+ => new ReaderBuilder<TMessage>(pulsarClient, schema);
}
}
diff --git a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
new file mode 100644
index 0000000..c9d4b69
--- /dev/null
+++ b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Internal;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ public static class ReaderBuilderExtensions
+ {
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IReaderBuilder<TMessage> builder,
+ Action<ReaderStateChanged, CancellationToken> handler,
+ CancellationToken cancellationToken = default)
+ {
+ builder.StateChangedHandler(new ActionStateChangedHandler<ReaderStateChanged>(handler, cancellationToken));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IReaderBuilder<TMessage> builder,
+ Func<ReaderStateChanged, CancellationToken, ValueTask> handler,
+ CancellationToken cancellationToken = default)
+ {
+ builder.StateChangedHandler(new FuncStateChangedHandler<ReaderStateChanged>(handler, cancellationToken));
+ return builder;
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ReceiveExtensions.cs b/src/DotPulsar/Extensions/ReceiveExtensions.cs
index c2262c6..5a165a6 100644
--- a/src/DotPulsar/Extensions/ReceiveExtensions.cs
+++ b/src/DotPulsar/Extensions/ReceiveExtensions.cs
@@ -27,7 +27,7 @@
/// <summary>
/// Get an IAsyncEnumerable for receiving messages.
/// </summary>
- public static async IAsyncEnumerable<Message> Messages(this IReceive receiver, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public static async IAsyncEnumerable<TMessage> Messages<TMessage>(this IReceive<TMessage> receiver, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
yield return await receiver.Receive(cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index f4abd8b..368a111 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -40,5 +40,6 @@
Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken);
Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index ef09431..a359508 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -14,19 +14,20 @@
namespace DotPulsar.Internal.Abstractions
{
+ using DotPulsar.Abstractions;
using PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;
- public interface IConsumerChannel : IAsyncDisposable
+ public interface IConsumerChannel<TMessage> : IAsyncDisposable
{
Task Send(CommandAck command, CancellationToken cancellationToken);
Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
Task Send(CommandUnsubscribe command, CancellationToken cancellationToken);
Task Send(CommandSeek command, CancellationToken cancellationToken);
Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
- ValueTask<Message> Receive(CancellationToken cancellationToken);
+ ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken);
ValueTask ClosedByClient(CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
index 3065119..76fb932 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
@@ -17,8 +17,8 @@
using System.Threading;
using System.Threading.Tasks;
- public interface IConsumerChannelFactory
+ public interface IConsumerChannelFactory<TMessage>
{
- Task<IConsumerChannel> Create(CancellationToken cancellationToken = default);
+ Task<IConsumerChannel<TMessage>> Create(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IEstablishNewChannel.cs b/src/DotPulsar/Internal/Abstractions/IEstablishNewChannel.cs
new file mode 100644
index 0000000..bcf8723
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IEstablishNewChannel.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ public interface IEstablishNewChannel : IAsyncDisposable
+ {
+ Task EstablishNewChannel(CancellationToken cancellationToken);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IMessageFactory.cs b/src/DotPulsar/Internal/Abstractions/IMessageFactory.cs
new file mode 100644
index 0000000..ebdb3ee
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IMessageFactory.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Internal.PulsarApi;
+ using System.Buffers;
+
+ public interface IMessageFactory<TValue>
+ {
+ IMessage<TValue> Create(MessageId messageId, uint redeliveryCount, ReadOnlySequence<byte> data, MessageMetadata metadata, SingleMessageMetadata? singleMetadata = null);
+ }
+}
diff --git a/src/DotPulsar/Internal/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
similarity index 97%
rename from src/DotPulsar/Internal/Process.cs
rename to src/DotPulsar/Internal/Abstractions/Process.cs
index 40b9712..056471a 100644
--- a/src/DotPulsar/Internal/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -12,9 +12,8 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal.Abstractions
{
- using Abstractions;
using Events;
using System;
using System.Threading;
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index 9523627..60d2e9e 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -14,53 +14,64 @@
namespace DotPulsar.Internal
{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Internal.Abstractions;
using Extensions;
using PulsarApi;
using System.Buffers;
using System.Collections;
using System.Collections.Generic;
- public sealed class BatchHandler
+ public sealed class BatchHandler<TMessage>
{
private readonly object _lock;
private readonly bool _trackBatches;
- private readonly Queue<Message> _messages;
+ private readonly IMessageFactory<TMessage> _messageFactory;
+ private readonly Queue<IMessage<TMessage>> _messages;
private readonly LinkedList<Batch> _batches;
- public BatchHandler(bool trackBatches)
+ public BatchHandler(bool trackBatches, IMessageFactory<TMessage> messageFactory)
{
_lock = new object();
_trackBatches = trackBatches;
- _messages = new Queue<Message>();
+ _messageFactory = messageFactory;
+ _messages = new Queue<IMessage<TMessage>>();
_batches = new LinkedList<Batch>();
}
- public Message Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
+ public IMessage<TMessage> Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
{
+ var messages = new List<IMessage<TMessage>>(metadata.NumMessagesInBatch);
+
+ long index = 0;
+
+ for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+ {
+ var singleMetadataSize = data.ReadUInt32(index, true);
+ index += 4;
+ var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
+ index += singleMetadataSize;
+ var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
+ var message = _messageFactory.Create(singleMessageId, redeliveryCount, data.Slice(index, singleMetadata.PayloadSize), metadata, singleMetadata);
+ messages.Add(message);
+ index += (uint) singleMetadata.PayloadSize;
+ }
+
lock (_lock)
{
if (_trackBatches)
_batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
- long index = 0;
-
- for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+ foreach (var message in messages)
{
- var singleMetadataSize = data.ReadUInt32(index, true);
- index += 4;
- var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
- index += singleMetadataSize;
- var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
- var message = MessageFactory.Create(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
_messages.Enqueue(message);
- index += (uint) singleMetadata.PayloadSize;
}
return _messages.Dequeue();
}
}
- public Message? GetNext()
+ public IMessage<TMessage>? GetNext()
{
lock (_lock)
return _messages.Count == 0 ? null : _messages.Dequeue();
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 02280dd..56bd58d 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -184,6 +184,23 @@
return await response.ConfigureAwait(false);
}
+ public async Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ Task<BaseCommand>? responseTask;
+
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ {
+ var baseCommand = command.AsBaseCommand();
+ responseTask = _requestResponseHandler.Outgoing(baseCommand);
+ var sequence = Serializer.Serialize(baseCommand);
+ await _stream.Send(sequence).ConfigureAwait(false);
+ }
+
+ return await responseTask.ConfigureAwait(false);
+ }
+
private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
{
ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index c29db02..8b1b46b 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -27,14 +27,15 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class Consumer : IConsumer
+ public sealed class Consumer<TMessage> : IEstablishNewChannel, IConsumer<TMessage>
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
- private IConsumerChannel _channel;
+ private IConsumerChannel<TMessage> _channel;
private readonly ObjectPool<CommandAck> _commandAckPool;
private readonly IExecute _executor;
private readonly IStateChanged<ConsumerState> _state;
+ private readonly IConsumerChannelFactory<TMessage> _factory;
private int _isDisposed;
public Uri ServiceUrl { get; }
@@ -47,9 +48,10 @@
string subscriptionName,
string topic,
IRegisterEvent eventRegister,
- IConsumerChannel initialChannel,
+ IConsumerChannel<TMessage> initialChannel,
IExecute executor,
- IStateChanged<ConsumerState> state)
+ IStateChanged<ConsumerState> state,
+ IConsumerChannelFactory<TMessage> factory)
{
_correlationId = correlationId;
ServiceUrl = serviceUrl;
@@ -59,10 +61,11 @@
_channel = initialChannel;
_executor = executor;
_state = state;
+ _factory = factory;
_commandAckPool = new DefaultObjectPool<CommandAck>(new DefaultPooledObjectPolicy<CommandAck>());
_isDisposed = 0;
- _eventRegister.Register(new ConsumerCreated(_correlationId, this));
+ _eventRegister.Register(new ConsumerCreated(_correlationId));
}
public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, CancellationToken cancellationToken)
@@ -82,19 +85,19 @@
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ConsumerDisposed(_correlationId, this));
+ _eventRegister.Register(new ConsumerDisposed(_correlationId));
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public async ValueTask<Message> Receive(CancellationToken cancellationToken)
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
+ private async ValueTask<IMessage<TMessage>> ReceiveMessage(CancellationToken cancellationToken)
=> await _channel.Receive(cancellationToken).ConfigureAwait(false);
public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
@@ -183,13 +186,15 @@
private async ValueTask RedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
=> await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- internal async ValueTask SetChannel(IConsumerChannel channel)
+ private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- {
- await channel.DisposeAsync().ConfigureAwait(false);
- return;
- }
+ throw new ConsumerDisposedException(GetType().FullName!);
+ }
+
+ public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ {
+ var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
var oldChannel = _channel;
_channel = channel;
@@ -197,11 +202,5 @@
if (oldChannel is not null)
await oldChannel.DisposeAsync().ConfigureAwait(false);
}
-
- private void ThrowIfDisposed()
- {
- if (_isDisposed != 0)
- throw new ConsumerDisposedException();
- }
}
}
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 6deee54..3e48ece 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -16,13 +16,11 @@
{
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- public sealed class ConsumerBuilder : IConsumerBuilder
+ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
{
private readonly IPulsarClient _pulsarClient;
+ private readonly ISchema<TMessage> _schema;
private string? _consumerName;
private SubscriptionInitialPosition _initialPosition;
private int _priorityLevel;
@@ -33,83 +31,72 @@
private string? _topic;
private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
- public ConsumerBuilder(IPulsarClient pulsarClient)
+ public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
{
+ _schema = schema;
_pulsarClient = pulsarClient;
- _initialPosition = ConsumerOptions.DefaultInitialPosition;
- _priorityLevel = ConsumerOptions.DefaultPriorityLevel;
- _messagePrefetchCount = ConsumerOptions.DefaultMessagePrefetchCount;
- _readCompacted = ConsumerOptions.DefaultReadCompacted;
- _subscriptionType = ConsumerOptions.DefaultSubscriptionType;
+ _initialPosition = ConsumerOptions<TMessage>.DefaultInitialPosition;
+ _priorityLevel = ConsumerOptions<TMessage>.DefaultPriorityLevel;
+ _messagePrefetchCount = ConsumerOptions<TMessage>.DefaultMessagePrefetchCount;
+ _readCompacted = ConsumerOptions<TMessage>.DefaultReadCompacted;
+ _subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
}
- public IConsumerBuilder ConsumerName(string name)
+ public IConsumerBuilder<TMessage> ConsumerName(string name)
{
_consumerName = name;
return this;
}
- public IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition)
+ public IConsumerBuilder<TMessage> InitialPosition(SubscriptionInitialPosition initialPosition)
{
_initialPosition = initialPosition;
return this;
}
- public IConsumerBuilder MessagePrefetchCount(uint count)
+ public IConsumerBuilder<TMessage> MessagePrefetchCount(uint count)
{
_messagePrefetchCount = count;
return this;
}
- public IConsumerBuilder PriorityLevel(int priorityLevel)
+ public IConsumerBuilder<TMessage> PriorityLevel(int priorityLevel)
{
_priorityLevel = priorityLevel;
return this;
}
- public IConsumerBuilder ReadCompacted(bool readCompacted)
+ public IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted)
{
_readCompacted = readCompacted;
return this;
}
- public IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
+ public IConsumerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
{
_stateChangedHandler = handler;
return this;
}
- public IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
- {
- _stateChangedHandler = new ActionStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
- return this;
- }
-
- public IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
- {
- _stateChangedHandler = new FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
- return this;
- }
-
- public IConsumerBuilder SubscriptionName(string name)
+ public IConsumerBuilder<TMessage> SubscriptionName(string name)
{
_subscriptionName = name;
return this;
}
- public IConsumerBuilder SubscriptionType(SubscriptionType type)
+ public IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type)
{
_subscriptionType = type;
return this;
}
- public IConsumerBuilder Topic(string topic)
+ public IConsumerBuilder<TMessage> Topic(string topic)
{
_topic = topic;
return this;
}
- public IConsumer Create()
+ public IConsumer<TMessage> Create()
{
if (string.IsNullOrEmpty(_subscriptionName))
throw new ConfigurationException("SubscriptionName may not be null or empty");
@@ -117,7 +104,7 @@
if (string.IsNullOrEmpty(_topic))
throw new ConfigurationException("Topic may not be null or empty");
- var options = new ConsumerOptions(_subscriptionName!, _topic!)
+ var options = new ConsumerOptions<TMessage>(_subscriptionName!, _topic!, _schema)
{
ConsumerName = _consumerName,
InitialPosition = _initialPosition,
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 479e89e..3dfe343 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal
{
using Abstractions;
+ using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Extensions;
using PulsarApi;
@@ -23,13 +24,14 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class ConsumerChannel : IConsumerChannel
+ public sealed class ConsumerChannel<TMessage> : IConsumerChannel<TMessage>
{
private readonly ulong _id;
private readonly AsyncQueue<MessagePackage> _queue;
private readonly IConnection _connection;
- private readonly BatchHandler _batchHandler;
+ private readonly BatchHandler<TMessage> _batchHandler;
private readonly CommandFlow _cachedCommandFlow;
+ private readonly IMessageFactory<TMessage> _messageFactory;
private readonly IDecompress?[] _decompressors;
private readonly AsyncLock _lock;
private uint _sendWhenZero;
@@ -40,13 +42,15 @@
uint messagePrefetchCount,
AsyncQueue<MessagePackage> queue,
IConnection connection,
- BatchHandler batchHandler,
+ BatchHandler<TMessage> batchHandler,
+ IMessageFactory<TMessage> messageFactory,
IEnumerable<IDecompressorFactory> decompressorFactories)
{
_id = id;
_queue = queue;
_connection = connection;
_batchHandler = batchHandler;
+ _messageFactory = messageFactory;
_decompressors = new IDecompress[5];
@@ -67,7 +71,7 @@
_firstFlow = true;
}
- public async ValueTask<Message> Receive(CancellationToken cancellationToken)
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
{
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
@@ -115,9 +119,20 @@
var messageId = messagePackage.MessageId;
var redeliveryCount = messagePackage.RedeliveryCount;
- return metadata.ShouldSerializeNumMessagesInBatch()
- ? _batchHandler.Add(messageId, redeliveryCount, metadata, data)
- : MessageFactory.Create(messageId.ToMessageId(), redeliveryCount, metadata, data);
+ if (metadata.ShouldSerializeNumMessagesInBatch())
+ {
+ try
+ {
+ return _batchHandler.Add(messageId, redeliveryCount, metadata, data);
+ }
+ catch
+ {
+ await RejectPackage(messagePackage, CommandAck.ValidationErrorType.BatchDeSerializeError, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
+ }
+
+ return _messageFactory.Create(messageId.ToMessageId(), redeliveryCount, data, metadata);
}
}
}
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 5d20ce5..24954f7 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -21,47 +21,44 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class ConsumerChannelFactory : IConsumerChannelFactory
+ public sealed class ConsumerChannelFactory<TMessage> : IConsumerChannelFactory<TMessage>
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private readonly IConnectionPool _connectionPool;
- private readonly IExecute _executor;
private readonly CommandSubscribe _subscribe;
private readonly uint _messagePrefetchCount;
- private readonly BatchHandler _batchHandler;
+ private readonly BatchHandler<TMessage> _batchHandler;
+ private readonly IMessageFactory<TMessage> _messageFactory;
private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
public ConsumerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
- IExecute executor,
CommandSubscribe subscribe,
uint messagePrefetchCount,
- BatchHandler batchHandler,
+ BatchHandler<TMessage> batchHandler,
+ IMessageFactory<TMessage> messageFactory,
IEnumerable<IDecompressorFactory> decompressorFactories)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
_connectionPool = connectionPool;
- _executor = executor;
_subscribe = subscribe;
_messagePrefetchCount = messagePrefetchCount;
_batchHandler = batchHandler;
+ _messageFactory = messageFactory;
_decompressorFactories = decompressorFactories;
}
- public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
- => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellationToken)
+ public async Task<IConsumerChannel<TMessage>> Create(CancellationToken cancellationToken)
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
- return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
+ return new ConsumerChannel<TMessage>(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _messageFactory, _decompressorFactories);
}
}
}
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index a078146..f2296c2 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -21,19 +21,16 @@
public sealed class ConsumerProcess : Process
{
private readonly IStateManager<ConsumerState> _stateManager;
- private readonly IConsumerChannelFactory _factory;
- private readonly Consumer _consumer;
+ private readonly IEstablishNewChannel _consumer;
private readonly bool _isFailoverSubscription;
public ConsumerProcess(
Guid correlationId,
IStateManager<ConsumerState> stateManager,
- IConsumerChannelFactory factory,
- Consumer consumer,
+ IEstablishNewChannel consumer,
bool isFailoverSubscription) : base(correlationId)
{
_stateManager = stateManager;
- _factory = factory;
_consumer = consumer;
_isFailoverSubscription = isFailoverSubscription;
}
@@ -47,7 +44,7 @@
protected override void CalculateState()
{
- if (_consumer.IsFinalState())
+ if (_stateManager.IsFinalState())
return;
if (ExecutorState == ExecutorState.Faulted)
@@ -67,7 +64,7 @@
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
_stateManager.SetState(ConsumerState.Disconnected);
- SetupChannel();
+ _ = _consumer.EstablishNewChannel(CancellationTokenSource.Token);
return;
case ChannelState.Connected:
if (!_isFailoverSubscription)
@@ -81,18 +78,5 @@
return;
}
}
-
- private async void SetupChannel()
- {
- try
- {
- var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- await _consumer.SetChannel(channel).ConfigureAwait(false);
- }
- catch
- {
- // ignored
- }
- }
}
}
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
new file mode 100644
index 0000000..09c8464
--- /dev/null
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using System.Diagnostics;
+
+ public static class DotPulsarActivitySource
+ {
+ static DotPulsarActivitySource()
+ {
+ ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
+ }
+
+ public static ActivitySource ActivitySource { get; }
+ }
+}
diff --git a/src/DotPulsar/Internal/Events/ConsumerCreated.cs b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
index 19edea8..41c5513 100644
--- a/src/DotPulsar/Internal/Events/ConsumerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
@@ -19,13 +19,9 @@
public sealed class ConsumerCreated : IEvent
{
- public ConsumerCreated(Guid correlationId, Consumer consumer)
- {
- CorrelationId = correlationId;
- Consumer = consumer;
- }
+ public ConsumerCreated(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
- public Consumer Consumer { get; }
}
}
diff --git a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
index ffe3bdf..130f9c9 100644
--- a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
@@ -19,13 +19,9 @@
public sealed class ConsumerDisposed : IEvent
{
- public ConsumerDisposed(Guid correlationId, Consumer consumer)
- {
- CorrelationId = correlationId;
- Consumer = consumer;
- }
+ public ConsumerDisposed(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
- public Consumer Consumer { get; }
}
}
diff --git a/src/DotPulsar/Internal/Events/ProducerCreated.cs b/src/DotPulsar/Internal/Events/ProducerCreated.cs
index 1c77f06..2a89db7 100644
--- a/src/DotPulsar/Internal/Events/ProducerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ProducerCreated.cs
@@ -19,13 +19,9 @@
public sealed class ProducerCreated : IEvent
{
- public ProducerCreated(Guid correlationId, Producer producer)
- {
- CorrelationId = correlationId;
- Producer = producer;
- }
+ public ProducerCreated(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
- public Producer Producer { get; }
}
}
diff --git a/src/DotPulsar/Internal/Events/ProducerDisposed.cs b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
index 849f6d9..223024f 100644
--- a/src/DotPulsar/Internal/Events/ProducerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
@@ -19,13 +19,9 @@
public sealed class ProducerDisposed : IEvent
{
- public ProducerDisposed(Guid correlationId, Producer producer)
- {
- CorrelationId = correlationId;
- Producer = producer;
- }
+ public ProducerDisposed(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
- public Producer Producer { get; }
}
}
diff --git a/src/DotPulsar/Internal/Events/ReaderCreated.cs b/src/DotPulsar/Internal/Events/ReaderCreated.cs
index 905e58d..1aa55f1 100644
--- a/src/DotPulsar/Internal/Events/ReaderCreated.cs
+++ b/src/DotPulsar/Internal/Events/ReaderCreated.cs
@@ -19,13 +19,9 @@
public sealed class ReaderCreated : IEvent
{
- public ReaderCreated(Guid correlationId, Reader reader)
- {
- CorrelationId = correlationId;
- Reader = reader;
- }
+ public ReaderCreated(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
- public Reader Reader { get; }
}
}
diff --git a/src/DotPulsar/Internal/Events/ReaderDisposed.cs b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
index c1bd28c..33a9ac6 100644
--- a/src/DotPulsar/Internal/Events/ReaderDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
@@ -19,13 +19,9 @@
public sealed class ReaderDisposed : IEvent
{
- public ReaderDisposed(Guid correlationId, Reader reader)
- {
- CorrelationId = correlationId;
- Reader = reader;
- }
+ public ReaderDisposed(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
- public Reader Reader { get; }
}
}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 0365895..867c21d 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -40,8 +40,11 @@
public static void Throw(this CommandLookupTopicResponse command)
=> Throw(command.Error, command.Message);
- public static void Throw(this CommandError error)
- => Throw(error.Error, error.Message);
+ public static void Throw(this CommandError command)
+ => Throw(command.Error, command.Message);
+
+ public static void Throw(this CommandGetOrCreateSchemaResponse command)
+ => Throw(command.ErrorCode, command.ErrorMessage);
private static void Throw(ServerError error, string message)
=> throw (error switch
@@ -177,5 +180,12 @@
CommandType = BaseCommand.Type.RedeliverUnacknowledgedMessages,
RedeliverUnacknowledgedMessages = command
};
+
+ public static BaseCommand AsBaseCommand(this CommandGetOrCreateSchema command)
+ => new BaseCommand
+ {
+ CommandType = BaseCommand.Type.GetOrCreateSchema,
+ GetOrCreateSchema = command
+ };
}
}
diff --git a/src/DotPulsar/Internal/Message.cs b/src/DotPulsar/Internal/Message.cs
new file mode 100644
index 0000000..a5b0972
--- /dev/null
+++ b/src/DotPulsar/Internal/Message.cs
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Generic;
+
+ public sealed class Message<TValue> : IMessage<TValue>
+ {
+ private readonly ISchema<TValue> _schema;
+
+ internal Message(
+ MessageId messageId,
+ ReadOnlySequence<byte> data,
+ string producerName,
+ ulong sequenceId,
+ uint redeliveryCount,
+ ulong eventTime,
+ ulong publishTime,
+ IReadOnlyDictionary<string, string> properties,
+ bool hasBase64EncodedKey,
+ string? key,
+ byte[]? orderingKey,
+ byte[]? schemaVersion,
+ ISchema<TValue> schema)
+ {
+ MessageId = messageId;
+ Data = data;
+ ProducerName = producerName;
+ SequenceId = sequenceId;
+ RedeliveryCount = redeliveryCount;
+ EventTime = eventTime;
+ PublishTime = publishTime;
+ Properties = properties;
+ HasBase64EncodedKey = hasBase64EncodedKey;
+ Key = key;
+ OrderingKey = orderingKey;
+ SchemaVersion = schemaVersion;
+ _schema = schema;
+ }
+
+ public MessageId MessageId { get; }
+
+ public ReadOnlySequence<byte> Data { get; }
+
+ public string ProducerName { get; }
+
+ public byte[]? SchemaVersion { get; }
+
+ public ulong SequenceId { get; }
+
+ public uint RedeliveryCount { get; }
+
+ public bool HasEventTime => EventTime != 0;
+
+ public ulong EventTime { get; }
+
+ public DateTime EventTimeAsDateTime => EventTimeAsDateTimeOffset.UtcDateTime;
+
+ public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) EventTime);
+
+ public bool HasBase64EncodedKey { get; }
+
+ public bool HasKey => Key is not null;
+
+ public string? Key { get; }
+
+ public byte[]? KeyBytes => Key is not null ? Convert.FromBase64String(Key) : null;
+
+ public bool HasOrderingKey => OrderingKey is not null;
+
+ public byte[]? OrderingKey { get; }
+
+ public ulong PublishTime { get; }
+
+ public DateTime PublishTimeAsDateTime => PublishTimeAsDateTimeOffset.UtcDateTime;
+
+ public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) PublishTime);
+
+ public IReadOnlyDictionary<string, string> Properties { get; }
+
+ public TValue Value => _schema.Decode(Data);
+ }
+}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index c17ec19..44b3812 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -15,90 +15,95 @@
namespace DotPulsar.Internal
{
using DotPulsar.Abstractions;
- using DotPulsar.Extensions;
using Extensions;
using System;
using System.Threading;
using System.Threading.Tasks;
- public sealed class MessageBuilder : IMessageBuilder
+ public sealed class MessageBuilder<TMessage> : IMessageBuilder<TMessage>
{
- private readonly IProducer _producer;
+ private readonly IProducer<TMessage> _producer;
private readonly MessageMetadata _metadata;
- public MessageBuilder(IProducer producer)
+ public MessageBuilder(IProducer<TMessage> producer)
{
_producer = producer;
_metadata = new MessageMetadata();
}
- public IMessageBuilder DeliverAt(long timestamp)
+ public IMessageBuilder<TMessage> DeliverAt(long timestamp)
{
_metadata.Metadata.DeliverAtTime = timestamp;
return this;
}
- public IMessageBuilder DeliverAt(DateTime timestamp)
+ public IMessageBuilder<TMessage> DeliverAt(DateTime timestamp)
{
_metadata.Metadata.SetDeliverAtTime(timestamp);
return this;
}
- public IMessageBuilder DeliverAt(DateTimeOffset timestamp)
+ public IMessageBuilder<TMessage> DeliverAt(DateTimeOffset timestamp)
{
_metadata.Metadata.SetDeliverAtTime(timestamp);
return this;
}
- public IMessageBuilder EventTime(ulong eventTime)
+ public IMessageBuilder<TMessage> EventTime(ulong eventTime)
{
_metadata.Metadata.EventTime = eventTime;
return this;
}
- public IMessageBuilder EventTime(DateTime eventTime)
+ public IMessageBuilder<TMessage> EventTime(DateTime eventTime)
{
_metadata.Metadata.SetEventTime(eventTime);
return this;
}
- public IMessageBuilder EventTime(DateTimeOffset eventTime)
+ public IMessageBuilder<TMessage> EventTime(DateTimeOffset eventTime)
{
_metadata.Metadata.SetEventTime(eventTime);
return this;
}
- public IMessageBuilder Key(string key)
+ public IMessageBuilder<TMessage> Key(string key)
{
_metadata.Metadata.SetKey(key);
return this;
}
- public IMessageBuilder KeyBytes(byte[] key)
+ public IMessageBuilder<TMessage> KeyBytes(byte[] key)
{
_metadata.Metadata.SetKey(key);
return this;
}
- public IMessageBuilder OrderingKey(byte[] key)
+ public IMessageBuilder<TMessage> OrderingKey(byte[] key)
{
_metadata.Metadata.OrderingKey = key;
return this;
}
- public IMessageBuilder Property(string key, string value)
+ public IMessageBuilder<TMessage> Property(string key, string value)
{
_metadata[key] = value;
return this;
}
- public IMessageBuilder SequenceId(ulong sequenceId)
+ public IMessageBuilder<TMessage> SchemaVersion(byte[] schemaVersion)
+ {
+ _metadata.Metadata.SchemaVersion = schemaVersion;
+ return this;
+ }
+
+ public IMessageBuilder<TMessage> SequenceId(ulong sequenceId)
{
_metadata.Metadata.SequenceId = sequenceId;
return this;
}
- public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => await _producer.Send(_metadata, data, cancellationToken).ConfigureAwait(false);
+ public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+ => await _producer.Send(_metadata, message, cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/MessageFactory.cs b/src/DotPulsar/Internal/MessageFactory.cs
index 87bf110..1f68425 100644
--- a/src/DotPulsar/Internal/MessageFactory.cs
+++ b/src/DotPulsar/Internal/MessageFactory.cs
@@ -14,11 +14,13 @@
namespace DotPulsar.Internal
{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System.Buffers;
using System.Collections.Generic;
- public static class MessageFactory
+ public sealed class MessageFactory<TValue> : IMessageFactory<TValue>
{
private static readonly Dictionary<string, string> _empty;
@@ -40,13 +42,26 @@
return dictionary;
}
- public static Message Create(
+ private readonly ISchema<TValue> _schema;
+
+ public MessageFactory(ISchema<TValue> schema)
+ => _schema = schema;
+
+ public IMessage<TValue> Create(MessageId messageId, uint redeliveryCount, ReadOnlySequence<byte> data, MessageMetadata metadata, SingleMessageMetadata? singleMetadata = null)
+ {
+ if (singleMetadata is null)
+ return Create(messageId, redeliveryCount, metadata, data);
+
+ return Create(messageId, redeliveryCount, metadata, singleMetadata, data);
+ }
+
+ private Message<TValue> Create(
MessageId messageId,
uint redeliveryCount,
MessageMetadata metadata,
ReadOnlySequence<byte> data)
{
- return new Message(
+ return new Message<TValue>(
messageId: messageId,
data: data,
producerName: metadata.ProducerName,
@@ -57,17 +72,19 @@
properties: FromKeyValueList(metadata.Properties),
hasBase64EncodedKey: metadata.PartitionKeyB64Encoded,
key: metadata.PartitionKey,
- orderingKey: metadata.OrderingKey);
+ orderingKey: metadata.OrderingKey,
+ schemaVersion: metadata.SchemaVersion,
+ _schema);
}
- public static Message Create(
+ private Message<TValue> Create(
MessageId messageId,
uint redeliveryCount,
MessageMetadata metadata,
SingleMessageMetadata singleMetadata,
ReadOnlySequence<byte> data)
{
- return new Message(
+ return new Message<TValue>(
messageId: messageId,
data: data,
producerName: metadata.ProducerName,
@@ -78,23 +95,9 @@
properties: FromKeyValueList(singleMetadata.Properties),
hasBase64EncodedKey: singleMetadata.PartitionKeyB64Encoded,
key: singleMetadata.PartitionKey,
- orderingKey: singleMetadata.OrderingKey);
+ orderingKey: singleMetadata.OrderingKey,
+ schemaVersion: metadata.SchemaVersion,
+ _schema);
}
-
- /// <summary>
- /// Intended for testing.
- /// </summary>
- public static Message Create(
- MessageId messageId,
- ReadOnlySequence<byte> data,
- string producerName,
- ulong sequenceId,
- uint redeliveryCount,
- ulong eventTime,
- ulong publishTime,
- IReadOnlyDictionary<string, string> properties,
- bool hasBase64EncodedKey,
- string? key,
- byte[]? orderingKey) => new Message(messageId, data, producerName, sequenceId, redeliveryCount, eventTime, publishTime, properties, hasBase64EncodedKey, key, orderingKey);
}
}
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 75e724f..4427752 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal
{
using Abstractions;
+ using DotPulsar.Abstractions;
using Exceptions;
using PulsarApi;
using System;
@@ -22,7 +23,7 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel
+ public sealed class NotReadyChannel<TMessage> : IConsumerChannel<TMessage>, IProducerChannel
{
public ValueTask DisposeAsync()
=> new ValueTask();
@@ -30,7 +31,7 @@
public ValueTask ClosedByClient(CancellationToken cancellationToken)
=> new ValueTask();
- public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
+ public ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken = default)
=> throw GetException();
public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 02f9bd3..5264b25 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -25,7 +25,7 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class Producer : IProducer
+ public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
{
private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
private readonly Guid _correlationId;
@@ -33,6 +33,8 @@
private IProducerChannel _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ProducerState> _state;
+ private readonly IProducerChannelFactory _factory;
+ private readonly ISchema<TMessage> _schema;
private readonly SequenceId _sequenceId;
private int _isDisposed;
@@ -47,7 +49,9 @@
IRegisterEvent registerEvent,
IProducerChannel initialChannel,
IExecute executor,
- IStateChanged<ProducerState> state)
+ IStateChanged<ProducerState> state,
+ IProducerChannelFactory factory,
+ ISchema<TMessage> schema)
{
var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
_messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
@@ -59,9 +63,11 @@
_channel = initialChannel;
_executor = executor;
_state = state;
+ _factory = factory;
+ _schema = schema;
_isDisposed = 0;
- _eventRegister.Register(new ProducerCreated(_correlationId, this));
+ _eventRegister.Register(new ProducerCreated(_correlationId));
}
public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
@@ -81,10 +87,15 @@
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ProducerDisposed(_correlationId, this));
+ _eventRegister.Register(new ProducerDisposed(_correlationId));
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
+ public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+ => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
+ => await Send(metadata, _schema.Encode(message), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
@@ -127,13 +138,9 @@
return response.MessageId.ToMessageId();
}
- internal async ValueTask SetChannel(IProducerChannel channel)
+ public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
- if (_isDisposed != 0)
- {
- await channel.DisposeAsync().ConfigureAwait(false);
- return;
- }
+ var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
var oldChannel = _channel;
_channel = channel;
@@ -145,7 +152,7 @@
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ProducerDisposedException();
+ throw new ProducerDisposedException(GetType().FullName!);
}
}
}
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 4d66d6d..420b2a2 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -16,74 +16,61 @@
{
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- public sealed class ProducerBuilder : IProducerBuilder
+ public sealed class ProducerBuilder<TMessage> : IProducerBuilder<TMessage>
{
private readonly IPulsarClient _pulsarClient;
+ private readonly ISchema<TMessage> _schema;
private string? _producerName;
private CompressionType _compressionType;
private ulong _initialSequenceId;
private string? _topic;
private IHandleStateChanged<ProducerStateChanged>? _stateChangedHandler;
- public ProducerBuilder(IPulsarClient pulsarClient)
+ public ProducerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
{
_pulsarClient = pulsarClient;
- _compressionType = ProducerOptions.DefaultCompressionType;
- _initialSequenceId = ProducerOptions.DefaultInitialSequenceId;
+ _schema = schema;
+ _compressionType = ProducerOptions<TMessage>.DefaultCompressionType;
+ _initialSequenceId = ProducerOptions<TMessage>.DefaultInitialSequenceId;
}
- public IProducerBuilder CompressionType(CompressionType compressionType)
+ public IProducerBuilder<TMessage> CompressionType(CompressionType compressionType)
{
_compressionType = compressionType;
return this;
}
- public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+ public IProducerBuilder<TMessage> InitialSequenceId(ulong initialSequenceId)
{
_initialSequenceId = initialSequenceId;
return this;
}
- public IProducerBuilder ProducerName(string name)
+ public IProducerBuilder<TMessage> ProducerName(string name)
{
_producerName = name;
return this;
}
- public IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler)
+ public IProducerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler)
{
_stateChangedHandler = handler;
return this;
}
- public IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
- {
- _stateChangedHandler = new ActionStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
- return this;
- }
-
- public IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
- {
- _stateChangedHandler = new FuncStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
- return this;
- }
-
- public IProducerBuilder Topic(string topic)
+ public IProducerBuilder<TMessage> Topic(string topic)
{
_topic = topic;
return this;
}
- public IProducer Create()
+ public IProducer<TMessage> Create()
{
if (string.IsNullOrEmpty(_topic))
throw new ConfigurationException("ProducerOptions.Topic may not be null or empty");
- var options = new ProducerOptions(_topic!)
+ var options = new ProducerOptions<TMessage>(_topic!, _schema)
{
CompressionType = _compressionType,
InitialSequenceId = _initialSequenceId,
@@ -91,7 +78,7 @@
StateChangedHandler = _stateChangedHandler
};
- return _pulsarClient.CreateProducer(options);
+ return _pulsarClient.CreateProducer<TMessage>(options);
}
}
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index fd9bd0c..20ae831 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -30,12 +30,14 @@
private readonly string _name;
private readonly IConnection _connection;
private readonly ICompressorFactory? _compressorFactory;
+ private readonly byte[]? _schemaVersion;
public ProducerChannel(
ulong id,
string name,
IConnection connection,
- ICompressorFactory? compressorFactory)
+ ICompressorFactory? compressorFactory,
+ byte[]? schemaVersion)
{
var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
_sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
@@ -43,6 +45,7 @@
_name = name;
_connection = connection;
_compressorFactory = compressorFactory;
+ _schemaVersion = schemaVersion;
}
public async ValueTask ClosedByClient(CancellationToken cancellationToken)
@@ -69,6 +72,9 @@
metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
metadata.ProducerName = _name;
+ if (metadata.SchemaVersion is null && _schemaVersion is not null)
+ metadata.SchemaVersion = _schemaVersion;
+
if (sendPackage.Command is null)
{
sendPackage.Command = new CommandSend
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index a869666..23ef8c5 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal
{
using Abstractions;
+ using DotPulsar.Internal.Extensions;
using PulsarApi;
using System;
using System.Threading;
@@ -25,41 +26,60 @@
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private readonly IConnectionPool _connectionPool;
- private readonly IExecute _executor;
private readonly CommandProducer _commandProducer;
private readonly ICompressorFactory? _compressorFactory;
+ private readonly Schema? _schema;
public ProducerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
- IExecute executor,
- ProducerOptions options,
+ string topic,
+ string? producerName,
+ SchemaInfo schemaInfo,
ICompressorFactory? compressorFactory)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
_connectionPool = connectionPool;
- _executor = executor;
_commandProducer = new CommandProducer
{
- ProducerName = options.ProducerName,
- Topic = options.Topic
+ ProducerName = producerName,
+ Topic = topic
};
_compressorFactory = compressorFactory;
+ _schema = schemaInfo.PulsarSchema;
}
public async Task<IProducerChannel> Create(CancellationToken cancellationToken)
- => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IProducerChannel> GetChannel(CancellationToken cancellationToken)
{
var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
+ var schemaVersion = await GetSchemaVersion(connection, cancellationToken).ConfigureAwait(false);
var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
- return new ProducerChannel(response.ProducerId, response.ProducerName, connection, _compressorFactory);
+ return new ProducerChannel(response.ProducerId, response.ProducerName, connection, _compressorFactory, schemaVersion);
+ }
+
+ private async ValueTask<byte[]?> GetSchemaVersion(IConnection connection, CancellationToken cancellationToken)
+ {
+ if (_schema is null)
+ return null;
+
+ var command = new CommandGetOrCreateSchema
+ {
+ Schema = _schema,
+ Topic = _commandProducer.Topic
+ };
+
+ var response = await connection.Send(command, cancellationToken).ConfigureAwait(false);
+
+ response.Expect(BaseCommand.Type.GetOrCreateSchemaResponse);
+ if (response.GetOrCreateSchemaResponse.ShouldSerializeErrorCode())
+ response.GetOrCreateSchemaResponse.Throw();
+
+ return response.GetOrCreateSchemaResponse.SchemaVersion;
}
}
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 81861e3..7e0b37f 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -21,17 +21,14 @@
public sealed class ProducerProcess : Process
{
private readonly IStateManager<ProducerState> _stateManager;
- private readonly IProducerChannelFactory _factory;
- private readonly Producer _producer;
+ private readonly IEstablishNewChannel _producer;
public ProducerProcess(
Guid correlationId,
IStateManager<ProducerState> stateManager,
- IProducerChannelFactory factory,
- Producer producer) : base(correlationId)
+ IEstablishNewChannel producer) : base(correlationId)
{
_stateManager = stateManager;
- _factory = factory;
_producer = producer;
}
@@ -44,7 +41,7 @@
protected override void CalculateState()
{
- if (_producer.IsFinalState())
+ if (_stateManager.IsFinalState())
return;
if (ExecutorState == ExecutorState.Faulted)
@@ -58,25 +55,12 @@
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
_stateManager.SetState(ProducerState.Disconnected);
- SetupChannel();
+ _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
return;
case ChannelState.Connected:
_stateManager.SetState(ProducerState.Connected);
return;
}
}
-
- private async void SetupChannel()
- {
- try
- {
- var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- await _producer.SetChannel(channel).ConfigureAwait(false);
- }
- catch
- {
- // ignored
- }
- }
}
}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 120d5e5..fc1a2e0 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -21,7 +21,6 @@
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
- using System.Threading.Tasks;
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
@@ -79,18 +78,6 @@
return this;
}
- public IPulsarClientBuilder ExceptionHandler(Action<ExceptionContext> exceptionHandler)
- {
- _exceptionHandlers.Add(new ActionExceptionHandler(exceptionHandler));
- return this;
- }
-
- public IPulsarClientBuilder ExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler)
- {
- _exceptionHandlers.Add(new FuncExceptionHandler(exceptionHandler));
- return this;
- }
-
public IPulsarClientBuilder RetryInterval(TimeSpan interval)
{
_retryInterval = interval;
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 731e4cd..bffcb2d 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -23,13 +23,14 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class Reader : IReader
+ public sealed class Reader<TMessage> : IEstablishNewChannel, IReader<TMessage>
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
- private IConsumerChannel _channel;
+ private IConsumerChannel<TMessage> _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ReaderState> _state;
+ private readonly IConsumerChannelFactory<TMessage> _factory;
private int _isDisposed;
public Uri ServiceUrl { get; }
@@ -40,9 +41,10 @@
Uri serviceUrl,
string topic,
IRegisterEvent eventRegister,
- IConsumerChannel initialChannel,
+ IConsumerChannel<TMessage> initialChannel,
IExecute executor,
- IStateChanged<ReaderState> state)
+ IStateChanged<ReaderState> state,
+ IConsumerChannelFactory<TMessage> factory)
{
_correlationId = correlationId;
ServiceUrl = serviceUrl;
@@ -51,9 +53,10 @@
_channel = initialChannel;
_executor = executor;
_state = state;
+ _factory = factory;
_isDisposed = 0;
- _eventRegister.Register(new ReaderCreated(_correlationId, this));
+ _eventRegister.Register(new ReaderCreated(_correlationId));
}
public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state, CancellationToken cancellationToken)
@@ -79,14 +82,14 @@
private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
=> await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- public async ValueTask<Message> Receive(CancellationToken cancellationToken)
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
+ private async ValueTask<IMessage<TMessage>> ReceiveMessage(CancellationToken cancellationToken)
=> await _channel.Receive(cancellationToken).ConfigureAwait(false);
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
@@ -110,7 +113,7 @@
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ReaderDisposed(_correlationId, this));
+ _eventRegister.Register(new ReaderDisposed(_correlationId));
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
@@ -118,13 +121,9 @@
private async Task Seek(CommandSeek command, CancellationToken cancellationToken)
=> await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- internal async ValueTask SetChannel(IConsumerChannel channel)
+ public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
- if (_isDisposed != 0)
- {
- await channel.DisposeAsync().ConfigureAwait(false);
- return;
- }
+ var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
var oldChannel = _channel;
_channel = channel;
@@ -136,7 +135,7 @@
private void ThrowIfDisposed()
{
if (_isDisposed != 0)
- throw new ReaderDisposedException();
+ throw new ReaderDisposedException(GetType().FullName!);
}
}
}
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
index 4936cd0..f9090da 100644
--- a/src/DotPulsar/Internal/ReaderBuilder.cs
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -16,13 +16,11 @@
{
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- public sealed class ReaderBuilder : IReaderBuilder
+ public sealed class ReaderBuilder<TMessage> : IReaderBuilder<TMessage>
{
private readonly IPulsarClient _pulsarClient;
+ private readonly ISchema<TMessage> _schema;
private string? _readerName;
private uint _messagePrefetchCount;
private bool _readCompacted;
@@ -30,62 +28,51 @@
private string? _topic;
private IHandleStateChanged<ReaderStateChanged>? _stateChangedHandler;
- public ReaderBuilder(IPulsarClient pulsarClient)
+ public ReaderBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
{
_pulsarClient = pulsarClient;
- _messagePrefetchCount = ReaderOptions.DefaultMessagePrefetchCount;
- _readCompacted = ReaderOptions.DefaultReadCompacted;
+ _schema = schema;
+ _messagePrefetchCount = ReaderOptions<TMessage>.DefaultMessagePrefetchCount;
+ _readCompacted = ReaderOptions<TMessage>.DefaultReadCompacted;
}
- public IReaderBuilder MessagePrefetchCount(uint count)
+ public IReaderBuilder<TMessage> MessagePrefetchCount(uint count)
{
_messagePrefetchCount = count;
return this;
}
- public IReaderBuilder ReadCompacted(bool readCompacted)
+ public IReaderBuilder<TMessage> ReadCompacted(bool readCompacted)
{
_readCompacted = readCompacted;
return this;
}
- public IReaderBuilder ReaderName(string name)
+ public IReaderBuilder<TMessage> ReaderName(string name)
{
_readerName = name;
return this;
}
- public IReaderBuilder StartMessageId(MessageId messageId)
+ public IReaderBuilder<TMessage> StartMessageId(MessageId messageId)
{
_startMessageId = messageId;
return this;
}
- public IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler)
+ public IReaderBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler)
{
_stateChangedHandler = handler;
return this;
}
- public IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
- {
- _stateChangedHandler = new ActionStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
- return this;
- }
-
- public IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
- {
- _stateChangedHandler = new FuncStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
- return this;
- }
-
- public IReaderBuilder Topic(string topic)
+ public IReaderBuilder<TMessage> Topic(string topic)
{
_topic = topic;
return this;
}
- public IReader Create()
+ public IReader<TMessage> Create()
{
if (_startMessageId is null)
throw new ConfigurationException("StartMessageId may not be null");
@@ -93,7 +80,7 @@
if (string.IsNullOrEmpty(_topic))
throw new ConfigurationException("Topic may not be null or empty");
- var options = new ReaderOptions(_startMessageId, _topic!)
+ var options = new ReaderOptions<TMessage>(_startMessageId, _topic!, _schema)
{
MessagePrefetchCount = _messagePrefetchCount,
ReadCompacted = _readCompacted,
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 404de7e..5dd8cd1 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -21,17 +21,14 @@
public sealed class ReaderProcess : Process
{
private readonly IStateManager<ReaderState> _stateManager;
- private readonly IConsumerChannelFactory _factory;
- private readonly Reader _reader;
+ private readonly IEstablishNewChannel _reader;
public ReaderProcess(
Guid correlationId,
IStateManager<ReaderState> stateManager,
- IConsumerChannelFactory factory,
- Reader reader) : base(correlationId)
+ IEstablishNewChannel reader) : base(correlationId)
{
_stateManager = stateManager;
- _factory = factory;
_reader = reader;
}
@@ -44,7 +41,7 @@
protected override void CalculateState()
{
- if (_reader.IsFinalState())
+ if (_stateManager.IsFinalState())
return;
if (ExecutorState == ExecutorState.Faulted)
@@ -58,7 +55,7 @@
case ChannelState.ClosedByServer:
case ChannelState.Disconnected:
_stateManager.SetState(ReaderState.Disconnected);
- SetupChannel();
+ _ = _reader.EstablishNewChannel(CancellationTokenSource.Token);
return;
case ChannelState.Connected:
_stateManager.SetState(ReaderState.Connected);
@@ -68,18 +65,5 @@
return;
}
}
-
- private async void SetupChannel()
- {
- try
- {
- var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
- await _reader.SetChannel(channel).ConfigureAwait(false);
- }
- catch
- {
- // ignored
- }
- }
}
}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index ae7bdee..55777f6 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -79,6 +79,9 @@
case BaseCommand.Type.GetLastMessageId:
cmd.GetLastMessageId.RequestId = _requestId.FetchNext();
return;
+ case BaseCommand.Type.GetOrCreateSchema:
+ cmd.GetOrCreateSchema.RequestId = _requestId.FetchNext();
+ return;
}
}
@@ -103,6 +106,8 @@
BaseCommand.Type.CloseConsumer => cmd.CloseConsumer.RequestId.ToString(),
BaseCommand.Type.GetLastMessageId => cmd.GetLastMessageId.RequestId.ToString(),
BaseCommand.Type.GetLastMessageIdResponse => cmd.GetLastMessageIdResponse.RequestId.ToString(),
+ BaseCommand.Type.GetOrCreateSchema => cmd.GetOrCreateSchema.RequestId.ToString(),
+ BaseCommand.Type.GetOrCreateSchemaResponse => cmd.GetOrCreateSchemaResponse.RequestId.ToString(),
_ => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type")
};
}
diff --git a/src/DotPulsar/Extensions/SendExtensions.cs b/src/DotPulsar/Internal/SendExtensions.cs
similarity index 80%
rename from src/DotPulsar/Extensions/SendExtensions.cs
rename to src/DotPulsar/Internal/SendExtensions.cs
index 41ace1e..535408b 100644
--- a/src/DotPulsar/Extensions/SendExtensions.cs
+++ b/src/DotPulsar/Internal/SendExtensions.cs
@@ -28,25 +28,25 @@
/// <summary>
/// Sends a message.
/// </summary>
- public static async ValueTask<MessageId> Send(this ISend sender, byte[] data, CancellationToken cancellationToken = default)
+ public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, byte[] data, CancellationToken cancellationToken = default)
=> await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
/// <summary>
/// Sends a message.
/// </summary>
- public static async ValueTask<MessageId> Send(this ISend sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+ public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
=> await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
/// <summary>
/// Sends a message with metadata.
/// </summary>
- public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default)
+ public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default)
=> await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
/// <summary>
/// Sends a message with metadata.
/// </summary>
- public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+ public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
=> await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
deleted file mode 100644
index 32575d4..0000000
--- a/src/DotPulsar/Message.cs
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar
-{
- using System;
- using System.Buffers;
- using System.Collections.Generic;
-
- /// <summary>
- /// The message received by consumers and readers.
- /// </summary>
- public sealed class Message
- {
- internal Message(
- MessageId messageId,
- ReadOnlySequence<byte> data,
- string producerName,
- ulong sequenceId,
- uint redeliveryCount,
- ulong eventTime,
- ulong publishTime,
- IReadOnlyDictionary<string, string> properties,
- bool hasBase64EncodedKey,
- string? key,
- byte[]? orderingKey)
- {
- MessageId = messageId;
- Data = data;
- ProducerName = producerName;
- SequenceId = sequenceId;
- RedeliveryCount = redeliveryCount;
- EventTime = eventTime;
- PublishTime = publishTime;
- Properties = properties;
- HasBase64EncodedKey = hasBase64EncodedKey;
- Key = key;
- OrderingKey = orderingKey;
- }
-
- /// <summary>
- /// The id of the message.
- /// </summary>
- public MessageId MessageId { get; }
-
- /// <summary>
- /// The raw payload of the message.
- /// </summary>
- public ReadOnlySequence<byte> Data { get; }
-
- /// <summary>
- /// The name of the producer who produced the message.
- /// </summary>
- public string ProducerName { get; }
-
- /// <summary>
- /// The sequence id of the message.
- /// </summary>
- public ulong SequenceId { get; }
-
- /// <summary>
- /// The redelivery count (maintained by the broker) of the message.
- /// </summary>
- public uint RedeliveryCount { get; }
-
- /// <summary>
- /// Check whether the message has an event time.
- /// </summary>
- public bool HasEventTime => EventTime != 0;
-
- /// <summary>
- /// The event time of the message as unix time in milliseconds.
- /// </summary>
- public ulong EventTime { get; }
-
- /// <summary>
- /// The event time of the message as an UTC DateTime.
- /// </summary>
- public DateTime EventTimeAsDateTime => EventTimeAsDateTimeOffset.UtcDateTime;
-
- /// <summary>
- /// The event time of the message as a DateTimeOffset with an offset of 0.
- /// </summary>
- public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) EventTime);
-
- /// <summary>
- /// Check whether the key been base64 encoded.
- /// </summary>
- public bool HasBase64EncodedKey { get; }
-
- /// <summary>
- /// Check whether the message has a key.
- /// </summary>
- public bool HasKey => Key is not null;
-
- /// <summary>
- /// The key as a string.
- /// </summary>
- public string? Key { get; }
-
- /// <summary>
- /// The key as bytes.
- /// </summary>
- public byte[]? KeyBytes => Key is not null ? Convert.FromBase64String(Key) : null;
-
- /// <summary>
- /// Check whether the message has an ordering key.
- /// </summary>
- public bool HasOrderingKey => OrderingKey is not null;
-
- /// <summary>
- /// The ordering key of the message.
- /// </summary>
- public byte[]? OrderingKey { get; }
-
- /// <summary>
- /// The publish time of the message as unix time in milliseconds.
- /// </summary>
- public ulong PublishTime { get; }
-
- /// <summary>
- /// The publish time of the message as an UTC DateTime.
- /// </summary>
- public DateTime PublishTimeAsDateTime => PublishTimeAsDateTimeOffset.UtcDateTime;
-
- /// <summary>
- /// The publish time of the message as a DateTimeOffset with an offset of 0.
- /// </summary>
- public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) PublishTime);
-
- /// <summary>
- /// The properties of the message.
- /// </summary>
- public IReadOnlyDictionary<string, string> Properties { get; }
- }
-}
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index b9109e9..e8fa47a 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -148,6 +148,15 @@
}
/// <summary>
+ /// The schema version of the message.
+ /// </summary>
+ public byte[]? SchemaVersion
+ {
+ get => Metadata.SchemaVersion;
+ set => Metadata.SchemaVersion = value;
+ }
+
+ /// <summary>
/// The sequence id of the message.
/// </summary>
public ulong SequenceId
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index f1ad688..958a1f8 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -19,7 +19,7 @@
/// <summary>
/// The producer building options.
/// </summary>
- public sealed class ProducerOptions
+ public sealed class ProducerOptions<TMessage>
{
/// <summary>
/// The default compression type.
@@ -34,11 +34,12 @@
/// <summary>
/// Initializes a new instance using the specified topic.
/// </summary>
- public ProducerOptions(string topic)
+ public ProducerOptions(string topic, ISchema<TMessage> schema)
{
CompressionType = DefaultCompressionType;
InitialSequenceId = DefaultInitialSequenceId;
Topic = topic;
+ Schema = schema;
}
/// <summary>
@@ -57,6 +58,11 @@
public string? ProducerName { get; set; }
/// <summary>
+ /// Set the schema. This is required.
+ /// </summary>
+ public ISchema<TMessage> Schema { get; set; }
+
+ /// <summary>
/// Register a state changed handler. This is optional.
/// </summary>
public IHandleStateChanged<ProducerStateChanged>? StateChangedHandler { get; set; }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 3ce1940..678ac83 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -60,7 +60,7 @@
/// <summary>
/// Create a producer.
/// </summary>
- public IProducer CreateProducer(ProducerOptions options)
+ public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
{
ThrowIfDisposed();
@@ -76,12 +76,18 @@
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, compressorFactory);
+ var topic = options.Topic;
+ var producerName = options.ProducerName;
+ var schema = options.Schema;
+ var initialSequenceId = options.InitialSequenceId;
+
+ var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, compressorFactory);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
- var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
+ var initialChannel = new NotReadyChannel<TMessage>();
+ var producer = new Producer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
- var process = new ProducerProcess(correlationId, stateManager, factory, producer);
+ var process = new ProducerProcess(correlationId, stateManager, producer);
_processManager.Add(process);
process.Start();
return producer;
@@ -90,7 +96,7 @@
/// <summary>
/// Create a consumer.
/// </summary>
- public IConsumer CreateConsumer(ConsumerOptions options)
+ public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
{
ThrowIfDisposed();
@@ -107,14 +113,16 @@
Type = (CommandSubscribe.SubType) options.SubscriptionType
};
var messagePrefetchCount = options.MessagePrefetchCount;
- var batchHandler = new BatchHandler(true);
+ var messageFactory = new MessageFactory<TMessage>(options.Schema);
+ var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
var decompressorFactories = CompressionFactories.DecompressorFactories();
- var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
+ var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
- var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+ var initialChannel = new NotReadyChannel<TMessage>();
+ var consumer = new Consumer<TMessage>(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, initialChannel, executor, stateManager, factory);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
- var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
+ var process = new ConsumerProcess(correlationId, stateManager, consumer, options.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
return consumer;
@@ -123,7 +131,7 @@
/// <summary>
/// Create a reader.
/// </summary>
- public IReader CreateReader(ReaderOptions options)
+ public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
{
ThrowIfDisposed();
@@ -139,14 +147,16 @@
Topic = options.Topic
};
var messagePrefetchCount = options.MessagePrefetchCount;
- var batchHandler = new BatchHandler(false);
+ var messageFactory = new MessageFactory<TMessage>(options.Schema);
+ var batchHandler = new BatchHandler<TMessage>(false, messageFactory);
var decompressorFactories = CompressionFactories.DecompressorFactories();
- var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
+ var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
- var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+ var initialChannel = new NotReadyChannel<TMessage>();
+ var reader = new Reader<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, initialChannel, executor, stateManager, factory);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
- var process = new ReaderProcess(correlationId, stateManager, factory, reader);
+ var process = new ReaderProcess(correlationId, stateManager, reader);
_processManager.Add(process);
process.Start();
return reader;
diff --git a/src/DotPulsar/ReaderOptions.cs b/src/DotPulsar/ReaderOptions.cs
index 42e4f3c..4a1e596 100644
--- a/src/DotPulsar/ReaderOptions.cs
+++ b/src/DotPulsar/ReaderOptions.cs
@@ -19,7 +19,7 @@
/// <summary>
/// The reader building options.
/// </summary>
- public sealed class ReaderOptions
+ public sealed class ReaderOptions<TMessage>
{
/// <summary>
/// The default message prefetch count.
@@ -34,12 +34,13 @@
/// <summary>
/// Initializes a new instance using the specified startMessageId and topic.
/// </summary>
- public ReaderOptions(MessageId startMessageId, string topic)
+ public ReaderOptions(MessageId startMessageId, string topic, ISchema<TMessage> schema)
{
MessagePrefetchCount = DefaultMessagePrefetchCount;
ReadCompacted = DefaultReadCompacted;
StartMessageId = startMessageId;
Topic = topic;
+ Schema = schema;
}
/// <summary>
@@ -58,6 +59,11 @@
public string? ReaderName { get; set; }
/// <summary>
+ /// Set the schema. This is required.
+ /// </summary>
+ public ISchema<TMessage> Schema { get; set; }
+
+ /// <summary>
/// The initial reader position is set to the specified message id. This is required.
/// </summary>
public MessageId StartMessageId { get; set; }
diff --git a/src/DotPulsar/Schema.cs b/src/DotPulsar/Schema.cs
new file mode 100644
index 0000000..6aff024
--- /dev/null
+++ b/src/DotPulsar/Schema.cs
@@ -0,0 +1,94 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ using DotPulsar.Schemas;
+
+ /// <summary>
+ /// Message schema definitions.
+ /// </summary>
+ public static class Schema
+ {
+ static Schema()
+ {
+ Bytes = new ByteArraySchema();
+ String = StringSchema.UTF8;
+ Boolean = new BooleanSchema();
+ Int8 = new ByteSchema();
+ Int16 = new ShortSchema();
+ Int32 = new IntegerSchema();
+ Int64 = new LongSchema();
+ Float = new FloatSchema();
+ TimeStamp = TimestampSchema.Timestamp;
+ Date = TimestampSchema.Date;
+ Time = new TimeSchema();
+ }
+
+ /// <summary>
+ /// Raw bytes schema using byte[].
+ /// </summary>
+ public static ByteArraySchema Bytes { get; }
+
+ /// <summary>
+ /// UTF-8 schema.
+ /// </summary>
+ public static StringSchema String { get; }
+
+ /// <summary>
+ /// Boolean schema.
+ /// </summary>
+ public static BooleanSchema Boolean { get; }
+
+ /// <summary>
+ /// Byte schema.
+ /// </summary>
+ public static ByteSchema Int8 { get; }
+
+ /// <summary>
+ /// Short schema.
+ /// </summary>
+ public static ShortSchema Int16 { get; }
+
+ /// <summary>
+ /// Integer schema.
+ /// </summary>
+ public static IntegerSchema Int32 { get; }
+
+ /// <summary>
+ /// Long schema.
+ /// </summary>
+ public static LongSchema Int64 { get; }
+
+ /// <summary>
+ /// Float schema.
+ /// </summary>
+ public static FloatSchema Float { get; }
+
+ /// <summary>
+ /// Timestamp schema using DateTime.
+ /// </summary>
+ public static TimestampSchema TimeStamp { get; }
+
+ /// <summary>
+ /// Date schema using DateTime.
+ /// </summary>
+ public static TimestampSchema Date { get; }
+
+ /// <summary>
+ /// Time schema using TimeSpan.
+ /// </summary>
+ public static TimeSchema Time { get; }
+ }
+}
diff --git a/src/DotPulsar/SchemaInfo.cs b/src/DotPulsar/SchemaInfo.cs
new file mode 100644
index 0000000..8e0a375
--- /dev/null
+++ b/src/DotPulsar/SchemaInfo.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ using System.Collections.Generic;
+ using System.Linq;
+
+ /// <summary>
+ /// Information about the schema.
+ /// </summary>
+ public sealed class SchemaInfo
+ {
+ internal SchemaInfo(Internal.PulsarApi.Schema schema)
+ => PulsarSchema = schema;
+
+ public SchemaInfo(string name, byte[] data, SchemaType type, IReadOnlyDictionary<string, string> properties)
+ {
+ PulsarSchema = new Internal.PulsarApi.Schema
+ {
+ Name = name,
+ SchemaData = data,
+ Type = (Internal.PulsarApi.Schema.SchemaType) type,
+ };
+
+ foreach (var property in properties)
+ {
+ var keyValue = new Internal.PulsarApi.KeyValue
+ {
+ Key = property.Key,
+ Value = property.Value
+ };
+
+ PulsarSchema.Properties.Add(keyValue);
+ }
+ }
+
+ internal Internal.PulsarApi.Schema PulsarSchema { get; }
+
+ /// <summary>
+ /// The name of the schema.
+ /// </summary>
+ public string Name => PulsarSchema.Name;
+
+ /// <summary>
+ /// The data of the schema.
+ /// </summary>
+ public byte[] Data => PulsarSchema.SchemaData;
+
+ /// <summary>
+ /// The type of the schema.
+ /// </summary>
+ public SchemaType Type => (SchemaType) PulsarSchema.Type;
+
+ /// <summary>
+ /// The properties of the schema.
+ /// </summary>
+ public IReadOnlyDictionary<string, string> Properties => PulsarSchema.Properties.ToDictionary(p => p.Key, p => p.Value);
+ }
+}
diff --git a/src/DotPulsar/SchemaType.cs b/src/DotPulsar/SchemaType.cs
new file mode 100644
index 0000000..2c9f57a
--- /dev/null
+++ b/src/DotPulsar/SchemaType.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ /// <summary>
+ /// The supported schema types for messages.
+ /// </summary>
+ public enum SchemaType : byte
+ {
+ /// <summary>
+ /// No schema.
+ /// </summary>
+ None = 0,
+
+ /// <summary>
+ /// UTF-8 schema.
+ /// </summary>
+ String = 1,
+
+ /// <summary>
+ /// JSON schema.
+ /// </summary>
+ Json = 2,
+
+ /// <summary>
+ /// Protobuf schema.
+ /// </summary>
+ Protobuf = 3,
+
+ /// <summary>
+ /// Avro schema.
+ /// </summary>
+ Avro = 4,
+
+ /// <summary>
+ /// Boolean schema.
+ /// </summary>
+ Boolean = 5,
+
+ /// <summary>
+ /// 8-byte integer schema.
+ /// </summary>
+ Int8 = 6,
+
+ /// <summary>
+ /// 16-byte integer schema.
+ /// </summary>
+ Int16 = 7,
+
+ /// <summary>
+ ///32-byte integer schema.
+ /// </summary>
+ Int32 = 8,
+
+ /// <summary>
+ /// 64-byte integer schema.
+ /// </summary>
+ Int64 = 9,
+
+ /// <summary>
+ /// Float schema.
+ /// </summary>
+ Float = 10,
+
+ /// <summary>
+ /// Double schema.
+ /// </summary>
+ Double = 11,
+
+ /// <summary>
+ /// Date schema.
+ /// </summary>
+ Date = 12,
+
+ /// <summary>
+ /// Time schema.
+ /// </summary>
+ Time = 13,
+
+ /// <summary>
+ /// Timestamp schema.
+ /// </summary>
+ Timestamp = 14,
+
+ /// <summary>
+ /// KeyValue schema.
+ /// </summary>
+ KeyValue = 15,
+
+ /// <summary>
+ /// Instant schema.
+ /// </summary>
+ Instant = 16,
+
+ /// <summary>
+ /// Local date schema.
+ /// </summary>
+ LocalDate = 17,
+
+ /// <summary>
+ /// Local time schema.
+ /// </summary>
+ LocalTime = 18,
+
+ /// <summary>
+ /// Local data time schema.
+ /// </summary>
+ LocalDateTime = 19,
+
+ /// <summary>
+ /// Protobuf native schema.
+ /// </summary>
+ ProtobufNative = 20
+ }
+}
diff --git a/src/DotPulsar/Schemas/BooleanSchema.cs b/src/DotPulsar/Schemas/BooleanSchema.cs
new file mode 100644
index 0000000..82a7844
--- /dev/null
+++ b/src/DotPulsar/Schemas/BooleanSchema.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+
+ /// <summary>
+ /// Schema definition for Boolean messages.
+ /// </summary>
+ public sealed class BooleanSchema : ISchema<bool>
+ {
+ private readonly static ReadOnlySequence<byte> _true;
+ private readonly static ReadOnlySequence<byte> _false;
+
+ static BooleanSchema()
+ {
+ _true = new ReadOnlySequence<byte>(new byte[] { 1 });
+ _false = new ReadOnlySequence<byte>(new byte[] { 0 });
+ }
+
+ public BooleanSchema()
+ => SchemaInfo = new SchemaInfo("Boolean", Array.Empty<byte>(), SchemaType.Boolean, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public bool Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 1)
+ throw new SchemaSerializationException($"{nameof(BooleanSchema)} expected to decode 1 byte, but received {bytes} bytes");
+
+ return bytes.First.Span[0] != 0;
+ }
+
+ public ReadOnlySequence<byte> Encode(bool message)
+ => message ? _true : _false;
+ }
+}
diff --git a/src/DotPulsar/Schemas/ByteArraySchema.cs b/src/DotPulsar/Schemas/ByteArraySchema.cs
new file mode 100644
index 0000000..979562c
--- /dev/null
+++ b/src/DotPulsar/Schemas/ByteArraySchema.cs
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+
+ /// <summary>
+ /// Schema definition for raw messages using byte[].
+ /// </summary>
+ public sealed class ByteArraySchema : ISchema<byte[]>
+ {
+ public ByteArraySchema()
+ => SchemaInfo = new SchemaInfo("Bytes", Array.Empty<byte>(), SchemaType.None, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public byte[] Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ => bytes.ToArray();
+
+ public ReadOnlySequence<byte> Encode(byte[] message)
+ => new(message);
+ }
+}
diff --git a/src/DotPulsar/Schemas/ByteSchema.cs b/src/DotPulsar/Schemas/ByteSchema.cs
new file mode 100644
index 0000000..14caebc
--- /dev/null
+++ b/src/DotPulsar/Schemas/ByteSchema.cs
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+
+ /// <summary>
+ /// Schema definition for Byte (int8) messages.
+ /// </summary>
+ public sealed class ByteSchema : ISchema<byte>
+ {
+ public ByteSchema()
+ => SchemaInfo = new SchemaInfo("INT8", Array.Empty<byte>(), SchemaType.Int8, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public byte Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 1)
+ throw new SchemaSerializationException($"{nameof(ByteSchema)} expected to decode 1 byte, but received {bytes} bytes");
+
+ return bytes.First.Span[0];
+ }
+
+ public ReadOnlySequence<byte> Encode(byte message)
+ => new(new[] { message });
+ }
+}
diff --git a/src/DotPulsar/Schemas/ByteSequenceSchema.cs b/src/DotPulsar/Schemas/ByteSequenceSchema.cs
new file mode 100644
index 0000000..d8c8282
--- /dev/null
+++ b/src/DotPulsar/Schemas/ByteSequenceSchema.cs
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+
+ /// <summary>
+ /// Schema definition for raw messages using ReadOnlySequence of bytes.
+ /// </summary>
+ public sealed class ByteSequenceSchema : ISchema<ReadOnlySequence<byte>>
+ {
+ public ByteSequenceSchema()
+ => SchemaInfo = new SchemaInfo("Bytes", Array.Empty<byte>(), SchemaType.None, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public ReadOnlySequence<byte> Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ => bytes;
+
+ public ReadOnlySequence<byte> Encode(ReadOnlySequence<byte> message)
+ => message;
+ }
+}
diff --git a/src/DotPulsar/Schemas/DoubleSchema.cs b/src/DotPulsar/Schemas/DoubleSchema.cs
new file mode 100644
index 0000000..0f7552e
--- /dev/null
+++ b/src/DotPulsar/Schemas/DoubleSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+ using System.Linq;
+ using System.Runtime.InteropServices;
+
+ /// <summary>
+ /// Schema definition for Double messages.
+ /// </summary>
+ public sealed class DoubleSchema : ISchema<double>
+ {
+ public DoubleSchema()
+ => SchemaInfo = new SchemaInfo("Double", Array.Empty<byte>(), SchemaType.Double, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public double Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 8)
+ throw new SchemaSerializationException($"{nameof(DoubleSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+ var array = bytes.ToArray();
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return MemoryMarshal.Read<double>(array);
+ }
+
+ public ReadOnlySequence<byte> Encode(double message)
+ {
+ var array = BitConverter.GetBytes(message);
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return new(array);
+ }
+ }
+}
diff --git a/src/DotPulsar/Schemas/FloatSchema .cs b/src/DotPulsar/Schemas/FloatSchema .cs
new file mode 100644
index 0000000..3040669
--- /dev/null
+++ b/src/DotPulsar/Schemas/FloatSchema .cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+ using System.Linq;
+ using System.Runtime.InteropServices;
+
+ /// <summary>
+ /// Schema definition for Float messages.
+ /// </summary>
+ public sealed class FloatSchema : ISchema<float>
+ {
+ public FloatSchema()
+ => SchemaInfo = new SchemaInfo("Float", Array.Empty<byte>(), SchemaType.Float, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public float Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 4)
+ throw new SchemaSerializationException($"{nameof(FloatSchema)} expected to decode 4 bytes, but received {bytes} bytes");
+
+ var array = bytes.ToArray();
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return MemoryMarshal.Read<float>(array);
+ }
+
+ public ReadOnlySequence<byte> Encode(float message)
+ {
+ var array = BitConverter.GetBytes(message);
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return new(array);
+ }
+ }
+}
diff --git a/src/DotPulsar/Schemas/IntegerSchema.cs b/src/DotPulsar/Schemas/IntegerSchema.cs
new file mode 100644
index 0000000..69d079b
--- /dev/null
+++ b/src/DotPulsar/Schemas/IntegerSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+ using System.Linq;
+ using System.Runtime.InteropServices;
+
+ /// <summary>
+ /// Schema definition for Integer (int32) messages.
+ /// </summary>
+ public sealed class IntegerSchema : ISchema<int>
+ {
+ public IntegerSchema()
+ => SchemaInfo = new SchemaInfo("INT32", Array.Empty<byte>(), SchemaType.Int32, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public int Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 4)
+ throw new SchemaSerializationException($"{nameof(IntegerSchema)} expected to decode 4 bytes, but received {bytes} bytes");
+
+ var array = bytes.ToArray();
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return MemoryMarshal.Read<int>(array);
+ }
+
+ public ReadOnlySequence<byte> Encode(int message)
+ {
+ var array = BitConverter.GetBytes(message);
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return new(array);
+ }
+ }
+}
diff --git a/src/DotPulsar/Schemas/LongSchema.cs b/src/DotPulsar/Schemas/LongSchema.cs
new file mode 100644
index 0000000..be40c4f
--- /dev/null
+++ b/src/DotPulsar/Schemas/LongSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+ using System.Linq;
+ using System.Runtime.InteropServices;
+
+ /// <summary>
+ /// Schema definition for Long (int64) messages.
+ /// </summary>
+ public sealed class LongSchema : ISchema<long>
+ {
+ public LongSchema()
+ => SchemaInfo = new SchemaInfo("INT64", Array.Empty<byte>(), SchemaType.Int64, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public long Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 8)
+ throw new SchemaSerializationException($"{nameof(LongSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+ var array = bytes.ToArray();
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return MemoryMarshal.Read<long>(array);
+ }
+
+ public ReadOnlySequence<byte> Encode(long message)
+ {
+ var array = BitConverter.GetBytes(message);
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return new(array);
+ }
+ }
+}
diff --git a/src/DotPulsar/Schemas/ShortSchema.cs b/src/DotPulsar/Schemas/ShortSchema.cs
new file mode 100644
index 0000000..3c84100
--- /dev/null
+++ b/src/DotPulsar/Schemas/ShortSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+ using System.Linq;
+ using System.Runtime.InteropServices;
+
+ /// <summary>
+ /// Schema definition for Short (int16) messages.
+ /// </summary>
+ public sealed class ShortSchema : ISchema<short>
+ {
+ public ShortSchema()
+ => SchemaInfo = new SchemaInfo("INT16", Array.Empty<byte>(), SchemaType.Int16, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public short Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 2)
+ throw new SchemaSerializationException($"{nameof(ShortSchema)} expected to decode 2 bytes, but received {bytes} bytes");
+
+ var array = bytes.ToArray();
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return MemoryMarshal.Read<short>(array);
+ }
+
+ public ReadOnlySequence<byte> Encode(short message)
+ {
+ var array = BitConverter.GetBytes(message);
+
+ if (BitConverter.IsLittleEndian)
+ Array.Reverse(array);
+
+ return new(array);
+ }
+ }
+}
diff --git a/src/DotPulsar/Schemas/StringSchema.cs b/src/DotPulsar/Schemas/StringSchema.cs
new file mode 100644
index 0000000..50a688b
--- /dev/null
+++ b/src/DotPulsar/Schemas/StringSchema.cs
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Generic;
+ using System.Text;
+
+ /// <summary>
+ /// Schema definition for UTF-8 (default), UTF-16 (unicode) or US-ASCII encoded strings.
+ /// </summary>
+ public sealed class StringSchema : ISchema<string>
+ {
+ private const string _charSetKey = "__charset";
+ private const string _utf8 = "UTF-8";
+ private const string _unicode = "UTF-16";
+ private const string _ascii = "US-ASCII";
+
+ static StringSchema()
+ {
+ UTF8 = new StringSchema(Encoding.UTF8);
+ Unicode = new StringSchema(Encoding.Unicode);
+ ASCII = new StringSchema(Encoding.ASCII);
+ }
+
+ /// <summary>
+ /// Schema definition for UTF-8 encoded strings.
+ /// </summary>
+ public static StringSchema UTF8 { get; }
+
+ /// <summary>
+ /// Schema definition for UTF-16 encoded strings.
+ /// </summary>
+ public static StringSchema Unicode { get; }
+
+ /// <summary>
+ /// Schema definition for US-ASCII encoded strings.
+ /// </summary>
+ public static StringSchema ASCII { get; }
+
+ private static string GetCharSet(string encodingName)
+ {
+ return encodingName switch
+ {
+ "Unicode (UTF-8)" => _utf8,
+ "Unicode" => _unicode,
+ "US-ASCII" => _ascii,
+ _ => throw new Exception($"Encoding '{encodingName}' is not supported!")
+ };
+ }
+
+ private static StringSchema GetSchema(string charSet)
+ {
+ return charSet switch
+ {
+ _utf8 => UTF8,
+ _unicode => Unicode,
+ _ascii => ASCII,
+ _ => throw new Exception($"CharSet '{charSet}' is not supported!")
+ };
+ }
+
+ public static StringSchema From(SchemaInfo schemaInfo)
+ {
+ if (schemaInfo.Type != SchemaType.String)
+ throw new Exception("Not a string schema!");
+
+ if (schemaInfo.Properties.TryGetValue(_charSetKey, out var charset))
+ return GetSchema(charset);
+ else
+ return UTF8;
+ }
+
+ private readonly Encoding _encoding;
+
+ public StringSchema(Encoding encoding)
+ {
+ _encoding = encoding;
+
+ var properties = new Dictionary<string, string>
+ {
+ { _charSetKey, GetCharSet(encoding.EncodingName) }
+ };
+
+ SchemaInfo = new SchemaInfo("String", Array.Empty<byte>(), SchemaType.String, properties);
+ }
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public string Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion)
+ => _encoding.GetString(bytes.ToArray());
+
+ public ReadOnlySequence<byte> Encode(string message)
+ => new(_encoding.GetBytes(message));
+ }
+}
diff --git a/src/DotPulsar/Schemas/TimeSchema.cs b/src/DotPulsar/Schemas/TimeSchema.cs
new file mode 100644
index 0000000..67d6775
--- /dev/null
+++ b/src/DotPulsar/Schemas/TimeSchema.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+
+ /// <summary>
+ /// Schema definition for Time (TimeSpan) messages.
+ /// </summary>
+ public sealed class TimeSchema : ISchema<TimeSpan>
+ {
+ public TimeSchema()
+ => SchemaInfo = new SchemaInfo("Time", Array.Empty<byte>(), SchemaType.Time, ImmutableDictionary<string, string>.Empty);
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public TimeSpan Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 8)
+ throw new SchemaSerializationException($"{nameof(TimestampSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+ var milliseconds = Schema.Int64.Decode(bytes);
+ return TimeSpan.FromMilliseconds(milliseconds);
+ }
+
+ public ReadOnlySequence<byte> Encode(TimeSpan message)
+ {
+ var milliseconds = (long) message.TotalMilliseconds;
+ return Schema.Int64.Encode(milliseconds);
+ }
+ }
+}
diff --git a/src/DotPulsar/Schemas/TimestampSchema.cs b/src/DotPulsar/Schemas/TimestampSchema.cs
new file mode 100644
index 0000000..18bee0a
--- /dev/null
+++ b/src/DotPulsar/Schemas/TimestampSchema.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using System;
+ using System.Buffers;
+ using System.Collections.Immutable;
+
+ /// <summary>
+ /// Schema definition for Timestamp (DateTime) messages.
+ /// </summary>
+ public sealed class TimestampSchema : ISchema<DateTime>
+ {
+ static TimestampSchema()
+ {
+ Timestamp = new TimestampSchema(new SchemaInfo("Timestamp", Array.Empty<byte>(), SchemaType.Timestamp, ImmutableDictionary<string, string>.Empty));
+ Date = new TimestampSchema(new SchemaInfo("Date", Array.Empty<byte>(), SchemaType.Date, ImmutableDictionary<string, string>.Empty));
+ }
+
+ public static TimestampSchema Timestamp { get; }
+ public static TimestampSchema Date { get; }
+
+ public TimestampSchema(SchemaInfo schemaInfo)
+ => SchemaInfo = schemaInfo;
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public DateTime Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ if (bytes.Length != 8)
+ throw new SchemaSerializationException($"{nameof(TimestampSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+ var milliseconds = Schema.Int64.Decode(bytes);
+
+ if (milliseconds < -62135596800000)
+ return DateTime.MinValue;
+
+ if (milliseconds > 253402300799999)
+ return DateTime.MaxValue;
+
+ return DateTimeOffset.FromUnixTimeMilliseconds(milliseconds).DateTime;
+ }
+
+ public ReadOnlySequence<byte> Encode(DateTime message)
+ {
+ var milliseconds = new DateTimeOffset(message).ToUnixTimeMilliseconds();
+ return Schema.Int64.Encode(milliseconds);
+ }
+ }
+}
diff --git a/tests/DotPulsar.StressTests/ConnectionTests.cs b/tests/DotPulsar.StressTests/ConnectionTests.cs
index 3f350b4..da6e663 100644
--- a/tests/DotPulsar.StressTests/ConnectionTests.cs
+++ b/tests/DotPulsar.StressTests/ConnectionTests.cs
@@ -48,7 +48,7 @@
await using var client = builder.Build();
- await using var producer = client.NewProducer()
+ await using var producer = client.NewProducer(Schema.Bytes)
.ProducerName($"producer-{testRunId}")
.Topic(topic)
.Create();
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs b/tests/DotPulsar.StressTests/ConsumerTests.cs
index 2bc3421..df51edb 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -15,7 +15,7 @@
namespace DotPulsar.StressTests
{
using DotPulsar.Abstractions;
- using Extensions;
+ using DotPulsar.Extensions;
using Fixtures;
using FluentAssertions;
using System;
@@ -48,14 +48,14 @@
.ServiceUrl(new Uri("pulsar://localhost:54545"))
.Build();
- await using var consumer = client.NewConsumer()
+ await using var consumer = client.NewConsumer(Schema.Bytes)
.ConsumerName($"consumer-{testRunId}")
.InitialPosition(SubscriptionInitialPosition.Earliest)
.SubscriptionName($"subscription-{testRunId}")
.Topic(topic)
.Create();
- await using var producer = client.NewProducer()
+ await using var producer = client.NewProducer(Schema.Bytes)
.ProducerName($"producer-{testRunId}")
.Topic(topic)
.Create();
@@ -70,7 +70,7 @@
consumed.Should().BeEquivalentTo(produced);
}
- private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer producer, int numberOfMessages, CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer<byte[]> producer, int numberOfMessages, CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
@@ -83,7 +83,7 @@
return messageIds;
}
- private static async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer consumer, int numberOfMessages, CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer<byte[]> consumer, int numberOfMessages, CancellationToken ct)
{
var messageIds = new List<MessageId>(numberOfMessages);
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 1550a89..4cdb2b0 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -6,13 +6,13 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="coverlet.collector" Version="3.0.2">
+ <PackageReference Include="coverlet.collector" Version="3.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index d460de3..cef7485 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-#pragma warning disable 8601
+#pragma warning disable 8601, 8618
namespace DotPulsar.StressTests
{
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 8d40129..9ffdd6c 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,13 +7,13 @@
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.10.3" />
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="coverlet.collector" Version="3.0.2">
+ <PackageReference Include="coverlet.collector" Version="3.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
diff --git a/tests/docker-compose-standalone-tests.yml b/tests/docker-compose-standalone-tests.yml
index 5dde15b..b9b3cd9 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -4,7 +4,7 @@
pulsar:
container_name: pulsar-stresstests
- image: 'apachepulsar/pulsar:2.6.1'
+ image: 'apachepulsar/pulsar:2.7.0'
ports:
- '54546:8080'
- '54545:6650'