Implemented compression using LZ4 (K4os.Compression.LZ4), Snappy (IronSnappy), Zlib (DotNetZip) and Zstd (ZstdNet). Just need to wire it up, test it and create documentation. This will be in the next release.
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 2cdc6ef..930d2ca 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -24,6 +24,11 @@
     public interface IProducerBuilder
     {
         /// <summary>
+        /// Set the compression type. The default is 'None'.
+        /// </summary>
+        IProducerBuilder CompressionType(CompressionType compressionType);
+
+        /// <summary>
         /// Set the initial sequence id. The default is 0.
         /// </summary>
         IProducerBuilder InitialSequenceId(ulong initialSequenceId);
diff --git a/src/DotPulsar/CompressionType.cs b/src/DotPulsar/CompressionType.cs
new file mode 100644
index 0000000..2e55628
--- /dev/null
+++ b/src/DotPulsar/CompressionType.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    /// <summary>
+    /// The compression types that can be set on a producer.
+    /// </summary>
+    public enum CompressionType : byte
+    {
+        /// <summary>
+        /// No compression.
+        /// </summary>
+        None = 0,
+
+        /// <summary>
+        /// Compress with LZ4.
+        /// </summary>
+        Lz4 = 1,
+
+        /// <summary>
+        /// Compress with zlib.
+        /// </summary>
+        Zlib = 2,
+
+        /// <summary>
+        /// Compress with zstd.
+        /// </summary>
+        Zstd = 3,
+
+        /// <summary>
+        /// Compress with Snappy.
+        /// </summary>
+        Snappy = 4
+    }
+}
diff --git a/src/DotPulsar/Exceptions/CompressionException.cs b/src/DotPulsar/Exceptions/CompressionException.cs
new file mode 100644
index 0000000..be6d12a
--- /dev/null
+++ b/src/DotPulsar/Exceptions/CompressionException.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+    public sealed class CompressionException : DotPulsarException
+    {
+        public CompressionException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/ICompress.cs b/src/DotPulsar/Internal/Abstractions/ICompress.cs
new file mode 100644
index 0000000..39ea5a0
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/ICompress.cs
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+    using System.Buffers;
+
+    public interface ICompress : IDisposable
+    {
+        ReadOnlySequence<byte> Compress(ReadOnlySequence<byte> data);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs b/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
new file mode 100644
index 0000000..ecb2fdf
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface ICompressorFactory
+    {
+        ICompress Create();
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IDecompress.cs b/src/DotPulsar/Internal/Abstractions/IDecompress.cs
new file mode 100644
index 0000000..0584a26
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IDecompress.cs
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+    using System.Buffers;
+
+    public interface IDecompress : IDisposable
+    {
+        ReadOnlySequence<byte> Decompress(ReadOnlySequence<byte> data, int decompressedSize);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs b/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
new file mode 100644
index 0000000..4770dfc
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IDecompressorFactory
+    {
+        IDecompress Create();
+    }
+}
diff --git a/src/DotPulsar/Internal/AsyncLockExecutor.cs b/src/DotPulsar/Internal/AsyncLockExecutor.cs
deleted file mode 100644
index 05349d2..0000000
--- a/src/DotPulsar/Internal/AsyncLockExecutor.cs
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
-{
-    using Abstractions;
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    public sealed class AsyncLockExecutor : IExecute, IAsyncDisposable
-    {
-        private readonly AsyncLock _lock;
-        private readonly IExecute _executor;
-
-        public AsyncLockExecutor(IExecute executor)
-        {
-            _lock = new AsyncLock();
-            _executor = executor;
-        }
-
-        public ValueTask DisposeAsync()
-            => _lock.DisposeAsync();
-
-        public async ValueTask Execute(Action action, CancellationToken cancellationToken)
-        {
-            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-            {
-                await _executor.Execute(action, cancellationToken).ConfigureAwait(false);
-            }
-        }
-
-        public async ValueTask Execute(Func<Task> func, CancellationToken cancellationToken)
-        {
-            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-            {
-                await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
-            }
-        }
-
-        public async ValueTask Execute(Func<ValueTask> func, CancellationToken cancellationToken)
-        {
-            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-            {
-                await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
-            }
-        }
-
-        public async ValueTask<TResult> Execute<TResult>(Func<TResult> func, CancellationToken cancellationToken)
-        {
-            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-            {
-                return await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
-            }
-        }
-
-        public async ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func, CancellationToken cancellationToken)
-        {
-            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-            {
-                return await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
-            }
-        }
-
-        public async ValueTask<TResult> Execute<TResult>(Func<ValueTask<TResult>> func, CancellationToken cancellationToken)
-        {
-            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-            {
-                return await _executor.Execute(func, cancellationToken).ConfigureAwait(false);
-            }
-        }
-    }
-}
diff --git a/src/DotPulsar/Internal/Compression/Compressor.cs b/src/DotPulsar/Internal/Compression/Compressor.cs
new file mode 100644
index 0000000..96c05fa
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/Compressor.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Internal.Abstractions;
+    using System;
+    using System.Buffers;
+
+    public sealed class Compressor : ICompress
+    {
+        private readonly IDisposable? _disposable;
+        private readonly Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> _compress;
+
+        public Compressor(Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> compress, IDisposable? disposable = null)
+        {
+            _disposable = disposable;
+            _compress = compress;
+        }
+
+        public Compressor(Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> compress)
+            => _compress = compress;
+
+        public ReadOnlySequence<byte> Compress(ReadOnlySequence<byte> data)
+            => _compress(data);
+
+        public void Dispose() => _disposable?.Dispose();
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/CompressorFactory.cs b/src/DotPulsar/Internal/Compression/CompressorFactory.cs
new file mode 100644
index 0000000..7e01684
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/CompressorFactory.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Internal.Abstractions;
+    using System;
+
+    public sealed class CompressorFactory : ICompressorFactory
+    {
+        private readonly Func<ICompress> _create;
+
+        public CompressorFactory(Func<ICompress> create)
+            => _create = create;
+
+        public ICompress Create()
+            => _create();
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/Decompressor.cs b/src/DotPulsar/Internal/Compression/Decompressor.cs
new file mode 100644
index 0000000..281cdce
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/Decompressor.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Internal.Abstractions;
+    using System;
+    using System.Buffers;
+
+    public sealed class Decompressor : IDecompress
+    {
+        private readonly IDisposable? _disposable;
+        private readonly Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> _decompress;
+
+        public Decompressor(Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> decompress, IDisposable? disposable = null)
+        {
+            _disposable = disposable;
+            _decompress = decompress;
+        }
+
+        public ReadOnlySequence<byte> Decompress(ReadOnlySequence<byte> data, int decompressedSize)
+            => _decompress(data, decompressedSize);
+
+        public void Dispose() => _disposable?.Dispose();
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/DecompressorFactory.cs b/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
new file mode 100644
index 0000000..1e448bd
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Internal.Abstractions;
+    using System;
+
+    public sealed class DecompressorFactory : IDecompressorFactory
+    {
+        private readonly Func<IDecompress> _create;
+
+        public DecompressorFactory(Func<IDecompress> create)
+            => _create = create;
+
+        public IDecompress Create()
+            => _create();
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/Lz4Compression.cs b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
new file mode 100644
index 0000000..a94b7b6
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
@@ -0,0 +1,204 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Exceptions;
+    using DotPulsar.Internal.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Reflection;
+
+    public static class Lz4Compression
+    {
+        public delegate int Decode(byte[] source, int sourceOffset, int sourceLength, byte[] target, int targetOffset, int targetLength);
+        public delegate int Encode(byte[] source, int sourceOffset, int sourceLength, byte[] target, int targetOffset, int targetLength, int level);
+        public delegate int MaximumOutputSize(int length);
+
+        public static bool TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory)
+        {
+            try
+            {
+                var assembly = Assembly.Load("K4os.Compression.LZ4");
+
+                var definedTypes = assembly.DefinedTypes.ToArray();
+
+                var lz4Codec = FindLZ4Codec(definedTypes);
+                var lz4Level = FindLZ4Level(definedTypes);
+
+                var methods = lz4Codec.GetMethods(BindingFlags.Public | BindingFlags.Static);
+
+                var decode = FindDecode(methods);
+                var encode = FindEncode(methods, lz4Level);
+                var maximumOutputSize = FindMaximumOutputSize(methods);
+
+                compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(encode, maximumOutputSize)));
+                decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(decode)));
+                return true;
+            }
+            catch
+            { 
+                // Ignore
+            }
+
+            compressorFactory = null;
+            decompressorFactory = null;
+
+            return false;
+        }
+
+        private static TypeInfo FindLZ4Codec(IEnumerable<TypeInfo> types)
+        {
+            const string fullName = "K4os.Compression.LZ4.LZ4Codec";
+
+            foreach (var type in types)
+            {
+                if (type.FullName is null || !type.FullName.Equals(fullName))
+                    continue;
+
+                if (type.IsPublic && type.IsClass && type.IsAbstract && type.IsSealed)
+                    return type;
+
+                break;
+            }
+
+            throw new Exception($"{fullName} as a public and static class was not found");
+        }
+
+        private static TypeInfo FindLZ4Level(IEnumerable<TypeInfo> types)
+        {
+            const string fullName = "K4os.Compression.LZ4.LZ4Level";
+
+            foreach (var type in types)
+            {
+                if (type.FullName is null || !type.FullName.Equals(fullName))
+                    continue;
+
+                if (type.IsPublic && type.IsEnum && Enum.GetUnderlyingType(type) == typeof(int))
+                    return type;
+
+                break;
+            }
+
+            throw new Exception($"{fullName} as a public enum with an int backing was not found");
+        }
+
+        private static Decode FindDecode(MethodInfo[] methods)
+        {
+            const string name = "Decode";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(int))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 6)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(byte[]) ||
+                    parameters[1].ParameterType != typeof(int) ||
+                    parameters[2].ParameterType != typeof(int) ||
+                    parameters[3].ParameterType != typeof(byte[]) ||
+                    parameters[4].ParameterType != typeof(int) ||
+                    parameters[5].ParameterType != typeof(int))
+                    continue;
+
+                return (Decode) method.CreateDelegate(typeof(Decode));
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static Encode FindEncode(MethodInfo[] methods, Type lz4Level)
+        {
+            const string name = "Encode";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(int))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 7)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(byte[]) ||
+                    parameters[1].ParameterType != typeof(int) ||
+                    parameters[2].ParameterType != typeof(int) ||
+                    parameters[3].ParameterType != typeof(byte[]) ||
+                    parameters[4].ParameterType != typeof(int) ||
+                    parameters[5].ParameterType != typeof(int) ||
+                    parameters[6].ParameterType != lz4Level)
+                    continue;
+
+                return (Encode) method.CreateDelegate(typeof(Encode));
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static MaximumOutputSize FindMaximumOutputSize(MethodInfo[] methods)
+        {
+            const string name = "MaximumOutputSize";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(int))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 1)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(int))
+                    continue;
+
+                return (MaximumOutputSize) method.CreateDelegate(typeof(MaximumOutputSize));
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> CreateDecompressor(Decode decompress)
+        {
+            return (source, size) =>
+            {
+                var decompressed = new byte[size];
+                var sourceBytes = source.ToArray();
+                var bytesDecompressed = decompress(sourceBytes, 0, sourceBytes.Length, decompressed, 0, decompressed.Length);
+                if (size == bytesDecompressed)
+                    return new ReadOnlySequence<byte>(decompressed);
+
+                throw new CompressionException($"LZ4Codec.Decode returned {bytesDecompressed} but expected {size}");
+            };
+        }
+
+        private static Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> CreateCompressor(Encode compress, MaximumOutputSize maximumOutputSize)
+        {
+            return (source) =>
+            {
+                var sourceBytes = source.ToArray();
+                var compressed = new byte[maximumOutputSize(sourceBytes.Length)];
+                var bytesCompressed = compress(sourceBytes, 0, sourceBytes.Length, compressed, 0, compressed.Length, 0);
+                if (bytesCompressed == -1)
+                    throw new CompressionException($"LZ4Codec.Encode returned -1 when compressing {sourceBytes.Length} bytes");
+
+                return new ReadOnlySequence<byte>(compressed, 0, bytesCompressed);
+            };
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/SnappyCompression.cs b/src/DotPulsar/Internal/Compression/SnappyCompression.cs
new file mode 100644
index 0000000..68af96f
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/SnappyCompression.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Internal.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Reflection;
+
+    public static class SnappyCompression
+    {
+        public delegate byte[] Decode(ReadOnlySpan<byte> source);
+        public delegate byte[] Encode(ReadOnlySpan<byte> source);
+
+        public static bool TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory)
+        {
+            try
+            {
+                var assembly = Assembly.Load("IronSnappy");
+
+                var definedTypes = assembly.DefinedTypes.ToArray();
+
+                var snappy = FindSnappy(definedTypes);
+
+                var methods = snappy.GetMethods(BindingFlags.Public | BindingFlags.Static);
+
+                var decode = FindDecode(methods);
+                var encode = FindEncode(methods);
+
+                compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(encode)));
+                decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(decode)));
+                return true;
+            }
+            catch
+            {
+                // Ignore
+            }
+
+            compressorFactory = null;
+            decompressorFactory = null;
+
+            return false;
+        }
+
+        private static TypeInfo FindSnappy(IEnumerable<TypeInfo> types)
+        {
+            const string fullName = "IronSnappy.Snappy";
+
+            foreach (var type in types)
+            {
+                if (type.FullName is null || !type.FullName.Equals(fullName))
+                    continue;
+
+                if (type.IsPublic && type.IsClass && type.IsAbstract && type.IsSealed)
+                    return type;
+
+                break;
+            }
+
+            throw new Exception($"{fullName} as a public and static class was not found");
+        }
+
+        private static Decode FindDecode(MethodInfo[] methods)
+        {
+            const string name = "Decode";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(byte[]))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 1)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(ReadOnlySpan<byte>))
+                    continue;
+
+                return (Decode) method.CreateDelegate(typeof(Decode));
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static Encode FindEncode(MethodInfo[] methods)
+        {
+            const string name = "Encode";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(byte[]))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 1)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(ReadOnlySpan<byte>))
+                    continue;
+
+                return (Encode) method.CreateDelegate(typeof(Encode));
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> CreateDecompressor(Decode decompress)
+            => (source, size) => new ReadOnlySequence<byte>(decompress(source.ToArray()));
+
+        private static Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> CreateCompressor(Encode compress)
+            => (source) => new ReadOnlySequence<byte>(compress(source.ToArray()));
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/ZlibCompression.cs b/src/DotPulsar/Internal/Compression/ZlibCompression.cs
new file mode 100644
index 0000000..1af3d4d
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/ZlibCompression.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Internal.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Reflection;
+
+    public static class ZlibCompression
+    {
+        public delegate byte[] CompressBuffer(byte[] source);
+        public delegate byte[] UncompressBuffer(byte[] source);
+
+        public static bool TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory)
+        {
+            try
+            {
+                var assembly = Assembly.Load("DotNetZip");
+
+                var definedTypes = assembly.DefinedTypes.ToArray();
+
+                var ZlibStream = FindZlibStream(definedTypes);
+
+                var methods = ZlibStream.GetMethods(BindingFlags.Public | BindingFlags.Static);
+
+                var compressBuffer = FindCompressBuffer(methods);
+                var uncompressBuffer = FindUncompressBuffer(methods);
+
+                compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(compressBuffer)));
+                decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(uncompressBuffer)));
+                return true;
+            }
+            catch
+            {
+                // Ignore
+            }
+
+            compressorFactory = null;
+            decompressorFactory = null;
+
+            return false;
+        }
+
+        private static TypeInfo FindZlibStream(IEnumerable<TypeInfo> types)
+        {
+            const string fullName = "Ionic.Zlib.ZlibStream";
+
+            foreach (var type in types)
+            {
+                if (type.FullName is null || !type.FullName.Equals(fullName))
+                    continue;
+
+                if (type.IsPublic && type.IsClass)
+                    return type;
+
+                break;
+            }
+
+            throw new Exception($"{fullName} as a public class was not found");
+        }
+
+        private static CompressBuffer FindCompressBuffer(MethodInfo[] methods)
+        {
+            const string name = "CompressBuffer";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(byte[]))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 1)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(byte[]))
+                    continue;
+
+                return (CompressBuffer) method.CreateDelegate(typeof(CompressBuffer));
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static UncompressBuffer FindUncompressBuffer(MethodInfo[] methods)
+        {
+            const string name = "UncompressBuffer";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(byte[]))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 1)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(byte[]))
+                    continue;
+
+                return (UncompressBuffer) method.CreateDelegate(typeof(UncompressBuffer));
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> CreateDecompressor(UncompressBuffer decompress)
+            => (source, size) => new ReadOnlySequence<byte>(decompress(source.ToArray()));
+
+        private static Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> CreateCompressor(CompressBuffer compress)
+            => (source) => new ReadOnlySequence<byte>(compress(source.ToArray()));
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/ZstdCompression.cs b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
new file mode 100644
index 0000000..df12300
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
@@ -0,0 +1,162 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Exceptions;
+    using DotPulsar.Internal.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Reflection;
+
+    public static class ZstdCompression
+    {
+        public delegate byte[] Wrap(byte[] src);
+        public delegate int Unwrap(byte[] src, byte[] dst, int offset, bool bufferSizePrecheck);
+
+        public static bool TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory)
+        {
+            try
+            {
+                var assembly = Assembly.Load("ZstdNet");
+
+                var definedTypes = assembly.DefinedTypes.ToArray();
+
+                var decompressorType = Find(definedTypes, "ZstdNet.Decompressor");
+                var decompressorMethods = decompressorType.GetMethods(BindingFlags.Public | BindingFlags.Instance);
+                var unwrapMethod = FindUnwrap(decompressorMethods);
+
+                var compressorType = Find(definedTypes, "ZstdNet.Compressor");
+                var compressorMethods = compressorType.GetMethods(BindingFlags.Public | BindingFlags.Instance);
+                var wrapMethod = FindWrap(compressorMethods);
+
+                compressorFactory = new CompressorFactory(() =>
+                {
+                    var compressor = Activator.CreateInstance(compressorType);
+                    if (compressor is null)
+                        throw new Exception($"Activator.CreateInstance returned null when trying to create a {compressorType.FullName}");
+
+                    var wrap = (Wrap) wrapMethod.CreateDelegate(typeof(Wrap), compressor);
+                    return new Compressor(CreateCompressor(wrap), (IDisposable) compressor);
+                });
+
+                decompressorFactory = new DecompressorFactory(() =>
+                {
+                    var decompressor = Activator.CreateInstance(decompressorType);
+                    if (decompressor is null)
+                        throw new Exception($"Activator.CreateInstance returned null when trying to create a {decompressorType.FullName}");
+
+                    var unwrap = (Unwrap) unwrapMethod.CreateDelegate(typeof(Unwrap), decompressor);
+                    return new Decompressor(CreateDecompressor(unwrap), (IDisposable) decompressor);
+                });
+
+                return true;
+            }
+            catch
+            {
+                // Ignore
+            }
+
+            compressorFactory = null;
+            decompressorFactory = null;
+
+            return false;
+        }
+
+        private static TypeInfo Find(IEnumerable<TypeInfo> types, string fullName)
+        {
+            foreach (var type in types)
+            {
+                if (type.FullName is null || !type.FullName.Equals(fullName))
+                    continue;
+
+                if (type.IsPublic && 
+                    type.IsClass && 
+                    !type.IsAbstract && 
+                    type.ImplementedInterfaces.Contains(typeof(IDisposable)) &&
+                    type.GetConstructor(Type.EmptyTypes) is not null)
+                    return type;
+
+                break;
+            }
+
+            throw new Exception($"{fullName} as a public class with an empty public constructor and implementing IDisposable was not found");
+        }
+
+        private static MethodInfo FindWrap(MethodInfo[] methods)
+        {
+            const string name = "Wrap";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(byte[]))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 1)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(byte[]))
+                    continue;
+
+                return method;
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static MethodInfo FindUnwrap(MethodInfo[] methods)
+        {
+            const string name = "Unwrap";
+
+            foreach (var method in methods)
+            {
+                if (method.Name != name || method.ReturnType != typeof(int))
+                    continue;
+
+                var parameters = method.GetParameters();
+                if (parameters.Length != 4)
+                    continue;
+
+                if (parameters[0].ParameterType != typeof(byte[]) ||
+                    parameters[1].ParameterType != typeof(byte[]) ||
+                    parameters[2].ParameterType != typeof(int) ||
+                    parameters[3].ParameterType != typeof(bool))
+                    continue;
+
+                return method;
+            }
+
+            throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+        }
+
+        private static Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> CreateDecompressor(Unwrap decompress)
+        {
+            return (source, size) =>
+            {
+                var decompressed = new byte[size];
+                var bytesDecompressed = decompress(source.ToArray(), decompressed, 0, false);
+                if (size == bytesDecompressed)
+                    return new ReadOnlySequence<byte>(decompressed);
+
+                throw new CompressionException($"ZstdNet.Decompressor returned {bytesDecompressed} but expected {size}");
+            };
+        }
+
+        private static Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> CreateCompressor(Wrap compress)
+            => (source) => new ReadOnlySequence<byte>(compress(source.ToArray()));
+    }
+}
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index de109dd..225eb20 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -74,9 +74,9 @@
 
                     var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
 
-                    if (!messagePackage.IsValid())
+                    if (!messagePackage.ValidateMagicNumberAndChecksum())
                     {
-                        await RejectPackage(messagePackage, cancellationToken).ConfigureAwait(false);
+                        await RejectPackage(messagePackage, CommandAck.ValidationErrorType.ChecksumMismatch, cancellationToken).ConfigureAwait(false);
                         continue;
                     }
 
@@ -84,6 +84,9 @@
                     var redeliveryCount = messagePackage.RedeliveryCount;
                     var data = messagePackage.ExtractData(metadataSize);
                     var metadata = messagePackage.ExtractMetadata(metadataSize);
+
+                    // TODO decompress if needed
+
                     var messageId = messagePackage.MessageId;
 
                     return metadata.ShouldSerializeNumMessagesInBatch()
@@ -159,9 +162,13 @@
             _sendWhenZero = _cachedCommandFlow.MessagePermits;
         }
 
-        private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken)
+        private async Task RejectPackage(MessagePackage messagePackage, CommandAck.ValidationErrorType validationErrorType, CancellationToken cancellationToken)
         {
-            var ack = new CommandAck { Type = CommandAck.AckType.Individual, ValidationError = CommandAck.ValidationErrorType.ChecksumMismatch };
+            var ack = new CommandAck
+            {
+                Type = CommandAck.AckType.Individual,
+                ValidationError = validationErrorType
+            };
 
             ack.MessageIds.Add(messagePackage.MessageId);
 
diff --git a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
index 928c7f4..4e8ead0 100644
--- a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
@@ -28,13 +28,13 @@
         public static ReadOnlySequence<byte> ExtractData(this MessagePackage package, uint metadataSize)
             => package.Data.Slice(Constants.MetadataOffset + metadataSize);
 
-        public static bool IsValid(this MessagePackage package)
-            => StartsWithMagicNumber(package.Data) && HasValidCheckSum(package.Data);
+        public static bool ValidateMagicNumberAndChecksum(this MessagePackage package)
+            => StartsWithMagicNumber(package.Data) && HasValidChecksum(package.Data);
 
         private static bool StartsWithMagicNumber(ReadOnlySequence<byte> input)
             => input.StartsWith(Constants.MagicNumber);
 
-        private static bool HasValidCheckSum(ReadOnlySequence<byte> input)
+        private static bool HasValidChecksum(ReadOnlySequence<byte> input)
             => input.ReadUInt32(Constants.MagicNumber.Length, true) == Crc32C.Calculate(input.Slice(Constants.MetadataSizeOffset));
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 941c0aa..4d66d6d 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -24,6 +24,7 @@
     {
         private readonly IPulsarClient _pulsarClient;
         private string? _producerName;
+        private CompressionType _compressionType;
         private ulong _initialSequenceId;
         private string? _topic;
         private IHandleStateChanged<ProducerStateChanged>? _stateChangedHandler;
@@ -31,9 +32,16 @@
         public ProducerBuilder(IPulsarClient pulsarClient)
         {
             _pulsarClient = pulsarClient;
+            _compressionType = ProducerOptions.DefaultCompressionType;
             _initialSequenceId = ProducerOptions.DefaultInitialSequenceId;
         }
 
+        public IProducerBuilder CompressionType(CompressionType compressionType)
+        {
+            _compressionType = compressionType;
+            return this;
+        }
+
         public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
         {
             _initialSequenceId = initialSequenceId;
@@ -77,6 +85,7 @@
 
             var options = new ProducerOptions(_topic!)
             {
+                CompressionType = _compressionType,
                 InitialSequenceId = _initialSequenceId,
                 ProducerName = _producerName,
                 StateChangedHandler = _stateChangedHandler
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 2fc5eb0..081a818 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -65,10 +65,8 @@
 #else
         public async ValueTask DisposeAsync()
         {
-            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
-                return;
-
-            await _stream.DisposeAsync().ConfigureAwait(false);
+            if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
+                await _stream.DisposeAsync().ConfigureAwait(false);
         }
 #endif
 
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index d7373a1..f1ad688 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -22,6 +22,11 @@
     public sealed class ProducerOptions
     {
         /// <summary>
+        /// The default compression type.
+        /// </summary>
+        public static readonly CompressionType DefaultCompressionType = CompressionType.None;
+
+        /// <summary>
         /// The default initial sequence id.
         /// </summary>
         public static readonly ulong DefaultInitialSequenceId = 0;
@@ -31,11 +36,17 @@
         /// </summary>
         public ProducerOptions(string topic)
         {
+            CompressionType = DefaultCompressionType;
             InitialSequenceId = DefaultInitialSequenceId;
             Topic = topic;
         }
 
         /// <summary>
+        /// Set the compression type. The default is 'None'.
+        /// </summary>
+        public CompressionType CompressionType { get; set; }
+
+        /// <summary>
         /// Set the initial sequence id. The default is 0.
         /// </summary>
         public ulong InitialSequenceId { get; set; }