Adding PackageIcon and removing the ReaderChannelFactory
diff --git a/.asf.yaml b/.asf.yaml
index 7f8fedd..319738d 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -18,7 +18,7 @@
#
github:
- description: ".NET/C# client library for Apache Pulsar"
+ description: "The official .NET/C# client library for Apache Pulsar"
homepage: https://pulsar.apache.org/
labels:
- pulsar
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index f89de85..934b5a4 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -5,11 +5,12 @@
<Version>0.11.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
- <Authors>DanskeCommodities;dblank</Authors>
- <Company>Danske Commodities A/S</Company>
+ <Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
+ <Company>Apache Software Foundation</Company>
<Copyright>$(Company)</Copyright>
<Title>DotPulsar</Title>
<PackageTags>Apache;Pulsar</PackageTags>
+ <PackageIcon>PackageIcon.png</PackageIcon>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageReleaseNotes>Please refer to CHANGELOG.md for details</PackageReleaseNotes>
<Description>The official .NET/C# client library for Apache Pulsar</Description>
@@ -32,4 +33,8 @@
<PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
</ItemGroup>
+ <ItemGroup>
+ <None Include="PackageIcon.png" Pack="true" PackagePath="/" Visible="False" />
+ </ItemGroup>
+
</Project>
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index eb97404..5d20ce5 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -37,27 +37,18 @@
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
- ConsumerOptions options,
+ CommandSubscribe subscribe,
+ uint messagePrefetchCount,
+ BatchHandler batchHandler,
IEnumerable<IDecompressorFactory> decompressorFactories)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
_connectionPool = connectionPool;
_executor = executor;
- _messagePrefetchCount = options.MessagePrefetchCount;
-
- _subscribe = new CommandSubscribe
- {
- ConsumerName = options.ConsumerName,
- InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
- PriorityLevel = options.PriorityLevel,
- ReadCompacted = options.ReadCompacted,
- Subscription = options.SubscriptionName,
- Topic = options.Topic,
- Type = (CommandSubscribe.SubType) options.SubscriptionType
- };
-
- _batchHandler = new BatchHandler(true);
+ _subscribe = subscribe;
+ _messagePrefetchCount = messagePrefetchCount;
+ _batchHandler = batchHandler;
_decompressorFactories = decompressorFactories;
}
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
deleted file mode 100644
index bc8b932..0000000
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
-{
- using Abstractions;
- using PulsarApi;
- using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
-
- public sealed class ReaderChannelFactory : IConsumerChannelFactory
- {
- private readonly Guid _correlationId;
- private readonly IRegisterEvent _eventRegister;
- private readonly IConnectionPool _connectionPool;
- private readonly IExecute _executor;
- private readonly CommandSubscribe _subscribe;
- private readonly uint _messagePrefetchCount;
- private readonly BatchHandler _batchHandler;
- private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
-
- public ReaderChannelFactory(
- Guid correlationId,
- IRegisterEvent eventRegister,
- IConnectionPool connectionPool,
- IExecute executor,
- ReaderOptions options,
- IEnumerable<IDecompressorFactory> decompressorFactories)
- {
- _correlationId = correlationId;
- _eventRegister = eventRegister;
- _connectionPool = connectionPool;
- _executor = executor;
- _messagePrefetchCount = options.MessagePrefetchCount;
-
- _subscribe = new CommandSubscribe
- {
- ConsumerName = options.ReaderName,
- Durable = false,
- ReadCompacted = options.ReadCompacted,
- StartMessageId = options.StartMessageId.ToMessageIdData(),
- Subscription = $"Reader-{Guid.NewGuid():N}",
- Topic = options.Topic
- };
-
- _batchHandler = new BatchHandler(false);
- _decompressorFactories = decompressorFactories;
- }
-
- public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
- => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellationToken)
- {
- var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
- var messageQueue = new AsyncQueue<MessagePackage>();
- var channel = new Channel(_correlationId, _eventRegister, messageQueue);
- var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
- return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
- }
- }
-}
diff --git a/src/DotPulsar/PackageIcon.png b/src/DotPulsar/PackageIcon.png
new file mode 100644
index 0000000..849e7ce
--- /dev/null
+++ b/src/DotPulsar/PackageIcon.png
Binary files differ
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index a74f015..3ce1940 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -16,6 +16,7 @@
{
using Abstractions;
using DotPulsar.Internal.Compression;
+ using DotPulsar.Internal.PulsarApi;
using Exceptions;
using Internal;
using Internal.Abstractions;
@@ -92,9 +93,23 @@
public IConsumer CreateConsumer(ConsumerOptions options)
{
ThrowIfDisposed();
+
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
+ var subscribe = new CommandSubscribe
+ {
+ ConsumerName = options.ConsumerName,
+ InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
+ PriorityLevel = options.PriorityLevel,
+ ReadCompacted = options.ReadCompacted,
+ Subscription = options.SubscriptionName,
+ Topic = options.Topic,
+ Type = (CommandSubscribe.SubType) options.SubscriptionType
+ };
+ var messagePrefetchCount = options.MessagePrefetchCount;
+ var batchHandler = new BatchHandler(true);
+ var decompressorFactories = CompressionFactories.DecompressorFactories();
+ var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
@@ -111,9 +126,22 @@
public IReader CreateReader(ReaderOptions options)
{
ThrowIfDisposed();
+
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
+ var subscribe = new CommandSubscribe
+ {
+ ConsumerName = options.ReaderName,
+ Durable = false,
+ ReadCompacted = options.ReadCompacted,
+ StartMessageId = options.StartMessageId.ToMessageIdData(),
+ Subscription = $"Reader-{Guid.NewGuid():N}",
+ Topic = options.Topic
+ };
+ var messagePrefetchCount = options.MessagePrefetchCount;
+ var batchHandler = new BatchHandler(false);
+ var decompressorFactories = CompressionFactories.DecompressorFactories();
+ var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)