Compression wired up and seems to work. Needs more testing.
diff --git a/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs b/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
index ecb2fdf..f658e24 100644
--- a/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/ICompressorFactory.cs
@@ -14,8 +14,11 @@
namespace DotPulsar.Internal.Abstractions
{
+ using DotPulsar.Internal.PulsarApi;
+
public interface ICompressorFactory
{
+ CompressionType CompressionType { get; }
ICompress Create();
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs b/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
index 4770dfc..7cbbd96 100644
--- a/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IDecompressorFactory.cs
@@ -14,8 +14,11 @@
namespace DotPulsar.Internal.Abstractions
{
+ using DotPulsar.Internal.PulsarApi;
+
public interface IDecompressorFactory
{
+ CompressionType CompressionType { get; }
IDecompress Create();
}
}
diff --git a/src/DotPulsar/Internal/Compression/CompressionFactories.cs b/src/DotPulsar/Internal/Compression/CompressionFactories.cs
new file mode 100644
index 0000000..a2da330
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/CompressionFactories.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression
+{
+ using DotPulsar.Internal.Abstractions;
+ using System.Collections.Generic;
+
+ public static class CompressionFactories
+ {
+ private static readonly List<ICompressorFactory> _compressorFactories;
+ private static readonly List<IDecompressorFactory> _decompressorFactories;
+
+ static CompressionFactories()
+ {
+ _compressorFactories = new List<ICompressorFactory>();
+ _decompressorFactories = new List<IDecompressorFactory>();
+
+
+ if (Lz4Compression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory))
+ Add(compressorFactory, decompressorFactory);
+
+ if (SnappyCompression.TryLoading(out compressorFactory, out decompressorFactory))
+ Add(compressorFactory, decompressorFactory);
+
+ if (ZlibCompression.TryLoading(out compressorFactory, out decompressorFactory))
+ Add(compressorFactory, decompressorFactory);
+
+ if (ZstdCompression.TryLoading(out compressorFactory, out decompressorFactory))
+ Add(compressorFactory, decompressorFactory);
+ }
+
+ private static void Add(ICompressorFactory? compressorFactory, IDecompressorFactory? decompressorFactory)
+ {
+ _compressorFactories.Add(compressorFactory!);
+ _decompressorFactories.Add(decompressorFactory!);
+ }
+
+ public static IEnumerable<ICompressorFactory> CompressorFactories()
+ => _compressorFactories;
+
+ public static IEnumerable<IDecompressorFactory> DecompressorFactories()
+ => _decompressorFactories;
+ }
+}
diff --git a/src/DotPulsar/Internal/Compression/Compressor.cs b/src/DotPulsar/Internal/Compression/Compressor.cs
index 96c05fa..51ffaf9 100644
--- a/src/DotPulsar/Internal/Compression/Compressor.cs
+++ b/src/DotPulsar/Internal/Compression/Compressor.cs
@@ -35,6 +35,7 @@
public ReadOnlySequence<byte> Compress(ReadOnlySequence<byte> data)
=> _compress(data);
- public void Dispose() => _disposable?.Dispose();
+ public void Dispose()
+ => _disposable?.Dispose();
}
}
diff --git a/src/DotPulsar/Internal/Compression/CompressorFactory.cs b/src/DotPulsar/Internal/Compression/CompressorFactory.cs
index 7e01684..230e1ff 100644
--- a/src/DotPulsar/Internal/Compression/CompressorFactory.cs
+++ b/src/DotPulsar/Internal/Compression/CompressorFactory.cs
@@ -15,14 +15,20 @@
namespace DotPulsar.Internal.Compression
{
using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.PulsarApi;
using System;
public sealed class CompressorFactory : ICompressorFactory
{
private readonly Func<ICompress> _create;
- public CompressorFactory(Func<ICompress> create)
- => _create = create;
+ public CompressorFactory(CompressionType compressionType, Func<ICompress> create)
+ {
+ CompressionType = compressionType;
+ _create = create;
+ }
+
+ public CompressionType CompressionType { get; }
public ICompress Create()
=> _create();
diff --git a/src/DotPulsar/Internal/Compression/Decompressor.cs b/src/DotPulsar/Internal/Compression/Decompressor.cs
index 281cdce..eddba6c 100644
--- a/src/DotPulsar/Internal/Compression/Decompressor.cs
+++ b/src/DotPulsar/Internal/Compression/Decompressor.cs
@@ -32,6 +32,7 @@
public ReadOnlySequence<byte> Decompress(ReadOnlySequence<byte> data, int decompressedSize)
=> _decompress(data, decompressedSize);
- public void Dispose() => _disposable?.Dispose();
+ public void Dispose()
+ => _disposable?.Dispose();
}
}
diff --git a/src/DotPulsar/Internal/Compression/DecompressorFactory.cs b/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
index 1e448bd..dc2b6da 100644
--- a/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
+++ b/src/DotPulsar/Internal/Compression/DecompressorFactory.cs
@@ -15,14 +15,20 @@
namespace DotPulsar.Internal.Compression
{
using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.PulsarApi;
using System;
public sealed class DecompressorFactory : IDecompressorFactory
{
private readonly Func<IDecompress> _create;
- public DecompressorFactory(Func<IDecompress> create)
- => _create = create;
+ public DecompressorFactory(CompressionType compressionType, Func<IDecompress> create)
+ {
+ CompressionType = compressionType;
+ _create = create;
+ }
+
+ public CompressionType CompressionType { get; }
public IDecompress Create()
=> _create();
diff --git a/src/DotPulsar/Internal/Compression/Lz4Compression.cs b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
index a94b7b6..1411bf1 100644
--- a/src/DotPulsar/Internal/Compression/Lz4Compression.cs
+++ b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
@@ -45,8 +45,8 @@
var encode = FindEncode(methods, lz4Level);
var maximumOutputSize = FindMaximumOutputSize(methods);
- compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(encode, maximumOutputSize)));
- decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(decode)));
+ compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Lz4, () => new Compressor(CreateCompressor(encode, maximumOutputSize)));
+ decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Lz4, () => new Decompressor(CreateDecompressor(decode)));
return true;
}
catch
diff --git a/src/DotPulsar/Internal/Compression/SnappyCompression.cs b/src/DotPulsar/Internal/Compression/SnappyCompression.cs
index 68af96f..3f77a43 100644
--- a/src/DotPulsar/Internal/Compression/SnappyCompression.cs
+++ b/src/DotPulsar/Internal/Compression/SnappyCompression.cs
@@ -41,8 +41,8 @@
var decode = FindDecode(methods);
var encode = FindEncode(methods);
- compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(encode)));
- decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(decode)));
+ compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Snappy, () => new Compressor(CreateCompressor(encode)));
+ decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Snappy, () => new Decompressor(CreateDecompressor(decode)));
return true;
}
catch
diff --git a/src/DotPulsar/Internal/Compression/ZlibCompression.cs b/src/DotPulsar/Internal/Compression/ZlibCompression.cs
index 1af3d4d..082b9b1 100644
--- a/src/DotPulsar/Internal/Compression/ZlibCompression.cs
+++ b/src/DotPulsar/Internal/Compression/ZlibCompression.cs
@@ -41,8 +41,8 @@
var compressBuffer = FindCompressBuffer(methods);
var uncompressBuffer = FindUncompressBuffer(methods);
- compressorFactory = new CompressorFactory(() => new Compressor(CreateCompressor(compressBuffer)));
- decompressorFactory = new DecompressorFactory(() => new Decompressor(CreateDecompressor(uncompressBuffer)));
+ compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Zlib, () => new Compressor(CreateCompressor(compressBuffer)));
+ decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Zlib, () => new Decompressor(CreateDecompressor(uncompressBuffer)));
return true;
}
catch
diff --git a/src/DotPulsar/Internal/Compression/ZstdCompression.cs b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
index df12300..fd5f281 100644
--- a/src/DotPulsar/Internal/Compression/ZstdCompression.cs
+++ b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
@@ -43,7 +43,7 @@
var compressorMethods = compressorType.GetMethods(BindingFlags.Public | BindingFlags.Instance);
var wrapMethod = FindWrap(compressorMethods);
- compressorFactory = new CompressorFactory(() =>
+ compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Zstd, () =>
{
var compressor = Activator.CreateInstance(compressorType);
if (compressor is null)
@@ -53,7 +53,7 @@
return new Compressor(CreateCompressor(wrap), (IDisposable) compressor);
});
- decompressorFactory = new DecompressorFactory(() =>
+ decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Zstd, () =>
{
var decompressor = Activator.CreateInstance(decompressorType);
if (decompressor is null)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 225eb20..479e89e 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -15,9 +15,11 @@
namespace DotPulsar.Internal
{
using Abstractions;
+ using DotPulsar.Exceptions;
using Extensions;
using PulsarApi;
using System;
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -28,6 +30,7 @@
private readonly IConnection _connection;
private readonly BatchHandler _batchHandler;
private readonly CommandFlow _cachedCommandFlow;
+ private readonly IDecompress?[] _decompressors;
private readonly AsyncLock _lock;
private uint _sendWhenZero;
private bool _firstFlow;
@@ -37,13 +40,21 @@
uint messagePrefetchCount,
AsyncQueue<MessagePackage> queue,
IConnection connection,
- BatchHandler batchHandler)
+ BatchHandler batchHandler,
+ IEnumerable<IDecompressorFactory> decompressorFactories)
{
_id = id;
_queue = queue;
_connection = connection;
_batchHandler = batchHandler;
+ _decompressors = new IDecompress[5];
+
+ foreach (var decompressorFactory in decompressorFactories)
+ {
+ _decompressors[(int) decompressorFactory.CompressionType] = decompressorFactory.Create();
+ }
+
_lock = new AsyncLock();
_cachedCommandFlow = new CommandFlow
@@ -81,13 +92,28 @@
}
var metadataSize = messagePackage.GetMetadataSize();
- var redeliveryCount = messagePackage.RedeliveryCount;
- var data = messagePackage.ExtractData(metadataSize);
var metadata = messagePackage.ExtractMetadata(metadataSize);
+ var data = messagePackage.ExtractData(metadataSize);
- // TODO decompress if needed
+ if (metadata.Compression != CompressionType.None)
+ {
+ var decompressor = _decompressors[(int) metadata.Compression];
+ if (decompressor is null)
+ throw new CompressionException($"Support for {metadata.Compression} compression was not found");
+
+ try
+ {
+ data = decompressor.Decompress(data, (int) metadata.UncompressedSize);
+ }
+ catch
+ {
+ await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecompressionError, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
+ }
var messageId = messagePackage.MessageId;
+ var redeliveryCount = messagePackage.RedeliveryCount;
return metadata.ShouldSerializeNumMessagesInBatch()
? _batchHandler.Add(messageId, redeliveryCount, metadata, data)
@@ -146,6 +172,12 @@
public async ValueTask DisposeAsync()
{
_queue.Dispose();
+
+ for (var i = 0; i < _decompressors.Length; ++i)
+ {
+ _decompressors[i]?.Dispose();
+ }
+
await _lock.DisposeAsync().ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 5268264..eb97404 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -17,6 +17,7 @@
using Abstractions;
using PulsarApi;
using System;
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -29,13 +30,15 @@
private readonly CommandSubscribe _subscribe;
private readonly uint _messagePrefetchCount;
private readonly BatchHandler _batchHandler;
+ private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
public ConsumerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
- ConsumerOptions options)
+ ConsumerOptions options,
+ IEnumerable<IDecompressorFactory> decompressorFactories)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
@@ -55,6 +58,7 @@
};
_batchHandler = new BatchHandler(true);
+ _decompressorFactories = decompressorFactories;
}
public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
@@ -66,7 +70,7 @@
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
- return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
+ return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
}
}
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 3b43351..fd9bd0c 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -29,14 +29,20 @@
private readonly ulong _id;
private readonly string _name;
private readonly IConnection _connection;
+ private readonly ICompressorFactory? _compressorFactory;
- public ProducerChannel(ulong id, string name, IConnection connection)
+ public ProducerChannel(
+ ulong id,
+ string name,
+ IConnection connection,
+ ICompressorFactory? compressorFactory)
{
var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
_sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
_id = id;
_name = name;
_connection = connection;
+ _compressorFactory = compressorFactory;
}
public async ValueTask ClosedByClient(CancellationToken cancellationToken)
@@ -74,7 +80,16 @@
sendPackage.Command.SequenceId = metadata.SequenceId;
sendPackage.Metadata = metadata;
- sendPackage.Payload = payload;
+
+ if (_compressorFactory is null)
+ sendPackage.Payload = payload;
+ else
+ {
+ sendPackage.Metadata.Compression = _compressorFactory.CompressionType;
+ sendPackage.Metadata.UncompressedSize = (uint) payload.Length;
+ using var compressor = _compressorFactory.Create();
+ sendPackage.Payload = compressor.Compress(payload);
+ }
var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 87c76de..a869666 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -27,13 +27,15 @@
private readonly IConnectionPool _connectionPool;
private readonly IExecute _executor;
private readonly CommandProducer _commandProducer;
+ private readonly ICompressorFactory? _compressorFactory;
public ProducerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
- ProducerOptions options)
+ ProducerOptions options,
+ ICompressorFactory? compressorFactory)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
@@ -45,6 +47,8 @@
ProducerName = options.ProducerName,
Topic = options.Topic
};
+
+ _compressorFactory = compressorFactory;
}
public async Task<IProducerChannel> Create(CancellationToken cancellationToken)
@@ -55,7 +59,7 @@
var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
- return new ProducerChannel(response.ProducerId, response.ProducerName, connection);
+ return new ProducerChannel(response.ProducerId, response.ProducerName, connection, _compressorFactory);
}
}
}
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 58b31e1..bc8b932 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -17,6 +17,7 @@
using Abstractions;
using PulsarApi;
using System;
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -29,13 +30,15 @@
private readonly CommandSubscribe _subscribe;
private readonly uint _messagePrefetchCount;
private readonly BatchHandler _batchHandler;
+ private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
public ReaderChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
- ReaderOptions options)
+ ReaderOptions options,
+ IEnumerable<IDecompressorFactory> decompressorFactories)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
@@ -54,6 +57,7 @@
};
_batchHandler = new BatchHandler(false);
+ _decompressorFactories = decompressorFactories;
}
public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
@@ -65,7 +69,7 @@
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
- return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
+ return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
}
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index c18c7d8..a74f015 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -15,10 +15,12 @@
namespace DotPulsar
{
using Abstractions;
+ using DotPulsar.Internal.Compression;
using Exceptions;
using Internal;
using Internal.Abstractions;
using System;
+ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -60,9 +62,20 @@
public IProducer CreateProducer(ProducerOptions options)
{
ThrowIfDisposed();
+
+ ICompressorFactory? compressorFactory = null;
+
+ if (options.CompressionType != CompressionType.None)
+ {
+ var compressionType = (Internal.PulsarApi.CompressionType) options.CompressionType;
+ compressorFactory = CompressionFactories.CompressorFactories().SingleOrDefault(f => f.CompressionType == compressionType);
+ if (compressorFactory is null)
+ throw new CompressionException($"Support for {compressionType} compression was not found");
+ }
+
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
+ var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, compressorFactory);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
@@ -81,7 +94,7 @@
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
+ var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
@@ -100,7 +113,7 @@
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
+ var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)