Updating NuGet packages. (Re)Adding tracing. Major refactorings to support schemas.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 1570efe..60d3eb1 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -18,8 +18,6 @@
     using DotPulsar.Abstractions;
     using DotPulsar.Extensions;
     using System;
-    using System.Buffers;
-    using System.Text;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -39,7 +37,7 @@
 
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
-            await using var consumer = client.NewConsumer()
+            await using var consumer = client.NewConsumer(Schema.String)
                 .StateChangedHandler(Monitor)
                 .SubscriptionName("MySubscription")
                 .Topic(myTopic)
@@ -50,14 +48,13 @@
             await ConsumeMessages(consumer, cts.Token);
         }
 
-        private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
+        private static async Task ConsumeMessages(IConsumer<string> consumer, CancellationToken cancellationToken)
         {
             try
             {
                 await foreach (var message in consumer.Messages(cancellationToken))
                 {
-                    var data = Encoding.UTF8.GetString(message.Data.ToArray());
-                    Console.WriteLine("Received: " + data);
+                    Console.WriteLine("Received: " + message.Value);
                     await consumer.Acknowledge(message, cancellationToken);
                 }
             }
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index a69bc38..5ea9a51 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -18,7 +18,6 @@
     using DotPulsar.Abstractions;
     using DotPulsar.Extensions;
     using System;
-    using System.Text;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -38,7 +37,7 @@
 
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
-            await using var producer = client.NewProducer()
+            await using var producer = client.NewProducer(Schema.String)
                 .StateChangedHandler(Monitor)
                 .Topic(myTopic)
                 .Create();
@@ -48,7 +47,7 @@
             await ProduceMessages(producer, cts.Token);
         }
 
-        private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
+        private static async Task ProduceMessages(IProducer<string> producer, CancellationToken cancellationToken)
         {
             var delay = TimeSpan.FromSeconds(5);
 
@@ -57,8 +56,7 @@
                 while (!cancellationToken.IsCancellationRequested)
                 {
                     var data = DateTime.UtcNow.ToLongTimeString();
-                    var bytes = Encoding.UTF8.GetBytes(data);
-                    _ = await producer.Send(bytes, cancellationToken);
+                    _ = await producer.Send(data, cancellationToken);
                     Console.WriteLine("Sent: " + data);
                     await Task.Delay(delay, cancellationToken);
                 }
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 8a2f0fe..475fccd 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -18,8 +18,6 @@
     using DotPulsar.Abstractions;
     using DotPulsar.Extensions;
     using System;
-    using System.Buffers;
-    using System.Text;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -39,7 +37,7 @@
 
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
-            await using var reader = client.NewReader()
+            await using var reader = client.NewReader(Schema.String)
                 .StartMessageId(MessageId.Earliest)
                 .StateChangedHandler(Monitor)
                 .Topic(myTopic)
@@ -50,14 +48,13 @@
             await ReadMessages(reader, cts.Token);
         }
 
-        private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
+        private static async Task ReadMessages(IReader<string> reader, CancellationToken cancellationToken)
         {
             try
             {
                 await foreach (var message in reader.Messages(cancellationToken))
                 {
-                    var data = Encoding.UTF8.GetString(message.Data.ToArray());
-                    Console.WriteLine("Received: " + data);
+                    Console.WriteLine("Received: " + message.Value);
                 }
             }
             catch (OperationCanceledException) { }
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 976f54b..6996825 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,7 +22,7 @@
     /// <summary>
     /// A consumer abstraction.
     /// </summary>
-    public interface IConsumer : IGetLastMessageId, IReceive, ISeek, IState<ConsumerState>, IAsyncDisposable
+    public interface IConsumer : IGetLastMessageId, ISeek, IState<ConsumerState>, IAsyncDisposable
     {
         /// <summary>
         /// Acknowledge the consumption of a single message using the MessageId.
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index 4de94f1..8134eca 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -14,73 +14,59 @@
 
 namespace DotPulsar.Abstractions
 {
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
-
     /// <summary>
     /// A consumer building abstraction.
     /// </summary>
-    public interface IConsumerBuilder
+    public interface IConsumerBuilder<TMessage>
     {
         /// <summary>
         /// Set the consumer name. This is optional.
         /// </summary>
-        IConsumerBuilder ConsumerName(string name);
+        IConsumerBuilder<TMessage> ConsumerName(string name);
 
         /// <summary>
         /// Set initial position for the subscription. The default is 'Latest'.
         /// </summary>
-        IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition);
+        IConsumerBuilder<TMessage> InitialPosition(SubscriptionInitialPosition initialPosition);
 
         /// <summary>
         /// Number of messages that will be prefetched. The default is 1000.
         /// </summary>
-        IConsumerBuilder MessagePrefetchCount(uint count);
+        IConsumerBuilder<TMessage> MessagePrefetchCount(uint count);
 
         /// <summary>
         /// Set the priority level for the shared subscription consumer. The default is 0.
         /// </summary>
-        IConsumerBuilder PriorityLevel(int priorityLevel);
+        IConsumerBuilder<TMessage> PriorityLevel(int priorityLevel);
 
         /// <summary>
         /// Whether to read from the compacted topic. The default is 'false'.
         /// </summary>
-        IConsumerBuilder ReadCompacted(bool readCompacted);
+        IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted);
 
         /// <summary>
         /// Register a state changed handler.
         /// </summary>
-        IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler);
-
-        /// <summary>
-        /// Register a state changed handler.
-        /// </summary>
-        IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Register a state changed handler.
-        /// </summary>
-        IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+        IConsumerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler);
 
         /// <summary>
         /// Set the subscription name for this consumer. This is required.
         /// </summary>
-        IConsumerBuilder SubscriptionName(string name);
+        IConsumerBuilder<TMessage> SubscriptionName(string name);
 
         /// <summary>
         /// Set the subscription type for this consumer. The default is 'Exclusive'.
         /// </summary>
-        IConsumerBuilder SubscriptionType(SubscriptionType type);
+        IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type);
 
         /// <summary>
         /// Set the topic for this consumer. This is required.
         /// </summary>
-        IConsumerBuilder Topic(string topic);
+        IConsumerBuilder<TMessage> Topic(string topic);
 
         /// <summary>
         /// Create the consumer.
         /// </summary>
-        IConsumer Create();
+        IConsumer<TMessage> Create();
     }
 }
diff --git a/src/DotPulsar/Abstractions/IConsumerOfT.cs b/src/DotPulsar/Abstractions/IConsumerOfT.cs
new file mode 100644
index 0000000..0affa98
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IConsumerOfT.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A generic consumer abstraction.
+    /// </summary>
+    public interface IConsumer<TMessage> : IConsumer, IReceive<IMessage<TMessage>> { }
+}
diff --git a/src/DotPulsar/Abstractions/IMessage.cs b/src/DotPulsar/Abstractions/IMessage.cs
new file mode 100644
index 0000000..c10204c
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IMessage.cs
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// A message abstraction.
+    /// </summary>
+    public interface IMessage
+    {
+        /// <summary>
+        /// The id of the message.
+        /// </summary>
+        MessageId MessageId { get; }
+
+        /// <summary>
+        /// The raw payload of the message.
+        /// </summary>
+        ReadOnlySequence<byte> Data { get; }
+
+        /// <summary>
+        /// The name of the producer who produced the message.
+        /// </summary>
+        string ProducerName { get; }
+
+        /// <summary>
+        /// The schema version of the message.
+        /// </summary>
+        byte[]? SchemaVersion { get; }
+
+        /// <summary>
+        /// The sequence id of the message.
+        /// </summary>
+        ulong SequenceId { get; }
+
+        /// <summary>
+        /// The redelivery count (maintained by the broker) of the message.
+        /// </summary>
+        uint RedeliveryCount { get; }
+
+        /// <summary>
+        /// Check whether the message has an event time.
+        /// </summary>
+        bool HasEventTime { get; }
+
+        /// <summary>
+        /// The event time of the message as unix time in milliseconds.
+        /// </summary>
+        ulong EventTime { get; }
+
+        /// <summary>
+        /// The event time of the message as an UTC DateTime.
+        /// </summary>
+        public DateTime EventTimeAsDateTime { get; }
+
+        /// <summary>
+        /// The event time of the message as a DateTimeOffset with an offset of 0.
+        /// </summary>
+        public DateTimeOffset EventTimeAsDateTimeOffset { get; }
+
+        /// <summary>
+        /// Check whether the key been base64 encoded.
+        /// </summary>
+        bool HasBase64EncodedKey { get; }
+
+        /// <summary>
+        /// Check whether the message has a key.
+        /// </summary>
+        bool HasKey { get; }
+
+        /// <summary>
+        /// The key as a string.
+        /// </summary>
+        string? Key { get; }
+
+        /// <summary>
+        /// The key as bytes.
+        /// </summary>
+        byte[]? KeyBytes { get; }
+
+        /// <summary>
+        /// Check whether the message has an ordering key.
+        /// </summary>
+        bool HasOrderingKey { get; }
+
+        /// <summary>
+        /// The ordering key of the message.
+        /// </summary>
+        byte[]? OrderingKey { get; }
+
+        /// <summary>
+        /// The publish time of the message as unix time in milliseconds.
+        /// </summary>
+        ulong PublishTime { get; }
+
+        /// <summary>
+        /// The publish time of the message as an UTC DateTime.
+        /// </summary>
+        public DateTime PublishTimeAsDateTime { get; }
+
+        /// <summary>
+        /// The publish time of the message as a DateTimeOffset with an offset of 0.
+        /// </summary>
+        public DateTimeOffset PublishTimeAsDateTimeOffset { get; }
+
+        /// <summary>
+        /// The properties of the message.
+        /// </summary>
+        public IReadOnlyDictionary<string, string> Properties { get; }
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
index 1720a4e..32c9d94 100644
--- a/src/DotPulsar/Abstractions/IMessageBuilder.cs
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -21,67 +21,72 @@
     /// <summary>
     /// A message building abstraction.
     /// </summary>
-    public interface IMessageBuilder
+    public interface IMessageBuilder<TMessage>
     {
         /// <summary>
         /// Timestamp as unix time in milliseconds indicating when the message should be delivered to consumers.
         /// </summary>
-        IMessageBuilder DeliverAt(long timestamp);
+        IMessageBuilder<TMessage> DeliverAt(long timestamp);
 
         /// <summary>
         /// Timestamp as UTC DateTime indicating when the message should be delivered to consumers.
         /// </summary>
-        IMessageBuilder DeliverAt(DateTime timestamp);
+        IMessageBuilder<TMessage> DeliverAt(DateTime timestamp);
 
         /// <summary>
         /// Timestamp as DateTimeOffset indicating when the message should be delivered to consumers.
         /// </summary>
-        IMessageBuilder DeliverAt(DateTimeOffset timestamp);
+        IMessageBuilder<TMessage> DeliverAt(DateTimeOffset timestamp);
 
         /// <summary>
         /// The event time of the message as unix time in milliseconds.
         /// </summary>
-        IMessageBuilder EventTime(ulong eventTime);
+        IMessageBuilder<TMessage> EventTime(ulong eventTime);
 
         /// <summary>
         /// The event time of the message as an UTC DateTime.
         /// </summary>
-        IMessageBuilder EventTime(DateTime eventTime);
+        IMessageBuilder<TMessage> EventTime(DateTime eventTime);
 
         /// <summary>
         /// The event time of the message as a DateTimeOffset.
         /// </summary>
-        IMessageBuilder EventTime(DateTimeOffset eventTime);
+        IMessageBuilder<TMessage> EventTime(DateTimeOffset eventTime);
 
         /// <summary>
         /// Set the key of the message for routing policy.
         /// </summary>
-        IMessageBuilder Key(string key);
+        IMessageBuilder<TMessage> Key(string key);
 
         /// <summary>
         /// Set the key of the message for routing policy.
         /// </summary>
-        IMessageBuilder KeyBytes(byte[] key);
+        IMessageBuilder<TMessage> KeyBytes(byte[] key);
 
         /// <summary>
         /// Set the ordering key of the message for message dispatch in SubscriptionType.KeyShared mode.
         /// The partition key will be used if the ordering key is not specified.
         /// </summary>
-        IMessageBuilder OrderingKey(byte[] key);
+        IMessageBuilder<TMessage> OrderingKey(byte[] key);
 
         /// <summary>
         /// Add/Set a property key/value on the message.
         /// </summary>
-        IMessageBuilder Property(string key, string value);
+        IMessageBuilder<TMessage> Property(string key, string value);
+
+        /// <summary>
+        /// Set the schema version of the message.
+        /// </summary>
+        IMessageBuilder<TMessage> SchemaVersion(byte[] schemaVersion);
 
         /// <summary>
         /// Set the sequence id of the message.
         /// </summary>
-        IMessageBuilder SequenceId(ulong sequenceId);
+        IMessageBuilder<TMessage> SequenceId(ulong sequenceId);
 
         /// <summary>
-        /// Set the consumer name.
+        /// Sends a message.
         /// </summary>
-        ValueTask<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
+        ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Abstractions/IMessageOfT.cs b/src/DotPulsar/Abstractions/IMessageOfT.cs
new file mode 100644
index 0000000..6b90ae5
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IMessageOfT.cs
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A generic message abstraction.
+    /// </summary>
+    public interface IMessage<TValue> : IMessage
+    {
+        /// <summary>
+        /// The value of the message.
+        /// </summary>
+        public TValue Value { get; }
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index baf3ca4..ac13630 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -19,7 +19,7 @@
     /// <summary>
     /// A producer abstraction.
     /// </summary>
-    public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
+    public interface IProducer : IState<ProducerState>, IAsyncDisposable
     {
         /// <summary>
         /// The producer's service url.
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 930d2ca..17788a5 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -14,53 +14,39 @@
 
 namespace DotPulsar.Abstractions
 {
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
-
     /// <summary>
     /// A producer building abstraction.
     /// </summary>
-    public interface IProducerBuilder
+    public interface IProducerBuilder<TMessage>
     {
         /// <summary>
         /// Set the compression type. The default is 'None'.
         /// </summary>
-        IProducerBuilder CompressionType(CompressionType compressionType);
+        IProducerBuilder<TMessage> CompressionType(CompressionType compressionType);
 
         /// <summary>
         /// Set the initial sequence id. The default is 0.
         /// </summary>
-        IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+        IProducerBuilder<TMessage> InitialSequenceId(ulong initialSequenceId);
 
         /// <summary>
         /// Set the producer name. This is optional.
         /// </summary>
-        IProducerBuilder ProducerName(string name);
+        IProducerBuilder<TMessage> ProducerName(string name);
 
         /// <summary>
         /// Register a state changed handler.
         /// </summary>
-        IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler);
-
-        /// <summary>
-        /// Register a state changed handler.
-        /// </summary>
-        IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Register a state changed handler.
-        /// </summary>
-        IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+        IProducerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler);
 
         /// <summary>
         /// Set the topic for this producer. This is required.
         /// </summary>
-        IProducerBuilder Topic(string topic);
+        IProducerBuilder<TMessage> Topic(string topic);
 
         /// <summary>
         /// Create the producer.
         /// </summary>
-        IProducer Create();
+        IProducer<TMessage> Create();
     }
 }
diff --git a/src/DotPulsar/Abstractions/IProducerOfT.cs b/src/DotPulsar/Abstractions/IProducerOfT.cs
new file mode 100644
index 0000000..da42458
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IProducerOfT.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A generic producer abstraction.
+    /// </summary>
+    public interface IProducer<TMessage> : IProducer, ISend<TMessage> { }
+}
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
index 7189257..64f6add 100644
--- a/src/DotPulsar/Abstractions/IPulsarClient.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -24,17 +24,17 @@
         /// <summary>
         /// Create a producer.
         /// </summary>
-        IProducer CreateProducer(ProducerOptions options);
+        IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options);
 
         /// <summary>
         /// Create a consumer.
         /// </summary>
-        IConsumer CreateConsumer(ConsumerOptions options);
+        IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options);
 
         /// <summary>
         /// Create a reader.
         /// </summary>
-        IReader CreateReader(ReaderOptions options);
+        IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options);
 
         /// <summary>
         /// The client's service url.
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 484a196..d632aae 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -16,7 +16,6 @@
 {
     using System;
     using System.Security.Cryptography.X509Certificates;
-    using System.Threading.Tasks;
 
     /// <summary>
     /// A pulsar client building abstraction.
@@ -44,16 +43,6 @@
         IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler);
 
         /// <summary>
-        /// Register a custom exception handler that will be invoked before the default exception handler.
-        /// </summary>
-        IPulsarClientBuilder ExceptionHandler(Action<ExceptionContext> exceptionHandler);
-
-        /// <summary>
-        /// Register a custom exception handler that will be invoked before the default exception handler.
-        /// </summary>
-        IPulsarClientBuilder ExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler);
-
-        /// <summary>
         /// The time to wait before retrying an operation or a reconnect. The default is 3 seconds.
         /// </summary>
         IPulsarClientBuilder RetryInterval(TimeSpan interval);
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 7962d54..3e438ac 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -19,7 +19,7 @@
     /// <summary>
     /// A reader abstraction.
     /// </summary>
-    public interface IReader : IGetLastMessageId, IReceive, ISeek, IState<ReaderState>, IAsyncDisposable
+    public interface IReader : IGetLastMessageId, ISeek, IState<ReaderState>, IAsyncDisposable
     {
         /// <summary>
         /// The reader's service url.
diff --git a/src/DotPulsar/Abstractions/IReaderBuilder.cs b/src/DotPulsar/Abstractions/IReaderBuilder.cs
index 3e97705..18235a6 100644
--- a/src/DotPulsar/Abstractions/IReaderBuilder.cs
+++ b/src/DotPulsar/Abstractions/IReaderBuilder.cs
@@ -14,58 +14,44 @@
 
 namespace DotPulsar.Abstractions
 {
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
-
     /// <summary>
     /// A reader building abstraction.
     /// </summary>
-    public interface IReaderBuilder
+    public interface IReaderBuilder<TMessage>
     {
         /// <summary>
         /// Number of messages that will be prefetched. The default is 1000.
         /// </summary>
-        IReaderBuilder MessagePrefetchCount(uint count);
+        IReaderBuilder<TMessage> MessagePrefetchCount(uint count);
 
         /// <summary>
         /// Whether to read from the compacted topic. The default is 'false'.
         /// </summary>
-        IReaderBuilder ReadCompacted(bool readCompacted);
+        IReaderBuilder<TMessage> ReadCompacted(bool readCompacted);
 
         /// <summary>
         /// Set the reader name. This is optional.
         /// </summary>
-        IReaderBuilder ReaderName(string name);
+        IReaderBuilder<TMessage> ReaderName(string name);
 
         /// <summary>
         /// The initial reader position is set to the specified message id. This is required.
         /// </summary>
-        IReaderBuilder StartMessageId(MessageId messageId);
+        IReaderBuilder<TMessage> StartMessageId(MessageId messageId);
 
         /// <summary>
         /// Register a state changed handler.
         /// </summary>
-        IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler);
-
-        /// <summary>
-        /// Register a state changed handler.
-        /// </summary>
-        IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Register a state changed handler.
-        /// </summary>
-        IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+        IReaderBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler);
 
         /// <summary>
         /// Set the topic for this reader. This is required.
         /// </summary>
-        IReaderBuilder Topic(string topic);
+        IReaderBuilder<TMessage> Topic(string topic);
 
         /// <summary>
         /// Create the reader.
         /// </summary>
-        IReader Create();
+        IReader<TMessage> Create();
     }
 }
diff --git a/src/DotPulsar/Abstractions/IReaderOfT.cs b/src/DotPulsar/Abstractions/IReaderOfT.cs
new file mode 100644
index 0000000..35f26c9
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IReaderOfT.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A generic reader abstraction.
+    /// </summary>
+    public interface IReader<TMessage> : IReader, IReceive<IMessage<TMessage>> { }
+}
diff --git a/src/DotPulsar/Abstractions/IReceive.cs b/src/DotPulsar/Abstractions/IReceive.cs
index 2ff275c..994638a 100644
--- a/src/DotPulsar/Abstractions/IReceive.cs
+++ b/src/DotPulsar/Abstractions/IReceive.cs
@@ -20,11 +20,11 @@
     /// <summary>
     /// An abstraction for receiving a single message.
     /// </summary>
-    public interface IReceive
+    public interface IReceive<TMessage>
     {
         /// <summary>
         /// Receive a single message.
         /// </summary>
-        ValueTask<Message> Receive(CancellationToken cancellationToken = default);
+        ValueTask<TMessage> Receive(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Abstractions/ISchema.cs b/src/DotPulsar/Abstractions/ISchema.cs
new file mode 100644
index 0000000..698cad7
--- /dev/null
+++ b/src/DotPulsar/Abstractions/ISchema.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using System.Buffers;
+
+    /// <summary>
+    /// A schema abstraction.
+    /// </summary>
+    public interface ISchema<T>
+    {
+        /// <summary>
+        /// Decode the raw bytes.
+        /// </summary>
+        public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null);
+
+        /// <summary>
+        /// Encode the message.
+        /// </summary>
+        public ReadOnlySequence<byte> Encode(T message);
+
+        /// <summary>
+        /// The schema info.
+        /// </summary>
+        public SchemaInfo SchemaInfo { get; }
+    }
+}
diff --git a/src/DotPulsar/Abstractions/ISend.cs b/src/DotPulsar/Abstractions/ISend.cs
index 2948c34..642f61d 100644
--- a/src/DotPulsar/Abstractions/ISend.cs
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -14,23 +14,22 @@
 
 namespace DotPulsar.Abstractions
 {
-    using System.Buffers;
     using System.Threading;
     using System.Threading.Tasks;
 
     /// <summary>
     /// An abstraction for sending a message.
     /// </summary>
-    public interface ISend
+    public interface ISend<TMessage>
     {
         /// <summary>
         /// Sends a message.
         /// </summary>
-        ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+        ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Sends a message with metadata.
         /// </summary>
-        ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+        ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index 8496487..754f73a 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -19,7 +19,7 @@
     /// <summary>
     /// The consumer building options.
     /// </summary>
-    public sealed class ConsumerOptions
+    public sealed class ConsumerOptions<TMessage>
     {
         /// <summary>
         /// The default initial position.
@@ -49,7 +49,7 @@
         /// <summary>
         /// Initializes a new instance using the specified subscription name and topic.
         /// </summary>
-        public ConsumerOptions(string subscriptionName, string topic)
+        public ConsumerOptions(string subscriptionName, string topic, ISchema<TMessage> schema)
         {
             InitialPosition = DefaultInitialPosition;
             PriorityLevel = DefaultPriorityLevel;
@@ -58,6 +58,7 @@
             SubscriptionType = DefaultSubscriptionType;
             SubscriptionName = subscriptionName;
             Topic = topic;
+            Schema = schema;
         }
 
         /// <summary>
@@ -86,6 +87,11 @@
         public bool ReadCompacted { get; set; }
 
         /// <summary>
+        /// Set the schema. This is required.
+        /// </summary>
+        public ISchema<TMessage> Schema { get; set; }
+
+        /// <summary>
         /// Register a state changed handler. This is optional.
         /// </summary>
         public IHandleStateChanged<ConsumerStateChanged>? StateChangedHandler { get; set; }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 934b5a4..d766c2d 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -31,6 +31,15 @@
   <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
     <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
     <PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+  </ItemGroup>
+
+  <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+  </ItemGroup>
+    
+  <ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
   </ItemGroup>
 
   <ItemGroup>
diff --git a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
index 98ad756..1ebdf11 100644
--- a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
@@ -1,10 +1,23 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
 {
-    using Internal;
     using System;
 
     public sealed class ConsumerDisposedException : ObjectDisposedException
     {
-        public ConsumerDisposedException() : base(typeof(Consumer).FullName) { }
+        public ConsumerDisposedException(string objectName) : base(objectName) { }
     }
 }
diff --git a/src/DotPulsar/Exceptions/ProducerDisposedException.cs b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
index eabfa73..f07fb1b 100644
--- a/src/DotPulsar/Exceptions/ProducerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
@@ -1,10 +1,23 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
 {
-    using Internal;
     using System;
 
     public sealed class ProducerDisposedException : ObjectDisposedException
     {
-        public ProducerDisposedException() : base(typeof(Producer).FullName) { }
+        public ProducerDisposedException(string objectName) : base(objectName) { }
     }
 }
diff --git a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
index 1066730..3b3ceb6 100644
--- a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
+++ b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
@@ -1,4 +1,18 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
 {
     using System;
 
diff --git a/src/DotPulsar/Exceptions/ReaderDisposedException.cs b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
index df01535..1ab4b80 100644
--- a/src/DotPulsar/Exceptions/ReaderDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
@@ -1,10 +1,23 @@
-namespace DotPulsar.Exceptions
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
 {
-    using Internal;
     using System;
 
     public sealed class ReaderDisposedException : ObjectDisposedException
     {
-        public ReaderDisposedException() : base(typeof(Reader).FullName) { }
+        public ReaderDisposedException(string objectName) : base(objectName) { }
     }
 }
diff --git a/src/DotPulsar/Exceptions/SchemaSerializationException.cs b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
new file mode 100644
index 0000000..900f8b1
--- /dev/null
+++ b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+    public sealed class SchemaSerializationException : DotPulsarException
+    {
+        public SchemaSerializationException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
new file mode 100644
index 0000000..5aa1e40
--- /dev/null
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public static class ConsumerBuilderExtensions
+    {
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
+            this IConsumerBuilder<TMessage> builder,
+            Action<ConsumerStateChanged, CancellationToken> handler,
+            CancellationToken cancellationToken = default)
+        {
+            builder.StateChangedHandler(new ActionStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken));
+            return builder;
+        }
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
+            this IConsumerBuilder<TMessage> builder,
+            Func<ConsumerStateChanged, CancellationToken, ValueTask> handler,
+            CancellationToken cancellationToken = default)
+        {
+            builder.StateChangedHandler(new FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken));
+            return builder;
+        }
+    }
+}
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index b61971c..13f2121 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -15,6 +15,10 @@
 namespace DotPulsar.Extensions
 {
     using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -26,16 +30,105 @@
         /// <summary>
         /// Acknowledge the consumption of a single message.
         /// </summary>
-        public static async ValueTask Acknowledge(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+        public static async ValueTask Acknowledge(this IConsumer consumer, IMessage message, CancellationToken cancellationToken = default)
             => await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
         /// </summary>
-        public static async ValueTask AcknowledgeCumulative(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+        public static async ValueTask AcknowledgeCumulative(this IConsumer consumer, IMessage message, CancellationToken cancellationToken = default)
             => await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
 
         /// <summary>
+        /// Process and auto-acknowledge a message.
+        /// </summary>
+        public static async ValueTask Process<TMessage>(
+            this IConsumer<TMessage> consumer,
+            Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+            CancellationToken cancellationToken = default)
+        {
+            const string operation = "process";
+            var operationName = $"{consumer.Topic} {operation}";
+
+            var tags = new List<KeyValuePair<string, object?>>
+            {
+                new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
+                new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+                new KeyValuePair<string, object?>("messaging.operation", operation),
+                new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+                new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
+                new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
+            };
+
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
+
+                var activity = StartActivity(message, operationName, tags);
+
+                if (activity is not null && activity.IsAllDataRequested)
+                {
+                    activity.SetTag("messaging.message_id", message.MessageId.ToString());
+                    activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
+                    activity.SetTag("otel.status_code", "OK");
+                }
+
+                try
+                {
+                    await processor(message, cancellationToken).ConfigureAwait(false);
+                }
+                catch (Exception exception)
+                {
+                    if (activity is not null && activity.IsAllDataRequested)
+                    {
+                        activity.SetTag("otel.status_code", "ERROR");
+
+                        var exceptionTags = new ActivityTagsCollection
+                        {
+                            { "exception.type", exception.GetType().FullName },
+                            { "exception.stacktrace", exception.ToString() }
+                        };
+
+                        if (!string.IsNullOrWhiteSpace(exception.Message))
+                            exceptionTags.Add("exception.message", exception.Message);
+
+                        var activityEvent = new ActivityEvent("exception", default, exceptionTags);
+                        activity.AddEvent(activityEvent);
+                    }
+                }
+
+                activity?.Dispose();
+
+                await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
+            }
+        }
+
+        private static Activity? StartActivity(IMessage message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
+        {
+            if (!DotPulsarActivitySource.ActivitySource.HasListeners())
+                return null;
+
+            var properties = message.Properties;
+
+            if (properties.TryGetValue("traceparent", out var traceparent))  // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
+            {
+                var tracestate = properties.ContainsKey("tracestate") ? properties["tracestrate"] : null;
+                if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
+                    return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
+            }
+
+            var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
+
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                foreach (var tag in tags)
+                    activity.SetTag(tag.Key, tag.Value);
+            }
+
+            return activity;
+        }
+
+        /// <summary>
         /// Wait for the state to change to a specific state.
         /// </summary>
         /// <returns>
diff --git a/src/DotPulsar/Extensions/MessageBuilderExtensions.cs b/src/DotPulsar/Extensions/MessageBuilderExtensions.cs
new file mode 100644
index 0000000..fe87ead
--- /dev/null
+++ b/src/DotPulsar/Extensions/MessageBuilderExtensions.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public static class MessageBuilderExtensions
+    {
+        /// <summary>
+        /// Sends a message.
+        /// </summary>
+        public static async ValueTask<MessageId> Send(this IMessageBuilder<ReadOnlySequence<byte>> builder, byte[] payload, CancellationToken cancellationToken = default)
+            => await builder.Send(new ReadOnlySequence<byte>(payload), cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Sends a message.
+        /// </summary>
+        public static async ValueTask<MessageId> Send(this IMessageBuilder<ReadOnlySequence<byte>> builder, ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default)
+            => await builder.Send(new ReadOnlySequence<byte>(payload), cancellationToken).ConfigureAwait(false);
+    }
+}
diff --git a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
new file mode 100644
index 0000000..7c4a42c
--- /dev/null
+++ b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public static class ProducerBuilderExtensions
+    {
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
+            this IProducerBuilder<TMessage> builder,
+            Action<ProducerStateChanged, CancellationToken> handler,
+            CancellationToken cancellationToken = default)
+        {
+            builder.StateChangedHandler(new ActionStateChangedHandler<ProducerStateChanged>(handler, cancellationToken));
+            return builder;
+        }
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
+            this IProducerBuilder<TMessage> builder,
+            Func<ProducerStateChanged, CancellationToken, ValueTask> handler,
+            CancellationToken cancellationToken = default)
+        {
+            builder.StateChangedHandler(new FuncStateChangedHandler<ProducerStateChanged>(handler, cancellationToken));
+            return builder;
+        }
+    }
+}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
index 8f02262..d292468 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -27,8 +27,8 @@
         /// <summary>
         /// Get a builder that can be used to configure and build a Message.
         /// </summary>
-        public static IMessageBuilder NewMessage(this IProducer producer)
-            => new MessageBuilder(producer);
+        public static IMessageBuilder<TMessage> NewMessage<TMessage>(this IProducer<TMessage> producer)
+            => new MessageBuilder<TMessage>(producer);
 
         /// <summary>
         /// Wait for the state to change to a specific state.
diff --git a/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs b/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs
new file mode 100644
index 0000000..2ec6c8d
--- /dev/null
+++ b/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+    using System.Threading.Tasks;
+
+    public static class PulsarClientBuilderExtensions
+    {
+        /// <summary>
+        /// Register a custom exception handler that will be invoked before the default exception handler.
+        /// </summary>
+        public static IPulsarClientBuilder ExceptionHandler(this IPulsarClientBuilder builder, Action<ExceptionContext> exceptionHandler)
+        {
+            builder.ExceptionHandler(new ActionExceptionHandler(exceptionHandler));
+            return builder;
+        }
+
+        /// <summary>
+        /// Register a custom exception handler that will be invoked before the default exception handler.
+        /// </summary>
+        public static IPulsarClientBuilder ExceptionHandler(this IPulsarClientBuilder builder, Func<ExceptionContext, ValueTask> exceptionHandler)
+        {
+            builder.ExceptionHandler(new FuncExceptionHandler(exceptionHandler));
+            return builder;
+        }
+    }
+}
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index 574bce1..2cb3097 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -15,7 +15,9 @@
 namespace DotPulsar.Extensions
 {
     using Abstractions;
+    using DotPulsar.Schemas;
     using Internal;
+    using System.Buffers;
 
     /// <summary>
     /// Extensions for IPulsarClient.
@@ -25,19 +27,37 @@
         /// <summary>
         /// Get a builder that can be used to configure and build a Producer instance.
         /// </summary>
-        public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient)
-            => new ProducerBuilder(pulsarClient);
+        public static IProducerBuilder<ReadOnlySequence<byte>> NewProducer(this IPulsarClient pulsarClient)
+            => new ProducerBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
 
         /// <summary>
         /// Get a builder that can be used to configure and build a Consumer instance.
         /// </summary>
-        public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient)
-            => new ConsumerBuilder(pulsarClient);
+        public static IConsumerBuilder<ReadOnlySequence<byte>> NewConsumer(this IPulsarClient pulsarClient)
+            => new ConsumerBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
 
         /// <summary>
         /// Get a builder that can be used to configure and build a Reader instance.
         /// </summary>
-        public static IReaderBuilder NewReader(this IPulsarClient pulsarClient)
-            => new ReaderBuilder(pulsarClient);
+        public static IReaderBuilder<ReadOnlySequence<byte>> NewReader(this IPulsarClient pulsarClient)
+            => new ReaderBuilder<ReadOnlySequence<byte>>(pulsarClient, new ByteSequenceSchema());
+
+        /// <summary>
+        /// Get a builder that can be used to configure and build a Producer instance.
+        /// </summary>
+        public static IProducerBuilder<TMessage> NewProducer<TMessage>(this IPulsarClient pulsarClient, ISchema<TMessage> schema)
+            => new ProducerBuilder<TMessage>(pulsarClient, schema);
+
+        /// <summary>
+        /// Get a builder that can be used to configure and build a Consumer instance.
+        /// </summary>
+        public static IConsumerBuilder<TMessage> NewConsumer<TMessage>(this IPulsarClient pulsarClient, ISchema<TMessage> schema)
+            => new ConsumerBuilder<TMessage>(pulsarClient, schema);
+
+        /// <summary>
+        /// Get a builder that can be used to configure and build a Reader instance.
+        /// </summary>
+        public static IReaderBuilder<TMessage> NewReader<TMessage>(this IPulsarClient pulsarClient, ISchema<TMessage> schema)
+            => new ReaderBuilder<TMessage>(pulsarClient, schema);
     }
 }
diff --git a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
new file mode 100644
index 0000000..c9d4b69
--- /dev/null
+++ b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public static class ReaderBuilderExtensions
+    {
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
+            this IReaderBuilder<TMessage> builder,
+            Action<ReaderStateChanged, CancellationToken> handler,
+            CancellationToken cancellationToken = default)
+        {
+            builder.StateChangedHandler(new ActionStateChangedHandler<ReaderStateChanged>(handler, cancellationToken));
+            return builder;
+        }
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
+            this IReaderBuilder<TMessage> builder,
+            Func<ReaderStateChanged, CancellationToken, ValueTask> handler,
+            CancellationToken cancellationToken = default)
+        {
+            builder.StateChangedHandler(new FuncStateChangedHandler<ReaderStateChanged>(handler, cancellationToken));
+            return builder;
+        }
+    }
+}
diff --git a/src/DotPulsar/Extensions/ReceiveExtensions.cs b/src/DotPulsar/Extensions/ReceiveExtensions.cs
index c2262c6..5a165a6 100644
--- a/src/DotPulsar/Extensions/ReceiveExtensions.cs
+++ b/src/DotPulsar/Extensions/ReceiveExtensions.cs
@@ -27,7 +27,7 @@
         /// <summary>
         /// Get an IAsyncEnumerable for receiving messages.
         /// </summary>
-        public static async IAsyncEnumerable<Message> Messages(this IReceive receiver, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+        public static async IAsyncEnumerable<TMessage> Messages<TMessage>(this IReceive<TMessage> receiver, [EnumeratorCancellation] CancellationToken cancellationToken = default)
         {
             while (!cancellationToken.IsCancellationRequested)
                 yield return await receiver.Receive(cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index f4abd8b..368a111 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -40,5 +40,6 @@
         Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken);
         Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken);
         Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index ef09431..a359508 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -14,19 +14,20 @@
 
 namespace DotPulsar.Internal.Abstractions
 {
+    using DotPulsar.Abstractions;
     using PulsarApi;
     using System;
     using System.Threading;
     using System.Threading.Tasks;
 
-    public interface IConsumerChannel : IAsyncDisposable
+    public interface IConsumerChannel<TMessage> : IAsyncDisposable
     {
         Task Send(CommandAck command, CancellationToken cancellationToken);
         Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
         Task Send(CommandUnsubscribe command, CancellationToken cancellationToken);
         Task Send(CommandSeek command, CancellationToken cancellationToken);
         Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
-        ValueTask<Message> Receive(CancellationToken cancellationToken);
+        ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken);
         ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
index 3065119..76fb932 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
@@ -17,8 +17,8 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public interface IConsumerChannelFactory
+    public interface IConsumerChannelFactory<TMessage>
     {
-        Task<IConsumerChannel> Create(CancellationToken cancellationToken = default);
+        Task<IConsumerChannel<TMessage>> Create(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IEstablishNewChannel.cs b/src/DotPulsar/Internal/Abstractions/IEstablishNewChannel.cs
new file mode 100644
index 0000000..bcf8723
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IEstablishNewChannel.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public interface IEstablishNewChannel : IAsyncDisposable
+    {
+        Task EstablishNewChannel(CancellationToken cancellationToken);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IMessageFactory.cs b/src/DotPulsar/Internal/Abstractions/IMessageFactory.cs
new file mode 100644
index 0000000..ebdb3ee
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IMessageFactory.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal.PulsarApi;
+    using System.Buffers;
+
+    public interface IMessageFactory<TValue>
+    {
+        IMessage<TValue> Create(MessageId messageId, uint redeliveryCount, ReadOnlySequence<byte> data, MessageMetadata metadata, SingleMessageMetadata? singleMetadata = null);
+    }
+}
diff --git a/src/DotPulsar/Internal/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
similarity index 97%
rename from src/DotPulsar/Internal/Process.cs
rename to src/DotPulsar/Internal/Abstractions/Process.cs
index 40b9712..056471a 100644
--- a/src/DotPulsar/Internal/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -12,9 +12,8 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal.Abstractions
 {
-    using Abstractions;
     using Events;
     using System;
     using System.Threading;
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index 9523627..60d2e9e 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -14,53 +14,64 @@
 
 namespace DotPulsar.Internal
 {
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal.Abstractions;
     using Extensions;
     using PulsarApi;
     using System.Buffers;
     using System.Collections;
     using System.Collections.Generic;
 
-    public sealed class BatchHandler
+    public sealed class BatchHandler<TMessage>
     {
         private readonly object _lock;
         private readonly bool _trackBatches;
-        private readonly Queue<Message> _messages;
+        private readonly IMessageFactory<TMessage> _messageFactory;
+        private readonly Queue<IMessage<TMessage>> _messages;
         private readonly LinkedList<Batch> _batches;
 
-        public BatchHandler(bool trackBatches)
+        public BatchHandler(bool trackBatches, IMessageFactory<TMessage> messageFactory)
         {
             _lock = new object();
             _trackBatches = trackBatches;
-            _messages = new Queue<Message>();
+            _messageFactory = messageFactory;
+            _messages = new Queue<IMessage<TMessage>>();
             _batches = new LinkedList<Batch>();
         }
 
-        public Message Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
+        public IMessage<TMessage> Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
         {
+            var messages = new List<IMessage<TMessage>>(metadata.NumMessagesInBatch);
+
+            long index = 0;
+
+            for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+            {
+                var singleMetadataSize = data.ReadUInt32(index, true);
+                index += 4;
+                var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
+                index += singleMetadataSize;
+                var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
+                var message = _messageFactory.Create(singleMessageId, redeliveryCount, data.Slice(index, singleMetadata.PayloadSize), metadata, singleMetadata);
+                messages.Add(message);
+                index += (uint) singleMetadata.PayloadSize;
+            }
+
             lock (_lock)
             {
                 if (_trackBatches)
                     _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
 
-                long index = 0;
-
-                for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+                foreach (var message in messages)
                 {
-                    var singleMetadataSize = data.ReadUInt32(index, true);
-                    index += 4;
-                    var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
-                    index += singleMetadataSize;
-                    var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
-                    var message = MessageFactory.Create(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
                     _messages.Enqueue(message);
-                    index += (uint) singleMetadata.PayloadSize;
                 }
 
                 return _messages.Dequeue();
             }
         }
 
-        public Message? GetNext()
+        public IMessage<TMessage>? GetNext()
         {
             lock (_lock)
                 return _messages.Count == 0 ? null : _messages.Dequeue();
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 02280dd..56bd58d 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -184,6 +184,23 @@
             return await response.ConfigureAwait(false);
         }
 
+        public async Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            Task<BaseCommand>? responseTask;
+
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                var baseCommand = command.AsBaseCommand();
+                responseTask = _requestResponseHandler.Outgoing(baseCommand);
+                var sequence = Serializer.Serialize(baseCommand);
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
+
+            return await responseTask.ConfigureAwait(false);
+        }
+
         private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index c29db02..8b1b46b 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -27,14 +27,15 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Consumer : IConsumer
+    public sealed class Consumer<TMessage> : IEstablishNewChannel, IConsumer<TMessage>
     {
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
-        private IConsumerChannel _channel;
+        private IConsumerChannel<TMessage> _channel;
         private readonly ObjectPool<CommandAck> _commandAckPool;
         private readonly IExecute _executor;
         private readonly IStateChanged<ConsumerState> _state;
+        private readonly IConsumerChannelFactory<TMessage> _factory;
         private int _isDisposed;
 
         public Uri ServiceUrl { get; }
@@ -47,9 +48,10 @@
             string subscriptionName,
             string topic,
             IRegisterEvent eventRegister,
-            IConsumerChannel initialChannel,
+            IConsumerChannel<TMessage> initialChannel,
             IExecute executor,
-            IStateChanged<ConsumerState> state)
+            IStateChanged<ConsumerState> state,
+            IConsumerChannelFactory<TMessage> factory)
         {
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
@@ -59,10 +61,11 @@
             _channel = initialChannel;
             _executor = executor;
             _state = state;
+            _factory = factory;
             _commandAckPool = new DefaultObjectPool<CommandAck>(new DefaultPooledObjectPolicy<CommandAck>());
             _isDisposed = 0;
 
-            _eventRegister.Register(new ConsumerCreated(_correlationId, this));
+            _eventRegister.Register(new ConsumerCreated(_correlationId));
         }
 
         public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, CancellationToken cancellationToken)
@@ -82,19 +85,19 @@
             if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                 return;
 
-            _eventRegister.Register(new ConsumerDisposed(_correlationId, this));
+            _eventRegister.Register(new ConsumerDisposed(_correlationId));
             await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
-        public async ValueTask<Message> Receive(CancellationToken cancellationToken)
+        public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
             return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
-        private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
+        private async ValueTask<IMessage<TMessage>> ReceiveMessage(CancellationToken cancellationToken)
             => await _channel.Receive(cancellationToken).ConfigureAwait(false);
 
         public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
@@ -183,13 +186,15 @@
         private async ValueTask RedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
             => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-        internal async ValueTask SetChannel(IConsumerChannel channel)
+        private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-            {
-                await channel.DisposeAsync().ConfigureAwait(false);
-                return;
-            }
+                throw new ConsumerDisposedException(GetType().FullName!);
+        }
+
+        public async Task EstablishNewChannel(CancellationToken cancellationToken)
+        {
+            var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
 
             var oldChannel = _channel;
             _channel = channel;
@@ -197,11 +202,5 @@
             if (oldChannel is not null)
                 await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
-
-        private void ThrowIfDisposed()
-        {
-            if (_isDisposed != 0)
-                throw new ConsumerDisposedException();
-        }
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 6deee54..3e48ece 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -16,13 +16,11 @@
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
 
-    public sealed class ConsumerBuilder : IConsumerBuilder
+    public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
     {
         private readonly IPulsarClient _pulsarClient;
+        private readonly ISchema<TMessage> _schema;
         private string? _consumerName;
         private SubscriptionInitialPosition _initialPosition;
         private int _priorityLevel;
@@ -33,83 +31,72 @@
         private string? _topic;
         private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
 
-        public ConsumerBuilder(IPulsarClient pulsarClient)
+        public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
         {
+            _schema = schema;
             _pulsarClient = pulsarClient;
-            _initialPosition = ConsumerOptions.DefaultInitialPosition;
-            _priorityLevel = ConsumerOptions.DefaultPriorityLevel;
-            _messagePrefetchCount = ConsumerOptions.DefaultMessagePrefetchCount;
-            _readCompacted = ConsumerOptions.DefaultReadCompacted;
-            _subscriptionType = ConsumerOptions.DefaultSubscriptionType;
+            _initialPosition = ConsumerOptions<TMessage>.DefaultInitialPosition;
+            _priorityLevel = ConsumerOptions<TMessage>.DefaultPriorityLevel;
+            _messagePrefetchCount = ConsumerOptions<TMessage>.DefaultMessagePrefetchCount;
+            _readCompacted = ConsumerOptions<TMessage>.DefaultReadCompacted;
+            _subscriptionType = ConsumerOptions<TMessage>.DefaultSubscriptionType;
         }
 
-        public IConsumerBuilder ConsumerName(string name)
+        public IConsumerBuilder<TMessage> ConsumerName(string name)
         {
             _consumerName = name;
             return this;
         }
 
-        public IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition)
+        public IConsumerBuilder<TMessage> InitialPosition(SubscriptionInitialPosition initialPosition)
         {
             _initialPosition = initialPosition;
             return this;
         }
 
-        public IConsumerBuilder MessagePrefetchCount(uint count)
+        public IConsumerBuilder<TMessage> MessagePrefetchCount(uint count)
         {
             _messagePrefetchCount = count;
             return this;
         }
 
-        public IConsumerBuilder PriorityLevel(int priorityLevel)
+        public IConsumerBuilder<TMessage> PriorityLevel(int priorityLevel)
         {
             _priorityLevel = priorityLevel;
             return this;
         }
 
-        public IConsumerBuilder ReadCompacted(bool readCompacted)
+        public IConsumerBuilder<TMessage> ReadCompacted(bool readCompacted)
         {
             _readCompacted = readCompacted;
             return this;
         }
 
-        public IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
+        public IConsumerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
         {
             _stateChangedHandler = handler;
             return this;
         }
 
-        public IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
-        {
-            _stateChangedHandler = new ActionStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
-            return this;
-        }
-
-        public IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
-        {
-            _stateChangedHandler = new FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
-            return this;
-        }
-
-        public IConsumerBuilder SubscriptionName(string name)
+        public IConsumerBuilder<TMessage> SubscriptionName(string name)
         {
             _subscriptionName = name;
             return this;
         }
 
-        public IConsumerBuilder SubscriptionType(SubscriptionType type)
+        public IConsumerBuilder<TMessage> SubscriptionType(SubscriptionType type)
         {
             _subscriptionType = type;
             return this;
         }
 
-        public IConsumerBuilder Topic(string topic)
+        public IConsumerBuilder<TMessage> Topic(string topic)
         {
             _topic = topic;
             return this;
         }
 
-        public IConsumer Create()
+        public IConsumer<TMessage> Create()
         {
             if (string.IsNullOrEmpty(_subscriptionName))
                 throw new ConfigurationException("SubscriptionName may not be null or empty");
@@ -117,7 +104,7 @@
             if (string.IsNullOrEmpty(_topic))
                 throw new ConfigurationException("Topic may not be null or empty");
 
-            var options = new ConsumerOptions(_subscriptionName!, _topic!)
+            var options = new ConsumerOptions<TMessage>(_subscriptionName!, _topic!, _schema)
             {
                 ConsumerName = _consumerName,
                 InitialPosition = _initialPosition,
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 479e89e..3dfe343 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
     using Extensions;
     using PulsarApi;
@@ -23,13 +24,14 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class ConsumerChannel : IConsumerChannel
+    public sealed class ConsumerChannel<TMessage> : IConsumerChannel<TMessage>
     {
         private readonly ulong _id;
         private readonly AsyncQueue<MessagePackage> _queue;
         private readonly IConnection _connection;
-        private readonly BatchHandler _batchHandler;
+        private readonly BatchHandler<TMessage> _batchHandler;
         private readonly CommandFlow _cachedCommandFlow;
+        private readonly IMessageFactory<TMessage> _messageFactory;
         private readonly IDecompress?[] _decompressors;
         private readonly AsyncLock _lock;
         private uint _sendWhenZero;
@@ -40,13 +42,15 @@
             uint messagePrefetchCount,
             AsyncQueue<MessagePackage> queue,
             IConnection connection,
-            BatchHandler batchHandler,
+            BatchHandler<TMessage> batchHandler,
+            IMessageFactory<TMessage> messageFactory,
             IEnumerable<IDecompressorFactory> decompressorFactories)
         {
             _id = id;
             _queue = queue;
             _connection = connection;
             _batchHandler = batchHandler;
+            _messageFactory = messageFactory;
 
             _decompressors = new IDecompress[5];
 
@@ -67,7 +71,7 @@
             _firstFlow = true;
         }
 
-        public async ValueTask<Message> Receive(CancellationToken cancellationToken)
+        public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
         {
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
@@ -115,9 +119,20 @@
                     var messageId = messagePackage.MessageId;
                     var redeliveryCount = messagePackage.RedeliveryCount;
 
-                    return metadata.ShouldSerializeNumMessagesInBatch()
-                        ? _batchHandler.Add(messageId, redeliveryCount, metadata, data)
-                        : MessageFactory.Create(messageId.ToMessageId(), redeliveryCount, metadata, data);
+                    if (metadata.ShouldSerializeNumMessagesInBatch())
+                    {
+                        try
+                        {
+                            return _batchHandler.Add(messageId, redeliveryCount, metadata, data);
+                        }
+                        catch
+                        {
+                            await RejectPackage(messagePackage, CommandAck.ValidationErrorType.BatchDeSerializeError, cancellationToken).ConfigureAwait(false);
+                            continue;
+                        }
+                    }
+
+                    return _messageFactory.Create(messageId.ToMessageId(), redeliveryCount, data, metadata);
                 }
             }
         }
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 5d20ce5..24954f7 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -21,47 +21,44 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class ConsumerChannelFactory : IConsumerChannelFactory
+    public sealed class ConsumerChannelFactory<TMessage> : IConsumerChannelFactory<TMessage>
     {
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
         private readonly IConnectionPool _connectionPool;
-        private readonly IExecute _executor;
         private readonly CommandSubscribe _subscribe;
         private readonly uint _messagePrefetchCount;
-        private readonly BatchHandler _batchHandler;
+        private readonly BatchHandler<TMessage> _batchHandler;
+        private readonly IMessageFactory<TMessage> _messageFactory;
         private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
 
         public ConsumerChannelFactory(
             Guid correlationId,
             IRegisterEvent eventRegister,
             IConnectionPool connectionPool,
-            IExecute executor,
             CommandSubscribe subscribe,
             uint messagePrefetchCount,
-            BatchHandler batchHandler,
+            BatchHandler<TMessage> batchHandler,
+            IMessageFactory<TMessage> messageFactory,
             IEnumerable<IDecompressorFactory> decompressorFactories)
         {
             _correlationId = correlationId;
             _eventRegister = eventRegister;
             _connectionPool = connectionPool;
-            _executor = executor;
             _subscribe = subscribe;
             _messagePrefetchCount = messagePrefetchCount;
             _batchHandler = batchHandler;
+            _messageFactory = messageFactory;
             _decompressorFactories = decompressorFactories;
         }
 
-        public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
-            => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
-
-        private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellationToken)
+        public async Task<IConsumerChannel<TMessage>> Create(CancellationToken cancellationToken)
         {
             var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
             var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-            return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
+            return new ConsumerChannel<TMessage>(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _messageFactory, _decompressorFactories);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index a078146..f2296c2 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -21,19 +21,16 @@
     public sealed class ConsumerProcess : Process
     {
         private readonly IStateManager<ConsumerState> _stateManager;
-        private readonly IConsumerChannelFactory _factory;
-        private readonly Consumer _consumer;
+        private readonly IEstablishNewChannel _consumer;
         private readonly bool _isFailoverSubscription;
 
         public ConsumerProcess(
             Guid correlationId,
             IStateManager<ConsumerState> stateManager,
-            IConsumerChannelFactory factory,
-            Consumer consumer,
+            IEstablishNewChannel consumer,
             bool isFailoverSubscription) : base(correlationId)
         {
             _stateManager = stateManager;
-            _factory = factory;
             _consumer = consumer;
             _isFailoverSubscription = isFailoverSubscription;
         }
@@ -47,7 +44,7 @@
 
         protected override void CalculateState()
         {
-            if (_consumer.IsFinalState())
+            if (_stateManager.IsFinalState())
                 return;
 
             if (ExecutorState == ExecutorState.Faulted)
@@ -67,7 +64,7 @@
                 case ChannelState.ClosedByServer:
                 case ChannelState.Disconnected:
                     _stateManager.SetState(ConsumerState.Disconnected);
-                    SetupChannel();
+                    _ = _consumer.EstablishNewChannel(CancellationTokenSource.Token);
                     return;
                 case ChannelState.Connected:
                     if (!_isFailoverSubscription)
@@ -81,18 +78,5 @@
                     return;
             }
         }
-
-        private async void SetupChannel()
-        {
-            try
-            {
-                var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
-                await _consumer.SetChannel(channel).ConfigureAwait(false);
-            }
-            catch
-            {
-                // ignored
-            }
-        }
     }
 }
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
new file mode 100644
index 0000000..09c8464
--- /dev/null
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using System.Diagnostics;
+
+    public static class DotPulsarActivitySource
+    {
+        static DotPulsarActivitySource()
+        {
+            ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
+        }
+
+        public static ActivitySource ActivitySource { get; }
+    }
+}
diff --git a/src/DotPulsar/Internal/Events/ConsumerCreated.cs b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
index 19edea8..41c5513 100644
--- a/src/DotPulsar/Internal/Events/ConsumerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
@@ -19,13 +19,9 @@
 
     public sealed class ConsumerCreated : IEvent
     {
-        public ConsumerCreated(Guid correlationId, Consumer consumer)
-        {
-            CorrelationId = correlationId;
-            Consumer = consumer;
-        }
+        public ConsumerCreated(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
-        public Consumer Consumer { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
index ffe3bdf..130f9c9 100644
--- a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
@@ -19,13 +19,9 @@
 
     public sealed class ConsumerDisposed : IEvent
     {
-        public ConsumerDisposed(Guid correlationId, Consumer consumer)
-        {
-            CorrelationId = correlationId;
-            Consumer = consumer;
-        }
+        public ConsumerDisposed(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
-        public Consumer Consumer { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/Events/ProducerCreated.cs b/src/DotPulsar/Internal/Events/ProducerCreated.cs
index 1c77f06..2a89db7 100644
--- a/src/DotPulsar/Internal/Events/ProducerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ProducerCreated.cs
@@ -19,13 +19,9 @@
 
     public sealed class ProducerCreated : IEvent
     {
-        public ProducerCreated(Guid correlationId, Producer producer)
-        {
-            CorrelationId = correlationId;
-            Producer = producer;
-        }
+        public ProducerCreated(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
-        public Producer Producer { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/Events/ProducerDisposed.cs b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
index 849f6d9..223024f 100644
--- a/src/DotPulsar/Internal/Events/ProducerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
@@ -19,13 +19,9 @@
 
     public sealed class ProducerDisposed : IEvent
     {
-        public ProducerDisposed(Guid correlationId, Producer producer)
-        {
-            CorrelationId = correlationId;
-            Producer = producer;
-        }
+        public ProducerDisposed(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
-        public Producer Producer { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/Events/ReaderCreated.cs b/src/DotPulsar/Internal/Events/ReaderCreated.cs
index 905e58d..1aa55f1 100644
--- a/src/DotPulsar/Internal/Events/ReaderCreated.cs
+++ b/src/DotPulsar/Internal/Events/ReaderCreated.cs
@@ -19,13 +19,9 @@
 
     public sealed class ReaderCreated : IEvent
     {
-        public ReaderCreated(Guid correlationId, Reader reader)
-        {
-            CorrelationId = correlationId;
-            Reader = reader;
-        }
+        public ReaderCreated(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
-        public Reader Reader { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/Events/ReaderDisposed.cs b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
index c1bd28c..33a9ac6 100644
--- a/src/DotPulsar/Internal/Events/ReaderDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
@@ -19,13 +19,9 @@
 
     public sealed class ReaderDisposed : IEvent
     {
-        public ReaderDisposed(Guid correlationId, Reader reader)
-        {
-            CorrelationId = correlationId;
-            Reader = reader;
-        }
+        public ReaderDisposed(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
-        public Reader Reader { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 0365895..867c21d 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -40,8 +40,11 @@
         public static void Throw(this CommandLookupTopicResponse command)
             => Throw(command.Error, command.Message);
 
-        public static void Throw(this CommandError error)
-            => Throw(error.Error, error.Message);
+        public static void Throw(this CommandError command)
+            => Throw(command.Error, command.Message);
+
+        public static void Throw(this CommandGetOrCreateSchemaResponse command)
+            => Throw(command.ErrorCode, command.ErrorMessage);
 
         private static void Throw(ServerError error, string message)
             => throw (error switch
@@ -177,5 +180,12 @@
                 CommandType = BaseCommand.Type.RedeliverUnacknowledgedMessages,
                 RedeliverUnacknowledgedMessages = command
             };
+
+        public static BaseCommand AsBaseCommand(this CommandGetOrCreateSchema command)
+            => new BaseCommand
+            {
+                CommandType = BaseCommand.Type.GetOrCreateSchema,
+                GetOrCreateSchema = command
+            };
     }
 }
diff --git a/src/DotPulsar/Internal/Message.cs b/src/DotPulsar/Internal/Message.cs
new file mode 100644
index 0000000..a5b0972
--- /dev/null
+++ b/src/DotPulsar/Internal/Message.cs
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+
+    public sealed class Message<TValue> : IMessage<TValue>
+    {
+        private readonly ISchema<TValue> _schema;
+
+        internal Message(
+            MessageId messageId,
+            ReadOnlySequence<byte> data,
+            string producerName,
+            ulong sequenceId,
+            uint redeliveryCount,
+            ulong eventTime,
+            ulong publishTime,
+            IReadOnlyDictionary<string, string> properties,
+            bool hasBase64EncodedKey,
+            string? key,
+            byte[]? orderingKey,
+            byte[]? schemaVersion,
+            ISchema<TValue> schema)
+        {
+            MessageId = messageId;
+            Data = data;
+            ProducerName = producerName;
+            SequenceId = sequenceId;
+            RedeliveryCount = redeliveryCount;
+            EventTime = eventTime;
+            PublishTime = publishTime;
+            Properties = properties;
+            HasBase64EncodedKey = hasBase64EncodedKey;
+            Key = key;
+            OrderingKey = orderingKey;
+            SchemaVersion = schemaVersion;
+            _schema = schema;
+        }
+
+        public MessageId MessageId { get; }
+
+        public ReadOnlySequence<byte> Data { get; }
+
+        public string ProducerName { get; }
+
+        public byte[]? SchemaVersion { get; }
+
+        public ulong SequenceId { get; }
+
+        public uint RedeliveryCount { get; }
+
+        public bool HasEventTime => EventTime != 0;
+
+        public ulong EventTime { get; }
+
+        public DateTime EventTimeAsDateTime => EventTimeAsDateTimeOffset.UtcDateTime;
+
+        public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) EventTime);
+
+        public bool HasBase64EncodedKey { get; }
+
+        public bool HasKey => Key is not null;
+
+        public string? Key { get; }
+
+        public byte[]? KeyBytes => Key is not null ? Convert.FromBase64String(Key) : null;
+
+        public bool HasOrderingKey => OrderingKey is not null;
+
+        public byte[]? OrderingKey { get; }
+
+        public ulong PublishTime { get; }
+
+        public DateTime PublishTimeAsDateTime => PublishTimeAsDateTimeOffset.UtcDateTime;
+
+        public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) PublishTime);
+
+        public IReadOnlyDictionary<string, string> Properties { get; }
+
+        public TValue Value => _schema.Decode(Data);
+    }
+}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index c17ec19..44b3812 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -15,90 +15,95 @@
 namespace DotPulsar.Internal
 {
     using DotPulsar.Abstractions;
-    using DotPulsar.Extensions;
     using Extensions;
     using System;
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class MessageBuilder : IMessageBuilder
+    public sealed class MessageBuilder<TMessage> : IMessageBuilder<TMessage>
     {
-        private readonly IProducer _producer;
+        private readonly IProducer<TMessage> _producer;
         private readonly MessageMetadata _metadata;
 
-        public MessageBuilder(IProducer producer)
+        public MessageBuilder(IProducer<TMessage> producer)
         {
             _producer = producer;
             _metadata = new MessageMetadata();
         }
 
-        public IMessageBuilder DeliverAt(long timestamp)
+        public IMessageBuilder<TMessage> DeliverAt(long timestamp)
         {
             _metadata.Metadata.DeliverAtTime = timestamp;
             return this;
         }
 
-        public IMessageBuilder DeliverAt(DateTime timestamp)
+        public IMessageBuilder<TMessage> DeliverAt(DateTime timestamp)
         {
             _metadata.Metadata.SetDeliverAtTime(timestamp);
             return this;
         }
 
-        public IMessageBuilder DeliverAt(DateTimeOffset timestamp)
+        public IMessageBuilder<TMessage> DeliverAt(DateTimeOffset timestamp)
         {
             _metadata.Metadata.SetDeliverAtTime(timestamp);
             return this;
         }
 
-        public IMessageBuilder EventTime(ulong eventTime)
+        public IMessageBuilder<TMessage> EventTime(ulong eventTime)
         {
             _metadata.Metadata.EventTime = eventTime;
             return this;
         }
 
-        public IMessageBuilder EventTime(DateTime eventTime)
+        public IMessageBuilder<TMessage> EventTime(DateTime eventTime)
         {
             _metadata.Metadata.SetEventTime(eventTime);
             return this;
         }
 
-        public IMessageBuilder EventTime(DateTimeOffset eventTime)
+        public IMessageBuilder<TMessage> EventTime(DateTimeOffset eventTime)
         {
             _metadata.Metadata.SetEventTime(eventTime);
             return this;
         }
 
-        public IMessageBuilder Key(string key)
+        public IMessageBuilder<TMessage> Key(string key)
         {
             _metadata.Metadata.SetKey(key);
             return this;
         }
 
-        public IMessageBuilder KeyBytes(byte[] key)
+        public IMessageBuilder<TMessage> KeyBytes(byte[] key)
         {
             _metadata.Metadata.SetKey(key);
             return this;
         }
 
-        public IMessageBuilder OrderingKey(byte[] key)
+        public IMessageBuilder<TMessage> OrderingKey(byte[] key)
         {
             _metadata.Metadata.OrderingKey = key;
             return this;
         }
 
-        public IMessageBuilder Property(string key, string value)
+        public IMessageBuilder<TMessage> Property(string key, string value)
         {
             _metadata[key] = value;
             return this;
         }
 
-        public IMessageBuilder SequenceId(ulong sequenceId)
+        public IMessageBuilder<TMessage> SchemaVersion(byte[] schemaVersion)
+        {
+            _metadata.Metadata.SchemaVersion = schemaVersion;
+            return this;
+        }
+
+        public IMessageBuilder<TMessage> SequenceId(ulong sequenceId)
         {
             _metadata.Metadata.SequenceId = sequenceId;
             return this;
         }
 
-        public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
-            => await _producer.Send(_metadata, data, cancellationToken).ConfigureAwait(false);
+        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+            => await _producer.Send(_metadata, message, cancellationToken).ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/Internal/MessageFactory.cs b/src/DotPulsar/Internal/MessageFactory.cs
index 87bf110..1f68425 100644
--- a/src/DotPulsar/Internal/MessageFactory.cs
+++ b/src/DotPulsar/Internal/MessageFactory.cs
@@ -14,11 +14,13 @@
 
 namespace DotPulsar.Internal
 {
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal.Abstractions;
     using DotPulsar.Internal.PulsarApi;
     using System.Buffers;
     using System.Collections.Generic;
 
-    public static class MessageFactory
+    public sealed class MessageFactory<TValue> : IMessageFactory<TValue>
     {
         private static readonly Dictionary<string, string> _empty;
 
@@ -40,13 +42,26 @@
             return dictionary;
         }
 
-        public static Message Create(
+        private readonly ISchema<TValue> _schema;
+
+        public MessageFactory(ISchema<TValue> schema)
+            => _schema = schema;
+
+        public IMessage<TValue> Create(MessageId messageId, uint redeliveryCount, ReadOnlySequence<byte> data, MessageMetadata metadata, SingleMessageMetadata? singleMetadata = null)
+        {
+            if (singleMetadata is null)
+                return Create(messageId, redeliveryCount, metadata, data);
+
+            return Create(messageId, redeliveryCount, metadata, singleMetadata, data);
+        }
+
+        private Message<TValue> Create(
             MessageId messageId,
             uint redeliveryCount,
             MessageMetadata metadata,
             ReadOnlySequence<byte> data)
         {
-            return new Message(
+            return new Message<TValue>(
                 messageId: messageId,
                 data: data,
                 producerName: metadata.ProducerName,
@@ -57,17 +72,19 @@
                 properties: FromKeyValueList(metadata.Properties),
                 hasBase64EncodedKey: metadata.PartitionKeyB64Encoded,
                 key: metadata.PartitionKey,
-                orderingKey: metadata.OrderingKey);
+                orderingKey: metadata.OrderingKey,
+                schemaVersion: metadata.SchemaVersion,
+                _schema);
         }
 
-        public static Message Create(
+        private Message<TValue> Create(
             MessageId messageId,
             uint redeliveryCount,
             MessageMetadata metadata,
             SingleMessageMetadata singleMetadata,
             ReadOnlySequence<byte> data)
         {
-            return new Message(
+            return new Message<TValue>(
                 messageId: messageId,
                 data: data,
                 producerName: metadata.ProducerName,
@@ -78,23 +95,9 @@
                 properties: FromKeyValueList(singleMetadata.Properties),
                 hasBase64EncodedKey: singleMetadata.PartitionKeyB64Encoded,
                 key: singleMetadata.PartitionKey,
-                orderingKey: singleMetadata.OrderingKey);
+                orderingKey: singleMetadata.OrderingKey,
+                schemaVersion: metadata.SchemaVersion,
+                _schema);
         }
-
-        /// <summary>
-        /// Intended for testing.
-        /// </summary>
-        public static Message Create(
-            MessageId messageId,
-            ReadOnlySequence<byte> data,
-            string producerName,
-            ulong sequenceId,
-            uint redeliveryCount,
-            ulong eventTime,
-            ulong publishTime,
-            IReadOnlyDictionary<string, string> properties,
-            bool hasBase64EncodedKey,
-            string? key,
-            byte[]? orderingKey) => new Message(messageId, data, producerName, sequenceId, redeliveryCount, eventTime, publishTime, properties, hasBase64EncodedKey, key, orderingKey);
     }
 }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 75e724f..4427752 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using DotPulsar.Abstractions;
     using Exceptions;
     using PulsarApi;
     using System;
@@ -22,7 +23,7 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel
+    public sealed class NotReadyChannel<TMessage> : IConsumerChannel<TMessage>, IProducerChannel
     {
         public ValueTask DisposeAsync()
             => new ValueTask();
@@ -30,7 +31,7 @@
         public ValueTask ClosedByClient(CancellationToken cancellationToken)
             => new ValueTask();
 
-        public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
+        public ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken = default)
             => throw GetException();
 
         public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 02f9bd3..5264b25 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -25,7 +25,7 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Producer : IProducer
+    public sealed class Producer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
     {
         private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
         private readonly Guid _correlationId;
@@ -33,6 +33,8 @@
         private IProducerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ProducerState> _state;
+        private readonly IProducerChannelFactory _factory;
+        private readonly ISchema<TMessage> _schema;
         private readonly SequenceId _sequenceId;
         private int _isDisposed;
 
@@ -47,7 +49,9 @@
             IRegisterEvent registerEvent,
             IProducerChannel initialChannel,
             IExecute executor,
-            IStateChanged<ProducerState> state)
+            IStateChanged<ProducerState> state,
+            IProducerChannelFactory factory,
+            ISchema<TMessage> schema)
         {
             var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
             _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
@@ -59,9 +63,11 @@
             _channel = initialChannel;
             _executor = executor;
             _state = state;
+            _factory = factory;
+            _schema = schema;
             _isDisposed = 0;
 
-            _eventRegister.Register(new ProducerCreated(_correlationId, this));
+            _eventRegister.Register(new ProducerCreated(_correlationId));
         }
 
         public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
@@ -81,10 +87,15 @@
             if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                 return;
 
-            _eventRegister.Register(new ProducerDisposed(_correlationId, this));
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
             await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
+        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+            => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
+            => await Send(metadata, _schema.Encode(message), cancellationToken).ConfigureAwait(false);
 
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
@@ -127,13 +138,9 @@
             return response.MessageId.ToMessageId();
         }
 
-        internal async ValueTask SetChannel(IProducerChannel channel)
+        public async Task EstablishNewChannel(CancellationToken cancellationToken)
         {
-            if (_isDisposed != 0)
-            {
-                await channel.DisposeAsync().ConfigureAwait(false);
-                return;
-            }
+            var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
 
             var oldChannel = _channel;
             _channel = channel;
@@ -145,7 +152,7 @@
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ProducerDisposedException();
+                throw new ProducerDisposedException(GetType().FullName!);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 4d66d6d..420b2a2 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -16,74 +16,61 @@
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
 
-    public sealed class ProducerBuilder : IProducerBuilder
+    public sealed class ProducerBuilder<TMessage> : IProducerBuilder<TMessage>
     {
         private readonly IPulsarClient _pulsarClient;
+        private readonly ISchema<TMessage> _schema;
         private string? _producerName;
         private CompressionType _compressionType;
         private ulong _initialSequenceId;
         private string? _topic;
         private IHandleStateChanged<ProducerStateChanged>? _stateChangedHandler;
 
-        public ProducerBuilder(IPulsarClient pulsarClient)
+        public ProducerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
         {
             _pulsarClient = pulsarClient;
-            _compressionType = ProducerOptions.DefaultCompressionType;
-            _initialSequenceId = ProducerOptions.DefaultInitialSequenceId;
+            _schema = schema;
+            _compressionType = ProducerOptions<TMessage>.DefaultCompressionType;
+            _initialSequenceId = ProducerOptions<TMessage>.DefaultInitialSequenceId;
         }
 
-        public IProducerBuilder CompressionType(CompressionType compressionType)
+        public IProducerBuilder<TMessage> CompressionType(CompressionType compressionType)
         {
             _compressionType = compressionType;
             return this;
         }
 
-        public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+        public IProducerBuilder<TMessage> InitialSequenceId(ulong initialSequenceId)
         {
             _initialSequenceId = initialSequenceId;
             return this;
         }
 
-        public IProducerBuilder ProducerName(string name)
+        public IProducerBuilder<TMessage> ProducerName(string name)
         {
             _producerName = name;
             return this;
         }
 
-        public IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler)
+        public IProducerBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler)
         {
             _stateChangedHandler = handler;
             return this;
         }
 
-        public IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
-        {
-            _stateChangedHandler = new ActionStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
-            return this;
-        }
-
-        public IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
-        {
-            _stateChangedHandler = new FuncStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
-            return this;
-        }
-
-        public IProducerBuilder Topic(string topic)
+        public IProducerBuilder<TMessage> Topic(string topic)
         {
             _topic = topic;
             return this;
         }
 
-        public IProducer Create()
+        public IProducer<TMessage> Create()
         {
             if (string.IsNullOrEmpty(_topic))
                 throw new ConfigurationException("ProducerOptions.Topic may not be null or empty");
 
-            var options = new ProducerOptions(_topic!)
+            var options = new ProducerOptions<TMessage>(_topic!, _schema)
             {
                 CompressionType = _compressionType,
                 InitialSequenceId = _initialSequenceId,
@@ -91,7 +78,7 @@
                 StateChangedHandler = _stateChangedHandler
             };
 
-            return _pulsarClient.CreateProducer(options);
+            return _pulsarClient.CreateProducer<TMessage>(options);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index fd9bd0c..20ae831 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -30,12 +30,14 @@
         private readonly string _name;
         private readonly IConnection _connection;
         private readonly ICompressorFactory? _compressorFactory;
+        private readonly byte[]? _schemaVersion;
 
         public ProducerChannel(
             ulong id,
             string name,
             IConnection connection,
-            ICompressorFactory? compressorFactory)
+            ICompressorFactory? compressorFactory,
+            byte[]? schemaVersion)
         {
             var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
             _sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
@@ -43,6 +45,7 @@
             _name = name;
             _connection = connection;
             _compressorFactory = compressorFactory;
+            _schemaVersion = schemaVersion;
         }
 
         public async ValueTask ClosedByClient(CancellationToken cancellationToken)
@@ -69,6 +72,9 @@
                 metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
                 metadata.ProducerName = _name;
 
+                if (metadata.SchemaVersion is null && _schemaVersion is not null)
+                    metadata.SchemaVersion = _schemaVersion;
+
                 if (sendPackage.Command is null)
                 {
                     sendPackage.Command = new CommandSend
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index a869666..23ef8c5 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using DotPulsar.Internal.Extensions;
     using PulsarApi;
     using System;
     using System.Threading;
@@ -25,41 +26,60 @@
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
         private readonly IConnectionPool _connectionPool;
-        private readonly IExecute _executor;
         private readonly CommandProducer _commandProducer;
         private readonly ICompressorFactory? _compressorFactory;
+        private readonly Schema? _schema;
 
         public ProducerChannelFactory(
             Guid correlationId,
             IRegisterEvent eventRegister,
             IConnectionPool connectionPool,
-            IExecute executor,
-            ProducerOptions options,
+            string topic,
+            string? producerName,
+            SchemaInfo schemaInfo,
             ICompressorFactory? compressorFactory)
         {
             _correlationId = correlationId;
             _eventRegister = eventRegister;
             _connectionPool = connectionPool;
-            _executor = executor;
 
             _commandProducer = new CommandProducer
             {
-                ProducerName = options.ProducerName,
-                Topic = options.Topic
+                ProducerName = producerName,
+                Topic = topic
             };
 
             _compressorFactory = compressorFactory;
+            _schema = schemaInfo.PulsarSchema;
         }
 
         public async Task<IProducerChannel> Create(CancellationToken cancellationToken)
-            => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
-
-        private async ValueTask<IProducerChannel> GetChannel(CancellationToken cancellationToken)
         {
             var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
+            var schemaVersion = await GetSchemaVersion(connection, cancellationToken).ConfigureAwait(false);
             var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
             var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
-            return new ProducerChannel(response.ProducerId, response.ProducerName, connection, _compressorFactory);
+            return new ProducerChannel(response.ProducerId, response.ProducerName, connection, _compressorFactory, schemaVersion);
+        }
+
+        private async ValueTask<byte[]?> GetSchemaVersion(IConnection connection, CancellationToken cancellationToken)
+        {
+            if (_schema is null)
+                return null;
+
+            var command = new CommandGetOrCreateSchema
+            {
+                Schema = _schema,
+                Topic = _commandProducer.Topic
+            };
+
+            var response = await connection.Send(command, cancellationToken).ConfigureAwait(false);
+
+            response.Expect(BaseCommand.Type.GetOrCreateSchemaResponse);
+            if (response.GetOrCreateSchemaResponse.ShouldSerializeErrorCode())
+                response.GetOrCreateSchemaResponse.Throw();
+
+            return response.GetOrCreateSchemaResponse.SchemaVersion;
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 81861e3..7e0b37f 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -21,17 +21,14 @@
     public sealed class ProducerProcess : Process
     {
         private readonly IStateManager<ProducerState> _stateManager;
-        private readonly IProducerChannelFactory _factory;
-        private readonly Producer _producer;
+        private readonly IEstablishNewChannel _producer;
 
         public ProducerProcess(
             Guid correlationId,
             IStateManager<ProducerState> stateManager,
-            IProducerChannelFactory factory,
-            Producer producer) : base(correlationId)
+            IEstablishNewChannel producer) : base(correlationId)
         {
             _stateManager = stateManager;
-            _factory = factory;
             _producer = producer;
         }
 
@@ -44,7 +41,7 @@
 
         protected override void CalculateState()
         {
-            if (_producer.IsFinalState())
+            if (_stateManager.IsFinalState())
                 return;
 
             if (ExecutorState == ExecutorState.Faulted)
@@ -58,25 +55,12 @@
                 case ChannelState.ClosedByServer:
                 case ChannelState.Disconnected:
                     _stateManager.SetState(ProducerState.Disconnected);
-                    SetupChannel();
+                    _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
                     return;
                 case ChannelState.Connected:
                     _stateManager.SetState(ProducerState.Connected);
                     return;
             }
         }
-
-        private async void SetupChannel()
-        {
-            try
-            {
-                var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
-                await _producer.SetChannel(channel).ConfigureAwait(false);
-            }
-            catch
-            {
-                // ignored
-            }
-        }
     }
 }
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 120d5e5..fc1a2e0 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -21,7 +21,6 @@
     using System.Collections.Generic;
     using System.Security.Cryptography.X509Certificates;
     using System.Text;
-    using System.Threading.Tasks;
 
     public sealed class PulsarClientBuilder : IPulsarClientBuilder
     {
@@ -79,18 +78,6 @@
             return this;
         }
 
-        public IPulsarClientBuilder ExceptionHandler(Action<ExceptionContext> exceptionHandler)
-        {
-            _exceptionHandlers.Add(new ActionExceptionHandler(exceptionHandler));
-            return this;
-        }
-
-        public IPulsarClientBuilder ExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler)
-        {
-            _exceptionHandlers.Add(new FuncExceptionHandler(exceptionHandler));
-            return this;
-        }
-
         public IPulsarClientBuilder RetryInterval(TimeSpan interval)
         {
             _retryInterval = interval;
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 731e4cd..bffcb2d 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -23,13 +23,14 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class Reader : IReader
+    public sealed class Reader<TMessage> : IEstablishNewChannel, IReader<TMessage>
     {
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
-        private IConsumerChannel _channel;
+        private IConsumerChannel<TMessage> _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ReaderState> _state;
+        private readonly IConsumerChannelFactory<TMessage> _factory;
         private int _isDisposed;
 
         public Uri ServiceUrl { get; }
@@ -40,9 +41,10 @@
             Uri serviceUrl,
             string topic,
             IRegisterEvent eventRegister,
-            IConsumerChannel initialChannel,
+            IConsumerChannel<TMessage> initialChannel,
             IExecute executor,
-            IStateChanged<ReaderState> state)
+            IStateChanged<ReaderState> state,
+            IConsumerChannelFactory<TMessage> factory)
         {
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
@@ -51,9 +53,10 @@
             _channel = initialChannel;
             _executor = executor;
             _state = state;
+            _factory = factory;
             _isDisposed = 0;
 
-            _eventRegister.Register(new ReaderCreated(_correlationId, this));
+            _eventRegister.Register(new ReaderCreated(_correlationId));
         }
 
         public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state, CancellationToken cancellationToken)
@@ -79,14 +82,14 @@
         private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
             => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<Message> Receive(CancellationToken cancellationToken)
+        public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
             return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
-        private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
+        private async ValueTask<IMessage<TMessage>> ReceiveMessage(CancellationToken cancellationToken)
             => await _channel.Receive(cancellationToken).ConfigureAwait(false);
 
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
@@ -110,7 +113,7 @@
             if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                 return;
 
-            _eventRegister.Register(new ReaderDisposed(_correlationId, this));
+            _eventRegister.Register(new ReaderDisposed(_correlationId));
             await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
@@ -118,13 +121,9 @@
         private async Task Seek(CommandSeek command, CancellationToken cancellationToken)
             => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-        internal async ValueTask SetChannel(IConsumerChannel channel)
+        public async Task EstablishNewChannel(CancellationToken cancellationToken)
         {
-            if (_isDisposed != 0)
-            {
-                await channel.DisposeAsync().ConfigureAwait(false);
-                return;
-            }
+            var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
 
             var oldChannel = _channel;
             _channel = channel;
@@ -136,7 +135,7 @@
         private void ThrowIfDisposed()
         {
             if (_isDisposed != 0)
-                throw new ReaderDisposedException();
+                throw new ReaderDisposedException(GetType().FullName!);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
index 4936cd0..f9090da 100644
--- a/src/DotPulsar/Internal/ReaderBuilder.cs
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -16,13 +16,11 @@
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
 
-    public sealed class ReaderBuilder : IReaderBuilder
+    public sealed class ReaderBuilder<TMessage> : IReaderBuilder<TMessage>
     {
         private readonly IPulsarClient _pulsarClient;
+        private readonly ISchema<TMessage> _schema;
         private string? _readerName;
         private uint _messagePrefetchCount;
         private bool _readCompacted;
@@ -30,62 +28,51 @@
         private string? _topic;
         private IHandleStateChanged<ReaderStateChanged>? _stateChangedHandler;
 
-        public ReaderBuilder(IPulsarClient pulsarClient)
+        public ReaderBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
         {
             _pulsarClient = pulsarClient;
-            _messagePrefetchCount = ReaderOptions.DefaultMessagePrefetchCount;
-            _readCompacted = ReaderOptions.DefaultReadCompacted;
+            _schema = schema;
+            _messagePrefetchCount = ReaderOptions<TMessage>.DefaultMessagePrefetchCount;
+            _readCompacted = ReaderOptions<TMessage>.DefaultReadCompacted;
         }
 
-        public IReaderBuilder MessagePrefetchCount(uint count)
+        public IReaderBuilder<TMessage> MessagePrefetchCount(uint count)
         {
             _messagePrefetchCount = count;
             return this;
         }
 
-        public IReaderBuilder ReadCompacted(bool readCompacted)
+        public IReaderBuilder<TMessage> ReadCompacted(bool readCompacted)
         {
             _readCompacted = readCompacted;
             return this;
         }
 
-        public IReaderBuilder ReaderName(string name)
+        public IReaderBuilder<TMessage> ReaderName(string name)
         {
             _readerName = name;
             return this;
         }
 
-        public IReaderBuilder StartMessageId(MessageId messageId)
+        public IReaderBuilder<TMessage> StartMessageId(MessageId messageId)
         {
             _startMessageId = messageId;
             return this;
         }
 
-        public IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler)
+        public IReaderBuilder<TMessage> StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler)
         {
             _stateChangedHandler = handler;
             return this;
         }
 
-        public IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
-        {
-            _stateChangedHandler = new ActionStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
-            return this;
-        }
-
-        public IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
-        {
-            _stateChangedHandler = new FuncStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
-            return this;
-        }
-
-        public IReaderBuilder Topic(string topic)
+        public IReaderBuilder<TMessage> Topic(string topic)
         {
             _topic = topic;
             return this;
         }
 
-        public IReader Create()
+        public IReader<TMessage> Create()
         {
             if (_startMessageId is null)
                 throw new ConfigurationException("StartMessageId may not be null");
@@ -93,7 +80,7 @@
             if (string.IsNullOrEmpty(_topic))
                 throw new ConfigurationException("Topic may not be null or empty");
 
-            var options = new ReaderOptions(_startMessageId, _topic!)
+            var options = new ReaderOptions<TMessage>(_startMessageId, _topic!, _schema)
             {
                 MessagePrefetchCount = _messagePrefetchCount,
                 ReadCompacted = _readCompacted,
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 404de7e..5dd8cd1 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -21,17 +21,14 @@
     public sealed class ReaderProcess : Process
     {
         private readonly IStateManager<ReaderState> _stateManager;
-        private readonly IConsumerChannelFactory _factory;
-        private readonly Reader _reader;
+        private readonly IEstablishNewChannel _reader;
 
         public ReaderProcess(
             Guid correlationId,
             IStateManager<ReaderState> stateManager,
-            IConsumerChannelFactory factory,
-            Reader reader) : base(correlationId)
+            IEstablishNewChannel reader) : base(correlationId)
         {
             _stateManager = stateManager;
-            _factory = factory;
             _reader = reader;
         }
 
@@ -44,7 +41,7 @@
 
         protected override void CalculateState()
         {
-            if (_reader.IsFinalState())
+            if (_stateManager.IsFinalState())
                 return;
 
             if (ExecutorState == ExecutorState.Faulted)
@@ -58,7 +55,7 @@
                 case ChannelState.ClosedByServer:
                 case ChannelState.Disconnected:
                     _stateManager.SetState(ReaderState.Disconnected);
-                    SetupChannel();
+                    _ = _reader.EstablishNewChannel(CancellationTokenSource.Token);
                     return;
                 case ChannelState.Connected:
                     _stateManager.SetState(ReaderState.Connected);
@@ -68,18 +65,5 @@
                     return;
             }
         }
-
-        private async void SetupChannel()
-        {
-            try
-            {
-                var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
-                await _reader.SetChannel(channel).ConfigureAwait(false);
-            }
-            catch
-            {
-                // ignored
-            }
-        }
     }
 }
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index ae7bdee..55777f6 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -79,6 +79,9 @@
                 case BaseCommand.Type.GetLastMessageId:
                     cmd.GetLastMessageId.RequestId = _requestId.FetchNext();
                     return;
+                case BaseCommand.Type.GetOrCreateSchema:
+                    cmd.GetOrCreateSchema.RequestId = _requestId.FetchNext();
+                    return;
             }
         }
 
@@ -103,6 +106,8 @@
                 BaseCommand.Type.CloseConsumer => cmd.CloseConsumer.RequestId.ToString(),
                 BaseCommand.Type.GetLastMessageId => cmd.GetLastMessageId.RequestId.ToString(),
                 BaseCommand.Type.GetLastMessageIdResponse => cmd.GetLastMessageIdResponse.RequestId.ToString(),
+                BaseCommand.Type.GetOrCreateSchema => cmd.GetOrCreateSchema.RequestId.ToString(),
+                BaseCommand.Type.GetOrCreateSchemaResponse => cmd.GetOrCreateSchemaResponse.RequestId.ToString(),
                 _ => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type")
             };
     }
diff --git a/src/DotPulsar/Extensions/SendExtensions.cs b/src/DotPulsar/Internal/SendExtensions.cs
similarity index 80%
rename from src/DotPulsar/Extensions/SendExtensions.cs
rename to src/DotPulsar/Internal/SendExtensions.cs
index 41ace1e..535408b 100644
--- a/src/DotPulsar/Extensions/SendExtensions.cs
+++ b/src/DotPulsar/Internal/SendExtensions.cs
@@ -28,25 +28,25 @@
         /// <summary>
         /// Sends a message.
         /// </summary>
-        public static async ValueTask<MessageId> Send(this ISend sender, byte[] data, CancellationToken cancellationToken = default)
+        public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, byte[] data, CancellationToken cancellationToken = default)
             => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Sends a message.
         /// </summary>
-        public static async ValueTask<MessageId> Send(this ISend sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+        public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
             => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Sends a message with metadata.
         /// </summary>
-        public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default)
+        public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default)
             => await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Sends a message with metadata.
         /// </summary>
-        public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+        public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
             => await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
deleted file mode 100644
index 32575d4..0000000
--- a/src/DotPulsar/Message.cs
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar
-{
-    using System;
-    using System.Buffers;
-    using System.Collections.Generic;
-
-    /// <summary>
-    /// The message received by consumers and readers.
-    /// </summary>
-    public sealed class Message
-    {
-        internal Message(
-            MessageId messageId,
-            ReadOnlySequence<byte> data,
-            string producerName,
-            ulong sequenceId,
-            uint redeliveryCount,
-            ulong eventTime,
-            ulong publishTime,
-            IReadOnlyDictionary<string, string> properties,
-            bool hasBase64EncodedKey,
-            string? key,
-            byte[]? orderingKey)
-        {
-            MessageId = messageId;
-            Data = data;
-            ProducerName = producerName;
-            SequenceId = sequenceId;
-            RedeliveryCount = redeliveryCount;
-            EventTime = eventTime;
-            PublishTime = publishTime;
-            Properties = properties;
-            HasBase64EncodedKey = hasBase64EncodedKey;
-            Key = key;
-            OrderingKey = orderingKey;
-        }
-
-        /// <summary>
-        /// The id of the message.
-        /// </summary>
-        public MessageId MessageId { get; }
-
-        /// <summary>
-        /// The raw payload of the message.
-        /// </summary>
-        public ReadOnlySequence<byte> Data { get; }
-
-        /// <summary>
-        /// The name of the producer who produced the message.
-        /// </summary>
-        public string ProducerName { get; }
-
-        /// <summary>
-        /// The sequence id of the message.
-        /// </summary>
-        public ulong SequenceId { get; }
-
-        /// <summary>
-        /// The redelivery count (maintained by the broker) of the message.
-        /// </summary>
-        public uint RedeliveryCount { get; }
-
-        /// <summary>
-        /// Check whether the message has an event time.
-        /// </summary>
-        public bool HasEventTime => EventTime != 0;
-
-        /// <summary>
-        /// The event time of the message as unix time in milliseconds.
-        /// </summary>
-        public ulong EventTime { get; }
-
-        /// <summary>
-        /// The event time of the message as an UTC DateTime.
-        /// </summary>
-        public DateTime EventTimeAsDateTime => EventTimeAsDateTimeOffset.UtcDateTime;
-
-        /// <summary>
-        /// The event time of the message as a DateTimeOffset with an offset of 0.
-        /// </summary>
-        public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) EventTime);
-
-        /// <summary>
-        /// Check whether the key been base64 encoded.
-        /// </summary>
-        public bool HasBase64EncodedKey { get; }
-
-        /// <summary>
-        /// Check whether the message has a key.
-        /// </summary>
-        public bool HasKey => Key is not null;
-
-        /// <summary>
-        /// The key as a string.
-        /// </summary>
-        public string? Key { get; }
-
-        /// <summary>
-        /// The key as bytes.
-        /// </summary>
-        public byte[]? KeyBytes => Key is not null ? Convert.FromBase64String(Key) : null;
-
-        /// <summary>
-        /// Check whether the message has an ordering key.
-        /// </summary>
-        public bool HasOrderingKey => OrderingKey is not null;
-
-        /// <summary>
-        /// The ordering key of the message.
-        /// </summary>
-        public byte[]? OrderingKey { get; }
-
-        /// <summary>
-        /// The publish time of the message as unix time in milliseconds.
-        /// </summary>
-        public ulong PublishTime { get; }
-
-        /// <summary>
-        /// The publish time of the message as an UTC DateTime.
-        /// </summary>
-        public DateTime PublishTimeAsDateTime => PublishTimeAsDateTimeOffset.UtcDateTime;
-
-        /// <summary>
-        /// The publish time of the message as a DateTimeOffset with an offset of 0.
-        /// </summary>
-        public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) PublishTime);
-
-        /// <summary>
-        /// The properties of the message.
-        /// </summary>
-        public IReadOnlyDictionary<string, string> Properties { get; }
-    }
-}
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index b9109e9..e8fa47a 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -148,6 +148,15 @@
         }
 
         /// <summary>
+        /// The schema version of the message.
+        /// </summary>
+        public byte[]? SchemaVersion
+        {
+            get => Metadata.SchemaVersion;
+            set => Metadata.SchemaVersion = value;
+        }
+
+        /// <summary>
         /// The sequence id of the message.
         /// </summary>
         public ulong SequenceId
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index f1ad688..958a1f8 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -19,7 +19,7 @@
     /// <summary>
     /// The producer building options.
     /// </summary>
-    public sealed class ProducerOptions
+    public sealed class ProducerOptions<TMessage>
     {
         /// <summary>
         /// The default compression type.
@@ -34,11 +34,12 @@
         /// <summary>
         /// Initializes a new instance using the specified topic.
         /// </summary>
-        public ProducerOptions(string topic)
+        public ProducerOptions(string topic, ISchema<TMessage> schema)
         {
             CompressionType = DefaultCompressionType;
             InitialSequenceId = DefaultInitialSequenceId;
             Topic = topic;
+            Schema = schema;
         }
 
         /// <summary>
@@ -57,6 +58,11 @@
         public string? ProducerName { get; set; }
 
         /// <summary>
+        /// Set the schema. This is required.
+        /// </summary>
+        public ISchema<TMessage> Schema { get; set; }
+
+        /// <summary>
         /// Register a state changed handler. This is optional.
         /// </summary>
         public IHandleStateChanged<ProducerStateChanged>? StateChangedHandler { get; set; }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 3ce1940..678ac83 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -60,7 +60,7 @@
         /// <summary>
         /// Create a producer.
         /// </summary>
-        public IProducer CreateProducer(ProducerOptions options)
+        public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
@@ -76,12 +76,18 @@
 
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, compressorFactory);
+            var topic = options.Topic;
+            var producerName = options.ProducerName;
+            var schema = options.Schema;
+            var initialSequenceId = options.InitialSequenceId;
+
+            var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, compressorFactory);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
+            var initialChannel = new NotReadyChannel<TMessage>();
+            var producer = new Producer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
-            var process = new ProducerProcess(correlationId, stateManager, factory, producer);
+            var process = new ProducerProcess(correlationId, stateManager, producer);
             _processManager.Add(process);
             process.Start();
             return producer;
@@ -90,7 +96,7 @@
         /// <summary>
         /// Create a consumer.
         /// </summary>
-        public IConsumer CreateConsumer(ConsumerOptions options)
+        public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
@@ -107,14 +113,16 @@
                 Type = (CommandSubscribe.SubType) options.SubscriptionType
             };
             var messagePrefetchCount = options.MessagePrefetchCount;
-            var batchHandler = new BatchHandler(true);
+            var messageFactory = new MessageFactory<TMessage>(options.Schema);
+            var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
             var decompressorFactories = CompressionFactories.DecompressorFactories();
-            var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
+            var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
             var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
-            var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var initialChannel = new NotReadyChannel<TMessage>();
+            var consumer = new Consumer<TMessage>(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, initialChannel, executor, stateManager, factory);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
-            var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
+            var process = new ConsumerProcess(correlationId, stateManager, consumer, options.SubscriptionType == SubscriptionType.Failover);
             _processManager.Add(process);
             process.Start();
             return consumer;
@@ -123,7 +131,7 @@
         /// <summary>
         /// Create a reader.
         /// </summary>
-        public IReader CreateReader(ReaderOptions options)
+        public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
@@ -139,14 +147,16 @@
                 Topic = options.Topic
             };
             var messagePrefetchCount = options.MessagePrefetchCount;
-            var batchHandler = new BatchHandler(false);
+            var messageFactory = new MessageFactory<TMessage>(options.Schema);
+            var batchHandler = new BatchHandler<TMessage>(false, messageFactory);
             var decompressorFactories = CompressionFactories.DecompressorFactories();
-            var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
+            var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
             var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
-            var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var initialChannel = new NotReadyChannel<TMessage>();
+            var reader = new Reader<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, initialChannel, executor, stateManager, factory);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
-            var process = new ReaderProcess(correlationId, stateManager, factory, reader);
+            var process = new ReaderProcess(correlationId, stateManager, reader);
             _processManager.Add(process);
             process.Start();
             return reader;
diff --git a/src/DotPulsar/ReaderOptions.cs b/src/DotPulsar/ReaderOptions.cs
index 42e4f3c..4a1e596 100644
--- a/src/DotPulsar/ReaderOptions.cs
+++ b/src/DotPulsar/ReaderOptions.cs
@@ -19,7 +19,7 @@
     /// <summary>
     /// The reader building options.
     /// </summary>
-    public sealed class ReaderOptions
+    public sealed class ReaderOptions<TMessage>
     {
         /// <summary>
         /// The default message prefetch count.
@@ -34,12 +34,13 @@
         /// <summary>
         /// Initializes a new instance using the specified startMessageId and topic.
         /// </summary>
-        public ReaderOptions(MessageId startMessageId, string topic)
+        public ReaderOptions(MessageId startMessageId, string topic, ISchema<TMessage> schema)
         {
             MessagePrefetchCount = DefaultMessagePrefetchCount;
             ReadCompacted = DefaultReadCompacted;
             StartMessageId = startMessageId;
             Topic = topic;
+            Schema = schema;
         }
 
         /// <summary>
@@ -58,6 +59,11 @@
         public string? ReaderName { get; set; }
 
         /// <summary>
+        /// Set the schema. This is required.
+        /// </summary>
+        public ISchema<TMessage> Schema { get; set; }
+
+        /// <summary>
         /// The initial reader position is set to the specified message id. This is required.
         /// </summary>
         public MessageId StartMessageId { get; set; }
diff --git a/src/DotPulsar/Schema.cs b/src/DotPulsar/Schema.cs
new file mode 100644
index 0000000..6aff024
--- /dev/null
+++ b/src/DotPulsar/Schema.cs
@@ -0,0 +1,94 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Schemas;
+
+    /// <summary>
+    /// Message schema definitions.
+    /// </summary>
+    public static class Schema
+    {
+        static Schema()
+        {
+            Bytes = new ByteArraySchema();
+            String = StringSchema.UTF8;
+            Boolean = new BooleanSchema();
+            Int8 = new ByteSchema();
+            Int16 = new ShortSchema();
+            Int32 = new IntegerSchema();
+            Int64 = new LongSchema();
+            Float = new FloatSchema();
+            TimeStamp = TimestampSchema.Timestamp;
+            Date = TimestampSchema.Date;
+            Time = new TimeSchema();
+        }
+
+        /// <summary>
+        /// Raw bytes schema using byte[].
+        /// </summary>
+        public static ByteArraySchema Bytes { get; }
+
+        /// <summary>
+        /// UTF-8 schema.
+        /// </summary>
+        public static StringSchema String { get; }
+
+        /// <summary>
+        /// Boolean schema.
+        /// </summary>
+        public static BooleanSchema Boolean { get; }
+
+        /// <summary>
+        /// Byte schema.
+        /// </summary>
+        public static ByteSchema Int8 { get; }
+
+        /// <summary>
+        /// Short schema.
+        /// </summary>
+        public static ShortSchema Int16 { get; }
+
+        /// <summary>
+        /// Integer schema.
+        /// </summary>
+        public static IntegerSchema Int32 { get; }
+
+        /// <summary>
+        /// Long schema.
+        /// </summary>
+        public static LongSchema Int64 { get; }
+
+        /// <summary>
+        /// Float schema.
+        /// </summary>
+        public static FloatSchema Float { get; }
+
+        /// <summary>
+        /// Timestamp schema using DateTime.
+        /// </summary>
+        public static TimestampSchema TimeStamp { get; }
+
+        /// <summary>
+        /// Date schema using DateTime.
+        /// </summary>
+        public static TimestampSchema Date { get; }
+
+        /// <summary>
+        /// Time schema using TimeSpan.
+        /// </summary>
+        public static TimeSchema Time { get; }
+    }
+}
diff --git a/src/DotPulsar/SchemaInfo.cs b/src/DotPulsar/SchemaInfo.cs
new file mode 100644
index 0000000..8e0a375
--- /dev/null
+++ b/src/DotPulsar/SchemaInfo.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using System.Collections.Generic;
+    using System.Linq;
+
+    /// <summary>
+    /// Information about the schema.
+    /// </summary>
+    public sealed class SchemaInfo
+    {
+        internal SchemaInfo(Internal.PulsarApi.Schema schema)
+            => PulsarSchema = schema;
+
+        public SchemaInfo(string name, byte[] data, SchemaType type, IReadOnlyDictionary<string, string> properties)
+        {
+            PulsarSchema = new Internal.PulsarApi.Schema
+            {
+                Name = name,
+                SchemaData = data,
+                Type = (Internal.PulsarApi.Schema.SchemaType) type,
+            };
+
+            foreach (var property in properties)
+            {
+                var keyValue = new Internal.PulsarApi.KeyValue
+                {
+                    Key = property.Key,
+                    Value = property.Value
+                };
+
+                PulsarSchema.Properties.Add(keyValue);
+            }
+        }
+
+        internal Internal.PulsarApi.Schema PulsarSchema { get; }
+
+        /// <summary>
+        /// The name of the schema.
+        /// </summary>
+        public string Name => PulsarSchema.Name;
+
+        /// <summary>
+        /// The data of the schema.
+        /// </summary>
+        public byte[] Data => PulsarSchema.SchemaData;
+
+        /// <summary>
+        /// The type of the schema.
+        /// </summary>
+        public SchemaType Type => (SchemaType) PulsarSchema.Type;
+
+        /// <summary>
+        /// The properties of the schema.
+        /// </summary>
+        public IReadOnlyDictionary<string, string> Properties => PulsarSchema.Properties.ToDictionary(p => p.Key, p => p.Value);
+    }
+}
diff --git a/src/DotPulsar/SchemaType.cs b/src/DotPulsar/SchemaType.cs
new file mode 100644
index 0000000..2c9f57a
--- /dev/null
+++ b/src/DotPulsar/SchemaType.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    /// <summary>
+    /// The supported schema types for messages.
+    /// </summary>
+    public enum SchemaType : byte
+    {
+        /// <summary>
+        /// No schema.
+        /// </summary>
+        None = 0,
+
+        /// <summary>
+        /// UTF-8 schema.
+        /// </summary>
+        String = 1,
+
+        /// <summary>
+        /// JSON schema.
+        /// </summary>
+        Json = 2,
+
+        /// <summary>
+        /// Protobuf schema.
+        /// </summary>
+        Protobuf = 3,
+
+        /// <summary>
+        /// Avro schema.
+        /// </summary>
+        Avro = 4,
+
+        /// <summary>
+        /// Boolean schema.
+        /// </summary>
+        Boolean = 5,
+
+        /// <summary>
+        /// 8-byte integer schema.
+        /// </summary>
+        Int8 = 6,
+
+        /// <summary>
+        /// 16-byte integer schema.
+        /// </summary>
+        Int16 = 7,
+
+        /// <summary>
+        ///32-byte integer schema.
+        /// </summary>
+        Int32 = 8,
+
+        /// <summary>
+        /// 64-byte integer schema.
+        /// </summary>
+        Int64 = 9,
+
+        /// <summary>
+        /// Float schema.
+        /// </summary>
+        Float = 10,
+
+        /// <summary>
+        /// Double schema.
+        /// </summary>
+        Double = 11,
+
+        /// <summary>
+        /// Date schema.
+        /// </summary>
+        Date = 12,
+
+        /// <summary>
+        /// Time schema.
+        /// </summary>
+        Time = 13,
+
+        /// <summary>
+        /// Timestamp schema.
+        /// </summary>
+        Timestamp = 14,
+
+        /// <summary>
+        /// KeyValue schema.
+        /// </summary>
+        KeyValue = 15,
+
+        /// <summary>
+        /// Instant schema.
+        /// </summary>
+        Instant = 16,
+
+        /// <summary>
+        /// Local date schema.
+        /// </summary>
+        LocalDate = 17,
+
+        /// <summary>
+        /// Local time schema.
+        /// </summary>
+        LocalTime = 18,
+
+        /// <summary>
+        /// Local data time schema.
+        /// </summary>
+        LocalDateTime = 19,
+
+        /// <summary>
+        /// Protobuf native schema.
+        /// </summary>
+        ProtobufNative = 20
+    }
+}
diff --git a/src/DotPulsar/Schemas/BooleanSchema.cs b/src/DotPulsar/Schemas/BooleanSchema.cs
new file mode 100644
index 0000000..82a7844
--- /dev/null
+++ b/src/DotPulsar/Schemas/BooleanSchema.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+
+    /// <summary>
+    /// Schema definition for Boolean messages.
+    /// </summary>
+    public sealed class BooleanSchema : ISchema<bool>
+    {
+        private readonly static ReadOnlySequence<byte> _true;
+        private readonly static ReadOnlySequence<byte> _false;
+
+        static BooleanSchema()
+        {
+            _true = new ReadOnlySequence<byte>(new byte[] { 1 });
+            _false = new ReadOnlySequence<byte>(new byte[] { 0 });
+        }
+
+        public BooleanSchema()
+            => SchemaInfo = new SchemaInfo("Boolean", Array.Empty<byte>(), SchemaType.Boolean, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public bool Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 1)
+                throw new SchemaSerializationException($"{nameof(BooleanSchema)} expected to decode 1 byte, but received {bytes} bytes");
+
+            return bytes.First.Span[0] != 0;
+        }
+
+        public ReadOnlySequence<byte> Encode(bool message)
+            => message ? _true : _false;
+    }
+}
diff --git a/src/DotPulsar/Schemas/ByteArraySchema.cs b/src/DotPulsar/Schemas/ByteArraySchema.cs
new file mode 100644
index 0000000..979562c
--- /dev/null
+++ b/src/DotPulsar/Schemas/ByteArraySchema.cs
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+
+    /// <summary>
+    /// Schema definition for raw messages using byte[].
+    /// </summary>
+    public sealed class ByteArraySchema : ISchema<byte[]>
+    {
+        public ByteArraySchema()
+            => SchemaInfo = new SchemaInfo("Bytes", Array.Empty<byte>(), SchemaType.None, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public byte[] Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+            => bytes.ToArray();
+
+        public ReadOnlySequence<byte> Encode(byte[] message)
+            => new(message);
+    }
+}
diff --git a/src/DotPulsar/Schemas/ByteSchema.cs b/src/DotPulsar/Schemas/ByteSchema.cs
new file mode 100644
index 0000000..14caebc
--- /dev/null
+++ b/src/DotPulsar/Schemas/ByteSchema.cs
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+
+    /// <summary>
+    /// Schema definition for Byte (int8) messages.
+    /// </summary>
+    public sealed class ByteSchema : ISchema<byte>
+    {
+        public ByteSchema()
+            => SchemaInfo = new SchemaInfo("INT8", Array.Empty<byte>(), SchemaType.Int8, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public byte Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 1)
+                throw new SchemaSerializationException($"{nameof(ByteSchema)} expected to decode 1 byte, but received {bytes} bytes");
+
+            return bytes.First.Span[0];
+        }
+
+        public ReadOnlySequence<byte> Encode(byte message)
+            => new(new[] { message });
+    }
+}
diff --git a/src/DotPulsar/Schemas/ByteSequenceSchema.cs b/src/DotPulsar/Schemas/ByteSequenceSchema.cs
new file mode 100644
index 0000000..d8c8282
--- /dev/null
+++ b/src/DotPulsar/Schemas/ByteSequenceSchema.cs
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+
+    /// <summary>
+    /// Schema definition for raw messages using ReadOnlySequence of bytes.
+    /// </summary>
+    public sealed class ByteSequenceSchema : ISchema<ReadOnlySequence<byte>>
+    {
+        public ByteSequenceSchema()
+            => SchemaInfo = new SchemaInfo("Bytes", Array.Empty<byte>(), SchemaType.None, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public ReadOnlySequence<byte> Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+            => bytes;
+
+        public ReadOnlySequence<byte> Encode(ReadOnlySequence<byte> message)
+            => message;
+    }
+}
diff --git a/src/DotPulsar/Schemas/DoubleSchema.cs b/src/DotPulsar/Schemas/DoubleSchema.cs
new file mode 100644
index 0000000..0f7552e
--- /dev/null
+++ b/src/DotPulsar/Schemas/DoubleSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+    using System.Linq;
+    using System.Runtime.InteropServices;
+
+    /// <summary>
+    /// Schema definition for Double messages.
+    /// </summary>
+    public sealed class DoubleSchema : ISchema<double>
+    {
+        public DoubleSchema()
+            => SchemaInfo = new SchemaInfo("Double", Array.Empty<byte>(), SchemaType.Double, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public double Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 8)
+                throw new SchemaSerializationException($"{nameof(DoubleSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+            var array = bytes.ToArray();
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return MemoryMarshal.Read<double>(array);
+        }
+
+        public ReadOnlySequence<byte> Encode(double message)
+        {
+            var array = BitConverter.GetBytes(message);
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return new(array);
+        }
+    }
+}
diff --git a/src/DotPulsar/Schemas/FloatSchema .cs b/src/DotPulsar/Schemas/FloatSchema .cs
new file mode 100644
index 0000000..3040669
--- /dev/null
+++ b/src/DotPulsar/Schemas/FloatSchema .cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+    using System.Linq;
+    using System.Runtime.InteropServices;
+
+    /// <summary>
+    /// Schema definition for Float messages.
+    /// </summary>
+    public sealed class FloatSchema : ISchema<float>
+    {
+        public FloatSchema()
+            => SchemaInfo = new SchemaInfo("Float", Array.Empty<byte>(), SchemaType.Float, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public float Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 4)
+                throw new SchemaSerializationException($"{nameof(FloatSchema)} expected to decode 4 bytes, but received {bytes} bytes");
+
+            var array = bytes.ToArray();
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return MemoryMarshal.Read<float>(array);
+        }
+
+        public ReadOnlySequence<byte> Encode(float message)
+        {
+            var array = BitConverter.GetBytes(message);
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return new(array);
+        }
+    }
+}
diff --git a/src/DotPulsar/Schemas/IntegerSchema.cs b/src/DotPulsar/Schemas/IntegerSchema.cs
new file mode 100644
index 0000000..69d079b
--- /dev/null
+++ b/src/DotPulsar/Schemas/IntegerSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+    using System.Linq;
+    using System.Runtime.InteropServices;
+
+    /// <summary>
+    /// Schema definition for Integer (int32) messages.
+    /// </summary>
+    public sealed class IntegerSchema : ISchema<int>
+    {
+        public IntegerSchema()
+            => SchemaInfo = new SchemaInfo("INT32", Array.Empty<byte>(), SchemaType.Int32, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public int Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 4)
+                throw new SchemaSerializationException($"{nameof(IntegerSchema)} expected to decode 4 bytes, but received {bytes} bytes");
+
+            var array = bytes.ToArray();
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return MemoryMarshal.Read<int>(array);
+        }
+
+        public ReadOnlySequence<byte> Encode(int message)
+        {
+            var array = BitConverter.GetBytes(message);
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return new(array);
+        }
+    }
+}
diff --git a/src/DotPulsar/Schemas/LongSchema.cs b/src/DotPulsar/Schemas/LongSchema.cs
new file mode 100644
index 0000000..be40c4f
--- /dev/null
+++ b/src/DotPulsar/Schemas/LongSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+    using System.Linq;
+    using System.Runtime.InteropServices;
+
+    /// <summary>
+    /// Schema definition for Long (int64) messages.
+    /// </summary>
+    public sealed class LongSchema : ISchema<long>
+    {
+        public LongSchema()
+            => SchemaInfo = new SchemaInfo("INT64", Array.Empty<byte>(), SchemaType.Int64, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public long Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 8)
+                throw new SchemaSerializationException($"{nameof(LongSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+            var array = bytes.ToArray();
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return MemoryMarshal.Read<long>(array);
+        }
+
+        public ReadOnlySequence<byte> Encode(long message)
+        {
+            var array = BitConverter.GetBytes(message);
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return new(array);
+        }
+    }
+}
diff --git a/src/DotPulsar/Schemas/ShortSchema.cs b/src/DotPulsar/Schemas/ShortSchema.cs
new file mode 100644
index 0000000..3c84100
--- /dev/null
+++ b/src/DotPulsar/Schemas/ShortSchema.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+    using System.Linq;
+    using System.Runtime.InteropServices;
+
+    /// <summary>
+    /// Schema definition for Short (int16) messages.
+    /// </summary>
+    public sealed class ShortSchema : ISchema<short>
+    {
+        public ShortSchema()
+            => SchemaInfo = new SchemaInfo("INT16", Array.Empty<byte>(), SchemaType.Int16, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public short Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 2)
+                throw new SchemaSerializationException($"{nameof(ShortSchema)} expected to decode 2 bytes, but received {bytes} bytes");
+
+            var array = bytes.ToArray();
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return MemoryMarshal.Read<short>(array);
+        }
+
+        public ReadOnlySequence<byte> Encode(short message)
+        {
+            var array = BitConverter.GetBytes(message);
+
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(array);
+
+            return new(array);
+        }
+    }
+}
diff --git a/src/DotPulsar/Schemas/StringSchema.cs b/src/DotPulsar/Schemas/StringSchema.cs
new file mode 100644
index 0000000..50a688b
--- /dev/null
+++ b/src/DotPulsar/Schemas/StringSchema.cs
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Text;
+
+    /// <summary>
+    /// Schema definition for UTF-8 (default), UTF-16 (unicode) or US-ASCII encoded strings.
+    /// </summary>
+    public sealed class StringSchema : ISchema<string>
+    {
+        private const string _charSetKey = "__charset";
+        private const string _utf8 = "UTF-8";
+        private const string _unicode = "UTF-16";
+        private const string _ascii = "US-ASCII";
+
+        static StringSchema()
+        {
+            UTF8 = new StringSchema(Encoding.UTF8);
+            Unicode = new StringSchema(Encoding.Unicode);
+            ASCII = new StringSchema(Encoding.ASCII);
+        }
+
+        /// <summary>
+        /// Schema definition for UTF-8 encoded strings.
+        /// </summary>
+        public static StringSchema UTF8 { get; }
+
+        /// <summary>
+        /// Schema definition for UTF-16 encoded strings.
+        /// </summary>
+        public static StringSchema Unicode { get; }
+
+        /// <summary>
+        /// Schema definition for US-ASCII encoded strings.
+        /// </summary>
+        public static StringSchema ASCII { get; }
+
+        private static string GetCharSet(string encodingName)
+        {
+            return encodingName switch
+            {
+                "Unicode (UTF-8)" => _utf8,
+                "Unicode" => _unicode,
+                "US-ASCII" => _ascii,
+                _ => throw new Exception($"Encoding '{encodingName}' is not supported!")
+            };
+        }
+
+        private static StringSchema GetSchema(string charSet)
+        {
+            return charSet switch
+            {
+                _utf8 => UTF8,
+                _unicode => Unicode,
+                _ascii => ASCII,
+                _ => throw new Exception($"CharSet '{charSet}' is not supported!")
+            };
+        }
+
+        public static StringSchema From(SchemaInfo schemaInfo)
+        {
+            if (schemaInfo.Type != SchemaType.String)
+                throw new Exception("Not a string schema!");
+
+            if (schemaInfo.Properties.TryGetValue(_charSetKey, out var charset))
+                return GetSchema(charset);
+            else
+                return UTF8;
+        }
+
+        private readonly Encoding _encoding;
+
+        public StringSchema(Encoding encoding)
+        {
+            _encoding = encoding;
+
+            var properties = new Dictionary<string, string>
+            {
+                { _charSetKey, GetCharSet(encoding.EncodingName) }
+            };
+
+            SchemaInfo = new SchemaInfo("String", Array.Empty<byte>(), SchemaType.String, properties);
+        }
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public string Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion)
+            => _encoding.GetString(bytes.ToArray());
+
+        public ReadOnlySequence<byte> Encode(string message)
+            => new(_encoding.GetBytes(message));
+    }
+}
diff --git a/src/DotPulsar/Schemas/TimeSchema.cs b/src/DotPulsar/Schemas/TimeSchema.cs
new file mode 100644
index 0000000..67d6775
--- /dev/null
+++ b/src/DotPulsar/Schemas/TimeSchema.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+
+    /// <summary>
+    /// Schema definition for Time (TimeSpan) messages.
+    /// </summary>
+    public sealed class TimeSchema : ISchema<TimeSpan>
+    {
+        public TimeSchema()
+            => SchemaInfo = new SchemaInfo("Time", Array.Empty<byte>(), SchemaType.Time, ImmutableDictionary<string, string>.Empty);
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public TimeSpan Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 8)
+                throw new SchemaSerializationException($"{nameof(TimestampSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+            var milliseconds = Schema.Int64.Decode(bytes);
+            return TimeSpan.FromMilliseconds(milliseconds);
+        }
+
+        public ReadOnlySequence<byte> Encode(TimeSpan message)
+        {
+            var milliseconds = (long) message.TotalMilliseconds;
+            return Schema.Int64.Encode(milliseconds);
+        }
+    }
+}
diff --git a/src/DotPulsar/Schemas/TimestampSchema.cs b/src/DotPulsar/Schemas/TimestampSchema.cs
new file mode 100644
index 0000000..18bee0a
--- /dev/null
+++ b/src/DotPulsar/Schemas/TimestampSchema.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Immutable;
+
+    /// <summary>
+    /// Schema definition for Timestamp (DateTime) messages.
+    /// </summary>
+    public sealed class TimestampSchema : ISchema<DateTime>
+    {
+        static TimestampSchema()
+        {
+            Timestamp = new TimestampSchema(new SchemaInfo("Timestamp", Array.Empty<byte>(), SchemaType.Timestamp, ImmutableDictionary<string, string>.Empty));
+            Date = new TimestampSchema(new SchemaInfo("Date", Array.Empty<byte>(), SchemaType.Date, ImmutableDictionary<string, string>.Empty));
+        }
+
+        public static TimestampSchema Timestamp { get; }
+        public static TimestampSchema Date { get; }
+
+        public TimestampSchema(SchemaInfo schemaInfo)
+            => SchemaInfo = schemaInfo;
+
+        public SchemaInfo SchemaInfo { get; }
+
+        public DateTime Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+        {
+            if (bytes.Length != 8)
+                throw new SchemaSerializationException($"{nameof(TimestampSchema)} expected to decode 8 bytes, but received {bytes} bytes");
+
+            var milliseconds = Schema.Int64.Decode(bytes);
+
+            if (milliseconds < -62135596800000)
+                return DateTime.MinValue;
+
+            if (milliseconds > 253402300799999)
+                return DateTime.MaxValue;
+
+            return DateTimeOffset.FromUnixTimeMilliseconds(milliseconds).DateTime;
+        }
+
+        public ReadOnlySequence<byte> Encode(DateTime message)
+        {
+            var milliseconds = new DateTimeOffset(message).ToUnixTimeMilliseconds();
+            return Schema.Int64.Encode(milliseconds);
+        }
+    }
+}
diff --git a/tests/DotPulsar.StressTests/ConnectionTests.cs b/tests/DotPulsar.StressTests/ConnectionTests.cs
index 3f350b4..da6e663 100644
--- a/tests/DotPulsar.StressTests/ConnectionTests.cs
+++ b/tests/DotPulsar.StressTests/ConnectionTests.cs
@@ -48,7 +48,7 @@
 
             await using var client = builder.Build();
 
-            await using var producer = client.NewProducer()
+            await using var producer = client.NewProducer(Schema.Bytes)
                 .ProducerName($"producer-{testRunId}")
                 .Topic(topic)
                 .Create();
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs b/tests/DotPulsar.StressTests/ConsumerTests.cs
index 2bc3421..df51edb 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -15,7 +15,7 @@
 namespace DotPulsar.StressTests
 {
     using DotPulsar.Abstractions;
-    using Extensions;
+    using DotPulsar.Extensions;
     using Fixtures;
     using FluentAssertions;
     using System;
@@ -48,14 +48,14 @@
                 .ServiceUrl(new Uri("pulsar://localhost:54545"))
                 .Build();
 
-            await using var consumer = client.NewConsumer()
+            await using var consumer = client.NewConsumer(Schema.Bytes)
                 .ConsumerName($"consumer-{testRunId}")
                 .InitialPosition(SubscriptionInitialPosition.Earliest)
                 .SubscriptionName($"subscription-{testRunId}")
                 .Topic(topic)
                 .Create();
 
-            await using var producer = client.NewProducer()
+            await using var producer = client.NewProducer(Schema.Bytes)
                 .ProducerName($"producer-{testRunId}")
                 .Topic(topic)
                 .Create();
@@ -70,7 +70,7 @@
             consumed.Should().BeEquivalentTo(produced);
         }
 
-        private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer producer, int numberOfMessages, CancellationToken ct)
+        private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer<byte[]> producer, int numberOfMessages, CancellationToken ct)
         {
             var messageIds = new MessageId[numberOfMessages];
 
@@ -83,7 +83,7 @@
             return messageIds;
         }
 
-        private static async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer consumer, int numberOfMessages, CancellationToken ct)
+        private static async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer<byte[]> consumer, int numberOfMessages, CancellationToken ct)
         {
             var messageIds = new List<MessageId>(numberOfMessages);
 
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 1550a89..4cdb2b0 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -6,13 +6,13 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
-    <PackageReference Include="coverlet.collector" Version="3.0.2">
+    <PackageReference Include="coverlet.collector" Version="3.0.3">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index d460de3..cef7485 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -12,7 +12,7 @@
  * limitations under the License.
  */
 
-#pragma warning disable 8601
+#pragma warning disable 8601, 8618
 
 namespace DotPulsar.StressTests
 {
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 8d40129..9ffdd6c 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,13 +7,13 @@
 
   <ItemGroup>
     <PackageReference Include="FluentAssertions" Version="5.10.3" />
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
-    <PackageReference Include="coverlet.collector" Version="3.0.2">
+    <PackageReference Include="coverlet.collector" Version="3.0.3">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
diff --git a/tests/docker-compose-standalone-tests.yml b/tests/docker-compose-standalone-tests.yml
index 5dde15b..b9b3cd9 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -4,7 +4,7 @@
 
   pulsar:
     container_name: pulsar-stresstests
-    image: 'apachepulsar/pulsar:2.6.1'
+    image: 'apachepulsar/pulsar:2.7.0'
     ports:
       - '54546:8080'
       - '54545:6650'