Compression wired up and seems to work. Needs more testing.
diff --git a/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs b/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
index ecb2fdf..f658e24 100644
--- a/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
@@ -14,8 +14,11 @@
 
 namespace DotPulsar.Internal.Abstractions
 {
+    using DotPulsar.Internal.PulsarApi;
+
     public interface ICompressorFactory
     {
+        CompressionType CompressionType { get; }
         ICompress Create();
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs b/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
index 4770dfc..7cbbd96 100644
--- a/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
@@ -14,8 +14,11 @@
 
 namespace DotPulsar.Internal.Abstractions
 {
+    using DotPulsar.Internal.PulsarApi;
+
     public interface IDecompressorFactory
     {
+        CompressionType CompressionType { get; }
         IDecompress Create();
     }
 }
diff --git a/src/DotPulsar/Internal/Compression/CompressionFactories.cs b/src/DotPulsar/Internal/Compression/CompressionFactories.cs
new file mode 100644
index 0000000..a2da330
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/CompressionFactories.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+    using DotPulsar.Internal.Abstractions;
+    using System.Collections.Generic;
+
+    public static class CompressionFactories
+    {
+        private static readonly List<ICompressorFactory> _compressorFactories;
+        private static readonly List<IDecompressorFactory> _decompressorFactories;
+
+        static CompressionFactories()
+        {
+            _compressorFactories = new List<ICompressorFactory>();
+            _decompressorFactories = new List<IDecompressorFactory>();
+
+
+            if (Lz4Compression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory))
+                Add(compressorFactory, decompressorFactory);
+
+            if (SnappyCompression.TryLoading(out compressorFactory, out decompressorFactory))
+                Add(compressorFactory, decompressorFactory);
+
+            if (ZlibCompression.TryLoading(out compressorFactory, out decompressorFactory))
+                Add(compressorFactory, decompressorFactory);
+
+            if (ZstdCompression.TryLoading(out compressorFactory, out decompressorFactory))
+                Add(compressorFactory, decompressorFactory);
+        }
+
+        private static void Add(ICompressorFactory? compressorFactory, IDecompressorFactory? decompressorFactory)
+        {
+            _compressorFactories.Add(compressorFactory!);
+            _decompressorFactories.Add(decompressorFactory!);
+        }
+
+        public static IEnumerable<ICompressorFactory> CompressorFactories()
+            => _compressorFactories;
+
+        public static IEnumerable<IDecompressorFactory> DecompressorFactories()
+            => _decompressorFactories;
+    }
+}
diff --git a/src/DotPulsar/Internal/Compression/Compressor.cs b/src/DotPulsar/Internal/Compression/Compressor.cs
index 96c05fa..51ffaf9 100644
--- a/src/DotPulsar/Internal/Compression/Compressor.cs
+++ b/src/DotPulsar/Internal/Compression/Compressor.cs
@@ -35,6 +35,7 @@
         public ReadOnlySequence<byte> Compress(ReadOnlySequence<byte> data)
             => _compress(data);
 
-        public void Dispose() => _disposable?.Dispose();
+        public void Dispose()
+            => _disposable?.Dispose();
     }
 }
diff --git a/src/DotPulsar/Internal/Compression/CompressorFactory.cs b/src/DotPulsar/Internal/Compression/CompressorFactory.cs
index 7e01684..230e1ff 100644
--- a/src/DotPulsar/Internal/Compression/CompressorFactory.cs
+++ b/src/DotPulsar/Internal/Compression/CompressorFactory.cs
@@ -15,14 +15,20 @@
 namespace DotPulsar.Internal.Compression
 {
     using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.PulsarApi;
     using System;
 
     public sealed class CompressorFactory : ICompressorFactory
     {
         private readonly Func<ICompress> _create;
 
-        public CompressorFactory(Func<ICompress> create)
-            => _create = create;
+        public CompressorFactory(CompressionType compressionType, Func<ICompress> create)
+        {
+            CompressionType = compressionType;
+            _create = create;
+        }
+
+        public CompressionType CompressionType { get; }
 
         public ICompress Create()
             => _create();
diff --git a/src/DotPulsar/Internal/Compression/Decompressor.cs b/src/DotPulsar/Internal/Compression/Decompressor.cs
index 281cdce..eddba6c 100644
--- a/src/DotPulsar/Internal/Compression/Decompressor.cs
+++ b/src/DotPulsar/Internal/Compression/Decompressor.cs
@@ -32,6 +32,7 @@
         public ReadOnlySequence<byte> Decompress(ReadOnlySequence<byte> data, int decompressedSize)
             => _decompress(data, decompressedSize);
 
-        public void Dispose() => _disposable?.Dispose();
+        public void Dispose()
+            => _disposable?.Dispose();
     }
 }
diff --git a/src/DotPulsar/Internal/Compression/DecompressorFactory.cs b/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
index 1e448bd..dc2b6da 100644
--- a/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
+++ b/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
@@ -15,14 +15,20 @@
 namespace DotPulsar.Internal.Compression
 {
     using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.PulsarApi;
     using System;
 
     public sealed class DecompressorFactory : IDecompressorFactory
     {
         private readonly Func<IDecompress> _create;
 
-        public DecompressorFactory(Func<IDecompress> create)
-            => _create = create;
+        public DecompressorFactory(CompressionType compressionType, Func<IDecompress> create)
+        {
+            CompressionType = compressionType;
+            _create = create;
+        }
+
+        public CompressionType CompressionType { get; }
 
         public IDecompress Create()
             => _create();
diff --git a/src/DotPulsar/Internal/Compression/Lz4Compression.cs b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
index a94b7b6..1411bf1 100644
--- a/src/DotPulsar/Internal/Compression/Lz4Compression.cs
+++ b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
@@ -45,8 +45,8 @@
                 var encode = FindEncode(methods, lz4Level);
                 var maximumOutputSize = FindMaximumOutputSize(methods);
 
-                compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(encode, maximumOutputSize)));
-                decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(decode)));
+                compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Lz4, () => new Compressor(CreateCompressor(encode, maximumOutputSize)));
+                decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Lz4, () => new Decompressor(CreateDecompressor(decode)));
                 return true;
             }
             catch
diff --git a/src/DotPulsar/Internal/Compression/SnappyCompression.cs b/src/DotPulsar/Internal/Compression/SnappyCompression.cs
index 68af96f..3f77a43 100644
--- a/src/DotPulsar/Internal/Compression/SnappyCompression.cs
+++ b/src/DotPulsar/Internal/Compression/SnappyCompression.cs
@@ -41,8 +41,8 @@
                 var decode = FindDecode(methods);
                 var encode = FindEncode(methods);
 
-                compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(encode)));
-                decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(decode)));
+                compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Snappy, () => new Compressor(CreateCompressor(encode)));
+                decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Snappy, () => new Decompressor(CreateDecompressor(decode)));
                 return true;
             }
             catch
diff --git a/src/DotPulsar/Internal/Compression/ZlibCompression.cs b/src/DotPulsar/Internal/Compression/ZlibCompression.cs
index 1af3d4d..082b9b1 100644
--- a/src/DotPulsar/Internal/Compression/ZlibCompression.cs
+++ b/src/DotPulsar/Internal/Compression/ZlibCompression.cs
@@ -41,8 +41,8 @@
                 var compressBuffer = FindCompressBuffer(methods);
                 var uncompressBuffer = FindUncompressBuffer(methods);
 
-                compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(compressBuffer)));
-                decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(uncompressBuffer)));
+                compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Zlib, () => new Compressor(CreateCompressor(compressBuffer)));
+                decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Zlib, () => new Decompressor(CreateDecompressor(uncompressBuffer)));
                 return true;
             }
             catch
diff --git a/src/DotPulsar/Internal/Compression/ZstdCompression.cs b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
index df12300..fd5f281 100644
--- a/src/DotPulsar/Internal/Compression/ZstdCompression.cs
+++ b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
@@ -43,7 +43,7 @@
                 var compressorMethods = compressorType.GetMethods(BindingFlags.Public | BindingFlags.Instance);
                 var wrapMethod = FindWrap(compressorMethods);
 
-                compressorFactory = new CompressorFactory(() =>
+                compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Zstd, () =>
                 {
                     var compressor = Activator.CreateInstance(compressorType);
                     if (compressor is null)
@@ -53,7 +53,7 @@
                     return new Compressor(CreateCompressor(wrap), (IDisposable) compressor);
                 });
 
-                decompressorFactory = new DecompressorFactory(() =>
+                decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Zstd, () =>
                 {
                     var decompressor = Activator.CreateInstance(decompressorType);
                     if (decompressor is null)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 225eb20..479e89e 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -15,9 +15,11 @@
 namespace DotPulsar.Internal
 {
     using Abstractions;
+    using DotPulsar.Exceptions;
     using Extensions;
     using PulsarApi;
     using System;
+    using System.Collections.Generic;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -28,6 +30,7 @@
         private readonly IConnection _connection;
         private readonly BatchHandler _batchHandler;
         private readonly CommandFlow _cachedCommandFlow;
+        private readonly IDecompress?[] _decompressors;
         private readonly AsyncLock _lock;
         private uint _sendWhenZero;
         private bool _firstFlow;
@@ -37,13 +40,21 @@
             uint messagePrefetchCount,
             AsyncQueue<MessagePackage> queue,
             IConnection connection,
-            BatchHandler batchHandler)
+            BatchHandler batchHandler,
+            IEnumerable<IDecompressorFactory> decompressorFactories)
         {
             _id = id;
             _queue = queue;
             _connection = connection;
             _batchHandler = batchHandler;
 
+            _decompressors = new IDecompress[5];
+
+            foreach (var decompressorFactory in decompressorFactories)
+            {
+                _decompressors[(int) decompressorFactory.CompressionType] = decompressorFactory.Create();
+            }
+
             _lock = new AsyncLock();
 
             _cachedCommandFlow = new CommandFlow
@@ -81,13 +92,28 @@
                     }
 
                     var metadataSize = messagePackage.GetMetadataSize();
-                    var redeliveryCount = messagePackage.RedeliveryCount;
-                    var data = messagePackage.ExtractData(metadataSize);
                     var metadata = messagePackage.ExtractMetadata(metadataSize);
+                    var data = messagePackage.ExtractData(metadataSize);
 
-                    // TODO decompress if needed
+                    if (metadata.Compression != CompressionType.None)
+                    {
+                        var decompressor = _decompressors[(int) metadata.Compression];
+                        if (decompressor is null)
+                            throw new CompressionException($"Support for {metadata.Compression} compression was not found");
+
+                        try
+                        {
+                            data = decompressor.Decompress(data, (int) metadata.UncompressedSize);
+                        }
+                        catch
+                        {
+                            await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecompressionError, cancellationToken).ConfigureAwait(false);
+                            continue;
+                        }
+                    }
 
                     var messageId = messagePackage.MessageId;
+                    var redeliveryCount = messagePackage.RedeliveryCount;
 
                     return metadata.ShouldSerializeNumMessagesInBatch()
                         ? _batchHandler.Add(messageId, redeliveryCount, metadata, data)
@@ -146,6 +172,12 @@
         public async ValueTask DisposeAsync()
         {
             _queue.Dispose();
+
+            for (var i = 0; i < _decompressors.Length; ++i)
+            {
+                _decompressors[i]?.Dispose();
+            }
+
             await _lock.DisposeAsync().ConfigureAwait(false);
         }
 
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 5268264..eb97404 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -17,6 +17,7 @@
     using Abstractions;
     using PulsarApi;
     using System;
+    using System.Collections.Generic;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -29,13 +30,15 @@
         private readonly CommandSubscribe _subscribe;
         private readonly uint _messagePrefetchCount;
         private readonly BatchHandler _batchHandler;
+        private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
 
         public ConsumerChannelFactory(
             Guid correlationId,
             IRegisterEvent eventRegister,
             IConnectionPool connectionPool,
             IExecute executor,
-            ConsumerOptions options)
+            ConsumerOptions options,
+            IEnumerable<IDecompressorFactory> decompressorFactories)
         {
             _correlationId = correlationId;
             _eventRegister = eventRegister;
@@ -55,6 +58,7 @@
             };
 
             _batchHandler = new BatchHandler(true);
+            _decompressorFactories = decompressorFactories;
         }
 
         public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
@@ -66,7 +70,7 @@
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
             var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-            return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
+            return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 3b43351..fd9bd0c 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -29,14 +29,20 @@
         private readonly ulong _id;
         private readonly string _name;
         private readonly IConnection _connection;
+        private readonly ICompressorFactory? _compressorFactory;
 
-        public ProducerChannel(ulong id, string name, IConnection connection)
+        public ProducerChannel(
+            ulong id,
+            string name,
+            IConnection connection,
+            ICompressorFactory? compressorFactory)
         {
             var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
             _sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
             _id = id;
             _name = name;
             _connection = connection;
+            _compressorFactory = compressorFactory;
         }
 
         public async ValueTask ClosedByClient(CancellationToken cancellationToken)
@@ -74,7 +80,16 @@
 
                 sendPackage.Command.SequenceId = metadata.SequenceId;
                 sendPackage.Metadata = metadata;
-                sendPackage.Payload = payload;
+
+                if (_compressorFactory is null)
+                    sendPackage.Payload = payload;
+                else
+                {
+                    sendPackage.Metadata.Compression = _compressorFactory.CompressionType;
+                    sendPackage.Metadata.UncompressedSize = (uint) payload.Length;
+                    using var compressor = _compressorFactory.Create();
+                    sendPackage.Payload = compressor.Compress(payload);
+                }
 
                 var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 87c76de..a869666 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -27,13 +27,15 @@
         private readonly IConnectionPool _connectionPool;
         private readonly IExecute _executor;
         private readonly CommandProducer _commandProducer;
+        private readonly ICompressorFactory? _compressorFactory;
 
         public ProducerChannelFactory(
             Guid correlationId,
             IRegisterEvent eventRegister,
             IConnectionPool connectionPool,
             IExecute executor,
-            ProducerOptions options)
+            ProducerOptions options,
+            ICompressorFactory? compressorFactory)
         {
             _correlationId = correlationId;
             _eventRegister = eventRegister;
@@ -45,6 +47,8 @@
                 ProducerName = options.ProducerName,
                 Topic = options.Topic
             };
+
+            _compressorFactory = compressorFactory;
         }
 
         public async Task<IProducerChannel> Create(CancellationToken cancellationToken)
@@ -55,7 +59,7 @@
             var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
             var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
             var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
-            return new ProducerChannel(response.ProducerId, response.ProducerName, connection);
+            return new ProducerChannel(response.ProducerId, response.ProducerName, connection, _compressorFactory);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 58b31e1..bc8b932 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -17,6 +17,7 @@
     using Abstractions;
     using PulsarApi;
     using System;
+    using System.Collections.Generic;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -29,13 +30,15 @@
         private readonly CommandSubscribe _subscribe;
         private readonly uint _messagePrefetchCount;
         private readonly BatchHandler _batchHandler;
+        private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
 
         public ReaderChannelFactory(
             Guid correlationId,
             IRegisterEvent eventRegister,
             IConnectionPool connectionPool,
             IExecute executor,
-            ReaderOptions options)
+            ReaderOptions options,
+            IEnumerable<IDecompressorFactory> decompressorFactories)
         {
             _correlationId = correlationId;
             _eventRegister = eventRegister;
@@ -54,6 +57,7 @@
             };
 
             _batchHandler = new BatchHandler(false);
+            _decompressorFactories = decompressorFactories;
         }
 
         public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
@@ -65,7 +69,7 @@
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
             var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-            return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
+            return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
         }
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index c18c7d8..a74f015 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -15,10 +15,12 @@
 namespace DotPulsar
 {
     using Abstractions;
+    using DotPulsar.Internal.Compression;
     using Exceptions;
     using Internal;
     using Internal.Abstractions;
     using System;
+    using System.Linq;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -60,9 +62,20 @@
         public IProducer CreateProducer(ProducerOptions options)
         {
             ThrowIfDisposed();
+
+            ICompressorFactory? compressorFactory = null;
+
+            if (options.CompressionType != CompressionType.None)
+            {
+                var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
+                compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);
+                if (compressorFactory is null)
+                    throw new CompressionException($"Support for {compressionType} compression was not found");
+            }
+
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
+            var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, compressorFactory);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
             var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
@@ -81,7 +94,7 @@
             ThrowIfDisposed();
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
+            var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
             var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
             var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
@@ -100,7 +113,7 @@
             ThrowIfDisposed();
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
+            var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
             var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
             var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)