Initial commit
diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..1ff0c42
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,63 @@
+###############################################################################
+# Set default behavior to automatically normalize line endings.
+###############################################################################
+* text=auto
+
+###############################################################################
+# Set default behavior for command prompt diff.
+#
+# This is need for earlier builds of msysgit that does not have it on by
+# default for csharp files.
+# Note: This is only used by command line
+###############################################################################
+#*.cs diff=csharp
+
+###############################################################################
+# Set the merge driver for project and solution files
+#
+# Merging from the command prompt will add diff markers to the files if there
+# are conflicts (Merging from VS is not affected by the settings below, in VS
+# the diff markers are never inserted). Diff markers may cause the following
+# file extensions to fail to load in VS. An alternative would be to treat
+# these files as binary and thus will always conflict and require user
+# intervention with every merge. To do so, just uncomment the entries below
+###############################################################################
+#*.sln merge=binary
+#*.csproj merge=binary
+#*.vbproj merge=binary
+#*.vcxproj merge=binary
+#*.vcproj merge=binary
+#*.dbproj merge=binary
+#*.fsproj merge=binary
+#*.lsproj merge=binary
+#*.wixproj merge=binary
+#*.modelproj merge=binary
+#*.sqlproj merge=binary
+#*.wwaproj merge=binary
+
+###############################################################################
+# behavior for image files
+#
+# image files are treated as binary by default.
+###############################################################################
+#*.jpg binary
+#*.png binary
+#*.gif binary
+
+###############################################################################
+# diff behavior for common document formats
+#
+# Convert binary document formats to text before diffing them. This feature
+# is only available from the command line. Turn it on by uncommenting the
+# entries below.
+###############################################################################
+#*.doc diff=astextplain
+#*.DOC diff=astextplain
+#*.docx diff=astextplain
+#*.DOCX diff=astextplain
+#*.dot diff=astextplain
+#*.DOT diff=astextplain
+#*.pdf diff=astextplain
+#*.PDF diff=astextplain
+#*.rtf diff=astextplain
+#*.RTF diff=astextplain
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6e2e10d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,261 @@
+## Ignore Visual Studio temporary files, build results, and
+## files generated by popular Visual Studio add-ons.
+
+# User-specific files
+*.suo
+*.user
+*.userosscache
+*.sln.docstates
+
+# User-specific files (MonoDevelop/Xamarin Studio)
+*.userprefs
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+bld/
+[Bb]in/
+[Oo]bj/
+[Ll]og/
+
+# Visual Studio 2015 cache/options directory
+.vs/
+# Uncomment if you have tasks that create the project's static files in wwwroot
+wwwroot/
+
+# MSTest test Results
+[Tt]est[Rr]esult*/
+[Bb]uild[Ll]og.*
+
+# NUNIT
+*.VisualState.xml
+TestResult.xml
+
+# Build Results of an ATL Project
+[Dd]ebugPS/
+[Rr]eleasePS/
+dlldata.c
+
+# DNX
+project.lock.json
+project.fragment.lock.json
+artifacts/
+
+*_i.c
+*_p.c
+*_i.h
+*.ilk
+*.meta
+*.obj
+*.pch
+*.pdb
+*.pgc
+*.pgd
+*.rsp
+*.sbr
+*.tlb
+*.tli
+*.tlh
+*.tmp
+*.tmp_proj
+*.log
+*.vspscc
+*.vssscc
+.builds
+*.pidb
+*.svclog
+*.scc
+
+# Chutzpah Test files
+_Chutzpah*
+
+# Visual C++ cache files
+ipch/
+*.aps
+*.ncb
+*.opendb
+*.opensdf
+*.sdf
+*.cachefile
+*.VC.db
+*.VC.VC.opendb
+
+# Visual Studio profiler
+*.psess
+*.vsp
+*.vspx
+*.sap
+
+# TFS 2012 Local Workspace
+$tf/
+
+# Guidance Automation Toolkit
+*.gpState
+
+# ReSharper is a .NET coding add-in
+_ReSharper*/
+*.[Rr]e[Ss]harper
+*.DotSettings.user
+
+# JustCode is a .NET coding add-in
+.JustCode
+
+# TeamCity is a build add-in
+_TeamCity*
+
+# DotCover is a Code Coverage Tool
+*.dotCover
+
+# NCrunch
+_NCrunch_*
+.*crunch*.local.xml
+nCrunchTemp_*
+
+# MightyMoose
+*.mm.*
+AutoTest.Net/
+
+# Web workbench (sass)
+.sass-cache/
+
+# Installshield output folder
+[Ee]xpress/
+
+# DocProject is a documentation generator add-in
+DocProject/buildhelp/
+DocProject/Help/*.HxT
+DocProject/Help/*.HxC
+DocProject/Help/*.hhc
+DocProject/Help/*.hhk
+DocProject/Help/*.hhp
+DocProject/Help/Html2
+DocProject/Help/html
+
+# Click-Once directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# TODO: Comment the next line if you want to checkin your web deploy settings
+# but database connection strings (with potential passwords) will be unencrypted
+#*.pubxml
+*.publishproj
+
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
+# NuGet Packages
+*.nupkg
+# The packages folder can be ignored because of Package Restore
+**/packages/*
+# except build/, which is used as an MSBuild target.
+!**/packages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/packages/repositories.config
+# NuGet v3's project.json files produces more ignoreable files
+*.nuget.props
+*.nuget.targets
+
+# Microsoft Azure Build Output
+csx/
+*.build.csdef
+
+# Microsoft Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directories and files
+AppPackages/
+BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!*.[Cc]ache/
+
+# Others
+ClientBin/
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.jfm
+*.pfx
+*.publishsettings
+node_modules/
+orleans.codegen.cs
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+
+# SQL Server files
+*.mdf
+*.ldf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+paket-files/
+
+# FAKE - F# Make
+.fake/
+
+# JetBrains Rider
+.idea/
+*.sln.iml
+
+# CodeRush
+.cr/
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..b3ae3d2
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2018 Daniel Blankensteiner
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..4bb5697
--- /dev/null
+++ b/README.md
@@ -0,0 +1,216 @@
+# DotPulsar
+
+Native .NET/C# client library for [Apache Pulsar](https://pulsar.apache.org/)
+
+## Getting Started
+
+DotPulsar is written entirely in C# and implement Apache Pulsar's [binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/). Other options was using the [C++ client library](https://pulsar.apache.org/docs/en/client-libraries-cpp/) (which is what the [Python client](https://pulsar.apache.org/docs/en/client-libraries-python/) and [Go client](https://pulsar.apache.org/docs/en/client-libraries-go/) do) or build on top of the [WebSocket API](https://pulsar.apache.org/docs/en/client-libraries-websocket/). We decided to implement the binary protocol in order to gain full control and maximize portability and performance.
+
+DotPulsar's API is strongly inspired by Apache Pulsar's official [Java client](https://pulsar.apache.org/docs/en/client-libraries-java/), but a 100% match is not goal.
+
+Let's see how to produce, consume and read messages.
+
+### Producing messages
+
+Producers can be created via the extension method show below, which follows the API from the Java client:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var producer = client.NewProducer().Topic("persistent://public/default/mytopic").Create();
+await producer.Send(System.Text.Encoding.UTF8.GetBytes("Hello World"));
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var producerOptions = new ProducerOptions
+{
+ ProducerName = "MyProducer",
+ Topic = "persistent://public/default/mytopic"
+};
+var producer = client.CreateProducer(producerOptions);
+```
+
+In the above you only specify the data being sent, but you can also specify metadata:
+
+```csharp
+var data = Encoding.UTF8.GetBytes("Hello World");
+var messageId = await producer.NewMessage()
+ .Property("SomeKey", "SomeValue") //EventTime and SequenceId can also be set
+ .Send(data);
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var data = Encoding.UTF8.GetBytes("Hello World");
+var metadata = new MessageMetadata(); //EventTime and SequenceId can be set via properties
+metadata["SomeKey"] = "SomeValue";
+var messageId = await producer.Send(metadata, data));
+```
+
+### Consuming messages
+
+Consumers can be created via the extension method show below, which follows the API from the Java client:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var consumer = client.NewConsumer()
+ .SubscriptionName("MySubscription")
+ .Topic("persistent://public/default/mytopic")
+ .Create();
+var message = await consumer.Receive();
+Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray()));
+await consumer.Acknowledge(message);
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var consumerOptions = new ConsumerOptions
+{
+ SubscriptionName = "MySubscription",
+ Topic = "persistent://public/default/mytopic"
+};
+var consumer = client.CreateConsumer(consumerOptions);
+```
+
+### Reading messages
+
+Readers can be created via the extension method show below, which follows the API from the Java client:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var reader = client.NewReader()
+ .StartMessageId(MessageId.Earliest)
+ .Topic("persistent://public/default/mytopic")
+ .Create();
+var message = await reader.Receive();
+Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray()));
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var readerOptions = new ReaderOptions
+{
+ StartMessageId = MessageId.Earliest,
+ Topic = "persistent://public/default/mytopic"
+};
+var reader = client.CreateReader(readerOptions);
+```
+
+## Monitoring state
+
+Consumers, producers and readers all have states that can be monitored. Let's have a look at what states they can have.
+
+### Consumer states
+
+* Active (All is well)
+* Inactive (All is well. The subscription type is 'Failover' and you are not the active consumer)
+* Closed (The consumer or PulsarClient has been disposed)
+* Disconnected (The connection was lost and attempts are being made to reconnect)
+* Faulted (An unrecoverable error has occurred)
+* ReachedEndOfTopic (No more messages will be delivered)
+
+### Producer states
+
+* Closed (The producer or PulsarClient has been disposed)
+* Connected (All is well)
+* Disconnected (The connection was lost and attempts are being made to reconnect)
+* Faulted (An unrecoverable error has occurred)
+
+### Reader states
+
+* Closed (The reader or PulsarClient has been disposed)
+* Connected: (All is well)
+* Disconnected (The connection was lost and attempts are being made to reconnect)
+* Faulted (An unrecoverable error has occurred)
+* ReachedEndOfTopic (No more messages will be delivered)
+
+### How to
+
+Monitoring the state is easy, so let's see how to monitor when a consumer changes state:
+
+```csharp
+private static async Task MonitorConsumerState(IConsumer consumer, CancellationToken cancellationToken)
+{
+ var state = ConsumerState.Disconnected;
+
+ while (true)
+ {
+ state = await consumer.StateChangedFrom(state, cancellationToken);
+
+ switch (state)
+ {
+ case ConsumerState.Active:
+ Console.WriteLine("Consumer is active");
+ break;
+ case ConsumerState.Inactive:
+ Console.WriteLine("Consumer is inactive");
+ break;
+ case ConsumerState.Disconnected:
+ Console.WriteLine("Consumer is disconnected");
+ break;
+ case ConsumerState.Closed:
+ Console.WriteLine("Consumer has closed");
+ break;
+ case ConsumerState.ReachedEndOfTopic:
+ Console.WriteLine("Consumer has reached end of topic");
+ break;
+ case ConsumerState.Faulted:
+ Console.WriteLine("Consumer has faulted");
+ break;
+ }
+
+ if (consumer.IsFinalState(state))
+ return;
+ }
+}
+```
+
+Here the variable 'state' will contained to new state. You can both monitor going From (StateChangedFrom) and To (StateChangedTo) a state.
+Some states are final, meaning the state can no longer change. For consumers 'Closed', 'Faulted' and 'ReachedEndOfTopic' are final states. When the consumer enter a final state, all monitoring tasks are completed. So if you e.g. are monitoring going to 'Diconnected' and the consumer is 'Closed', then you task will complete and return 'Closed'.
+
+## Built With
+
+* [protobuf-net](https://github.com/mgravell/protobuf-net) - Provides simple access to fast and efficient "Protocol Buffers" serialization from .NET applications
+* [System.IO.Pipelines](https://www.nuget.org/packages/System.IO.Pipelines/) - Single producer single consumer byte buffer management
+
+## Versioning
+
+We use [SemVer](http://semver.org/) for versioning. For the versions available, see the [tags on this repository](https://github.com/danske-commodities/dotpulsar/tags).
+
+## Authors
+
+* **Daniel Blankensteiner** - *Initial work* - [Danske Commodities](https://github.com/danske-commodities)
+
+See also the list of [contributors](https://github.com/danske-commodities/dotpulsar/contributors) who participated in this project.
+
+## License
+
+This project is licensed under the Apache License Version 2.0 - see the [LICENSE](LICENSE) file for details.
+
+## Roadmap
+
+1.0.0
+
+* Move to IAsyncDisposable and IAsyncEnumerable (will mean moving to .NET Standard 2.1)
+
+X.X.X //Future
+
+* Schemas
+* Authentication/Authorization (TLS Authentication, Athenz, Kerberos)
+* Partitioned topics
+* Topic compaction
+* Message compression (LZ4, ZLIB, ZSTD, SNAPPY)
+* Multi-topic subscriptions
+* Connection encryption
+* Message encryption
+* Batching
+* CommandConsumerStats/CommandConsumerStatsResponse
+* CommandGetTopicsOfNamespace/CommandGetTopicsOfNamespaceResponse
+* CommandPartitionedTopicMetadata/CommandPartitionedTopicMetadataResponse
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
new file mode 100644
index 0000000..13d206a
--- /dev/null
+++ b/src/Directory.Build.props
@@ -0,0 +1,7 @@
+<Project>
+
+ <PropertyGroup>
+ <LangVersion>8.0</LangVersion>
+ </PropertyGroup>
+
+</Project>
\ No newline at end of file
diff --git a/src/DotPulsar.Tests/DotPulsar.Tests.csproj b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
new file mode 100644
index 0000000..34b3de6
--- /dev/null
+++ b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -0,0 +1,21 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>netcoreapp2.2</TargetFramework>
+ <IsPackable>false</IsPackable>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
+ <PackageReference Include="xunit" Version="2.4.1" />
+ <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
+ </PackageReference>
+ </ItemGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="..\DotPulsar\DotPulsar.csproj" />
+ </ItemGroup>
+
+</Project>
diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
new file mode 100644
index 0000000..b5ba9b1
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -0,0 +1,137 @@
+using DotPulsar.Internal;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+ public class AsyncLockTests
+ {
+ [Fact]
+ public void Lock_GivenLockIsFree_ShouldReturnCompletedTask()
+ {
+ //Arrange
+ var sut = new AsyncLock();
+
+ //Act
+ var actual = sut.Lock();
+
+ //Assert
+ Assert.True(actual.IsCompleted);
+
+ //Annihilate
+ actual.Result.Dispose();
+ sut.Dispose();
+ }
+
+ [Fact]
+ public async Task Lock_GivenLockIsTaken_ShouldReturnIncompletedTask()
+ {
+ //Arrange
+ var sut = new AsyncLock();
+ var alreadyTaken = await sut.Lock();
+
+ //Act
+ var actual = sut.Lock();
+
+ //Assert
+ Assert.False(actual.IsCompleted);
+
+ //Annihilate
+ alreadyTaken.Dispose();
+ actual.Result.Dispose();
+ sut.Dispose();
+ }
+
+ [Fact]
+ public async Task Lock_GivenLockIsDisposed_ShouldThrowObjectDisposedException()
+ {
+ //Arrange
+ var sut = new AsyncLock();
+ sut.Dispose();
+
+ //Act
+ var exception = await Record.ExceptionAsync(() => sut.Lock());
+
+ //Assert
+ Assert.IsType<ObjectDisposedException>(exception);
+ }
+
+ [Fact]
+ public async Task Lock_GivenLockIsDisposedWhileAwaitingLock_ShouldThrowObjectDisposedException()
+ {
+ //Arrange
+ var sut = new AsyncLock();
+ var gotLock = await sut.Lock();
+ var awaiting = sut.Lock();
+ _ = Task.Run(() => sut.Dispose());
+
+ //Act
+ var exception = await Record.ExceptionAsync(() => awaiting);
+
+ //Assert
+ Assert.IsType<ObjectDisposedException>(exception);
+
+ //Annihilate
+ sut.Dispose();
+ gotLock.Dispose();
+ }
+
+ [Fact]
+ public async Task Lock_GivenLockIsTakenAndCancellationTokenIsActivated_ShouldThrowTaskCanceledException()
+ {
+ //Arrange
+ var cts = new CancellationTokenSource();
+ var sut = new AsyncLock();
+ var gotLock = await sut.Lock();
+ var awaiting = sut.Lock(cts.Token);
+
+ //Act
+ cts.Cancel();
+ var exception = await Record.ExceptionAsync(() => awaiting);
+
+ //Assert
+ Assert.IsType<TaskCanceledException>(exception);
+
+ //Annihilate
+ cts.Dispose();
+ gotLock.Dispose();
+ sut.Dispose();
+ }
+
+ [Fact]
+ public async Task Dispose_GivenLockIsDisposedWhileItIsTaken_ShouldNotCompleteBeforeItIsReleased()
+ {
+ //Arrange
+ var sut = new AsyncLock();
+ var gotLock = await sut.Lock();
+ var disposeTask = Task.Run(() => sut.Dispose());
+ Assert.False(disposeTask.IsCompleted);
+
+ //Act
+ gotLock.Dispose();
+ await disposeTask;
+
+ //Assert
+ Assert.True(disposeTask.IsCompleted);
+
+ //Annihilate
+ sut.Dispose();
+ }
+
+ [Fact]
+ public void Dispose_WhenCalledMultipleTimes_ShouldBeSafeToDoSo()
+ {
+ //Arrange
+ var sut = new AsyncLock();
+
+ //Act
+ sut.Dispose();
+ var exception = Record.Exception(() => sut.Dispose());
+
+ //Assert
+ Assert.Null(exception);
+ }
+ }
+}
diff --git a/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs
new file mode 100644
index 0000000..4491804
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -0,0 +1,117 @@
+using DotPulsar.Internal;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+ public class AsyncQueueTests
+ {
+ [Fact]
+ public async Task Enqueue_GivenDequeueTaskWasWaiting_ShouldCompleteDequeueTask()
+ {
+ //Arrange
+ const int value = 1;
+ var queue = new AsyncQueue<int>();
+ var dequeueTask = queue.Dequeue();
+ queue.Enqueue(value);
+
+ //Act
+ var actual = await dequeueTask;
+
+ //Assert
+ Assert.Equal(value, actual);
+
+ //Annihilate
+ queue.Dispose();
+ }
+
+ [Fact]
+ public async Task DequeueAsync_GivenQueueWasNotEmpty_ShouldCompleteDequeueTask()
+ {
+ //Arrange
+ const int value = 1;
+ var queue = new AsyncQueue<int>();
+ queue.Enqueue(value);
+
+ //Act
+ var actual = await queue.Dequeue();
+
+ //Assert
+ Assert.Equal(value, actual);
+
+ //Annihilate
+ queue.Dispose();
+ }
+
+ [Fact]
+ public async Task DequeueAsync_GivenMultipleDequeues_ShouldCompleteInOrderedSequence()
+ {
+ //Arrange
+ const int value1 = 1, value2 = 2;
+ var queue = new AsyncQueue<int>();
+ var dequeue1 = queue.Dequeue();
+ var dequeue2 = queue.Dequeue();
+ queue.Enqueue(value1);
+ queue.Enqueue(value2);
+
+ //Act
+ var actual1 = await dequeue1;
+ var actual2 = await dequeue2;
+
+ //Assert
+ Assert.Equal(value1, actual1);
+ Assert.Equal(value2, actual2);
+
+ //Annihilate
+ queue.Dispose();
+ }
+
+ [Fact]
+ public async Task DequeueAsync_GivenSequenceOfInput_ShouldReturnSameSequenceOfOutput()
+ {
+ //Arrange
+ const int value1 = 1, value2 = 2;
+ var queue = new AsyncQueue<int>();
+ queue.Enqueue(value1);
+ queue.Enqueue(value2);
+
+ //Act
+ var actual1 = await queue.Dequeue();
+ var actual2 = await queue.Dequeue();
+
+ //Assert
+ Assert.Equal(value1, actual1);
+ Assert.Equal(value2, actual2);
+
+ //Annihilate
+ queue.Dispose();
+ }
+
+ [Fact]
+ public async Task DequeueAsync_GivenTokenIsCanceled_ShouldCancelTask()
+ {
+ //Arrange
+ CancellationTokenSource source1 = new CancellationTokenSource(), source2 = new CancellationTokenSource();
+ const int excepted = 1;
+ var queue = new AsyncQueue<int>();
+ var task1 = queue.Dequeue(source1.Token);
+ var task2 = queue.Dequeue(source2.Token);
+
+ //Act
+ source1.Cancel();
+ queue.Enqueue(excepted);
+ var exception = await Record.ExceptionAsync(() => task1);
+ await task2;
+
+ //Assert
+ Assert.IsType<TaskCanceledException>(exception);
+ Assert.Equal(excepted, task2.Result);
+
+ //Annihilate
+ source1.Dispose();
+ source2.Dispose();
+ queue.Dispose();
+ }
+ }
+}
diff --git a/src/DotPulsar.Tests/Internal/Crc32CTests.cs b/src/DotPulsar.Tests/Internal/Crc32CTests.cs
new file mode 100644
index 0000000..0187c29
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/Crc32CTests.cs
@@ -0,0 +1,39 @@
+using DotPulsar.Internal;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+ public class Crc32CTests
+ {
+ [Fact]
+ public void Calculate_GivenSequenceWithSingleSegment_ShouldReturnExpectedChecksum()
+ {
+ //Arrange
+ var segment = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c };
+ var sequence = new SequenceBuilder<byte>().Append(segment).Build();
+
+ //Act
+ var actual = Crc32C.Calculate(sequence);
+
+ //Assert
+ const uint expected = 2355953212;
+ Assert.Equal(expected, actual);
+ }
+
+ [Fact]
+ public void Calculate_GivenSequenceWithMultipleSegments_ShouldReturnExpectedChecksum()
+ {
+ //Arrange
+ var s1 = new byte[] { 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18, 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01 };
+ var s2 = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c };
+ var sequence = new SequenceBuilder<byte>().Append(s1).Append(s2).Build();
+
+ //Act
+ var actual = Crc32C.Calculate(sequence);
+
+ //Assert
+ const uint expected = 1079987866;
+ Assert.Equal(expected, actual);
+ }
+ }
+}
diff --git a/src/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/src/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
new file mode 100644
index 0000000..82f92e8
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -0,0 +1,146 @@
+using DotPulsar.Internal;
+using DotPulsar.Internal.Extensions;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal.Extensions
+{
+ public class ReadOnlySequenceExtensionsTests
+ {
+ [Fact]
+ public void StartsWith_GivenToShortSequenceWithSingleSegment_ShouldReturnFalse()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00 }).Build();
+
+ //Act
+ var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
+
+ //Assert
+ Assert.False(actual);
+ }
+
+ [Fact]
+ public void StartsWith_GivenSequenceWithSingleSegment_ShouldReturnFalse()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x02, 0x01 }).Build();
+
+ //Act
+ var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
+
+ //Assert
+ Assert.False(actual);
+ }
+
+ [Fact]
+ public void StartsWith_GivenSequenceWithSingleSegment_ShouldReturnTrue()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01, 0x02 }).Build();
+
+ //Act
+ var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
+
+ //Assert
+ Assert.True(actual);
+ }
+
+ [Fact]
+ public void StartsWith_GivenToShortSequenceWithMultipleSegments_ShouldReturnFalse()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02 }).Build();
+
+ //Act
+ var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02, 0x03 });
+
+ //Assert
+ Assert.False(actual);
+ }
+
+ [Fact]
+ public void StartsWith_GivenSequenceWithMultipleSegments_ShouldReturnFalse()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x02 }).Append(new byte[] { 0x01, 0x03 }).Build();
+
+ //Act
+ var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 });
+
+ //Assert
+ Assert.False(actual);
+ }
+
+ [Fact]
+ public void StartsWith_GivenSequenceWithMultipleSegments_ShouldReturnTrue()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build();
+
+ //Act
+ var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 });
+
+ //Assert
+ Assert.True(actual);
+ }
+
+ [Fact]
+ public void ReadUInt32_GivenSequenceWithSingleSegment_ShouldGiveExceptedResult()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01, 0x02, 0x03 }).Build();
+
+ //Act
+ var actual = sequence.ReadUInt32(0, true);
+
+ //Assert
+ const uint expected = 66051;
+ Assert.Equal(expected, actual);
+ }
+
+ [Fact]
+ public void ReadUInt32_GivenSequenceWithSingleSegmentAndNonZeroStart_ShouldGiveExceptedResult()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x09, 0x00, 0x01, 0x02, 0x03 }).Build();
+
+ //Act
+ var actual = sequence.ReadUInt32(1, true);
+
+ //Assert
+ const uint expected = 66051;
+ Assert.Equal(expected, actual);
+ }
+
+ [Fact]
+ public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build();
+
+ //Act
+ var actual = sequence.ReadUInt32(0, true);
+
+ //Assert
+ const uint expected = 66051;
+ Assert.Equal(expected, actual);
+ }
+
+ [Fact]
+ public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult()
+ {
+ //Arrange
+ var sequence = new SequenceBuilder<byte>()
+ .Append(new byte[] { 0x09, 0x09, 0x09 })
+ .Append(new byte[] { 0x09, 0x00, 0x01 })
+ .Append(new byte[] { 0x02, 0x03 }).Build();
+
+ //Act
+ var actual = sequence.ReadUInt32(4, true);
+
+ //Assert
+ const uint expected = 66051;
+ Assert.Equal(expected, actual);
+ }
+ }
+}
diff --git a/src/DotPulsar.Tests/Internal/SequenceBuilderTests.cs b/src/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
new file mode 100644
index 0000000..51c0ab7
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
@@ -0,0 +1,45 @@
+using DotPulsar.Internal;
+using System.Buffers;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+ public class SequenceBuilderTests
+ {
+ [Fact]
+ public void Append_GivenMultipleInput_ShouldArrangeInCorrectOrder()
+ {
+ //Arrange
+ var a = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+ var b = new byte[] { 0x04, 0x05, 0x06, 0x07, 0x08 };
+ var c = new byte[] { 0x09 };
+ var builder = new SequenceBuilder<byte>().Append(a).Append(b).Append(c);
+
+ //Act
+ var sequence = builder.Build();
+
+ //Assert
+ var array = sequence.ToArray();
+ for (byte i = 0; i < array.Length; ++i)
+ Assert.Equal(i, array[i]);
+ }
+
+ [Fact]
+ public void Prepend_GivenMultipleInput_ShouldArrangeInCorrectOrder()
+ {
+ //Arrange
+ var a = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+ var b = new byte[] { 0x04, 0x05, 0x06, 0x07, 0x08 };
+ var c = new byte[] { 0x09 };
+ var builder = new SequenceBuilder<byte>().Prepend(c).Prepend(b).Prepend(a);
+
+ //Act
+ var sequence = builder.Build();
+
+ //Assert
+ var array = sequence.ToArray();
+ for (byte i = 0; i < array.Length; ++i)
+ Assert.Equal(i, array[i]);
+ }
+ }
+}
diff --git a/src/DotPulsar.Tests/Internal/SerializerTests.cs b/src/DotPulsar.Tests/Internal/SerializerTests.cs
new file mode 100644
index 0000000..f933a74
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/SerializerTests.cs
@@ -0,0 +1,22 @@
+using DotPulsar.Internal;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+ public class SerializerTests
+ {
+ [Fact]
+ public void ToBigEndianBytes_GivenUnsignedInteger_ShouldReturnExpectedBytes()
+ {
+ //Arrange
+ uint value = 66051;
+
+ //Act
+ var actual = Serializer.ToBigEndianBytes(value);
+
+ //Assert
+ var expected = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+ Assert.Equal(expected, actual);
+ }
+ }
+}
diff --git a/src/DotPulsar.Tests/Internal/StateManagerTests.cs b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
new file mode 100644
index 0000000..b7a6590
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -0,0 +1,231 @@
+using DotPulsar.Internal;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+ public class StateManagerTests
+ {
+ [Theory]
+ [InlineData(ProducerState.Connected, ProducerState.Connected, ProducerState.Connected)]
+ [InlineData(ProducerState.Connected, ProducerState.Disconnected, ProducerState.Connected)]
+ [InlineData(ProducerState.Connected, ProducerState.Closed, ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Connected, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Disconnected, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Closed, ProducerState.Connected, ProducerState.Closed)]
+ [InlineData(ProducerState.Closed, ProducerState.Disconnected, ProducerState.Closed)]
+ [InlineData(ProducerState.Closed, ProducerState.Closed, ProducerState.Closed)]
+ public void SetState_GivenNewState_ShouldReturnFormerState(ProducerState initialState, ProducerState newState, ProducerState expected)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+ //Act
+ var actual = sut.SetState(newState);
+
+ //Assert
+ Assert.Equal(expected, actual);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected)]
+ [InlineData(ProducerState.Closed)]
+ public void SetState_GivenStateIsFinal_ShouldNotChangeState(ProducerState newState)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed);
+
+ //Act
+ _ = sut.SetState(newState);
+
+ //Assert
+ Assert.Equal(ProducerState.Closed, sut.CurrentState);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Connected, ProducerState.Closed)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+ public void SetState_GivenStateIsChangedToWanted_ShouldCompleteTask(ProducerState initialState, ProducerState newState)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+ var task = sut.StateChangedTo(newState, default);
+
+ //Act
+ _ = sut.SetState(newState);
+
+ //Assert
+ Assert.True(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Connected, ProducerState.Closed)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+ public void SetState_GivenStateIsChangedFromWanted_ShouldCompleteTask(ProducerState initialState, ProducerState newState)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+ var task = sut.StateChangedFrom(initialState, default);
+
+ //Act
+ _ = sut.SetState(newState);
+
+ //Assert
+ Assert.True(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected)]
+ [InlineData(ProducerState.Closed)]
+ public void StateChangedTo_GivenStateIsAlreadyWanted_ShouldCompleteTask(ProducerState state)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(state, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedTo(state, default);
+
+ //Assert
+ Assert.True(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Connected, ProducerState.Closed)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+ public void StateChangedTo_GivenStateIsNotWanted_ShouldNotCompleteTask(ProducerState initialState, ProducerState wantedState)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedTo(wantedState, default);
+
+ //Assert
+ Assert.False(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected)]
+ public void StateChangedTo_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedTo(state, default);
+
+ //Assert
+ Assert.True(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected)]
+ public void StateChangedFrom_GivenStateHasNotChanged_ShouldNotCompleteTask(ProducerState state)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(state, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedFrom(state, default);
+
+ //Assert
+ Assert.False(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Connected, ProducerState.Closed)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+ public void StateChangedFrom_GivenStateHasChanged_ShouldCompleteTask(ProducerState initialState, ProducerState fromState)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedFrom(fromState, default);
+
+ //Assert
+ Assert.True(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected)]
+ [InlineData(ProducerState.Disconnected)]
+ [InlineData(ProducerState.Closed)]
+ public void StateChangedFrom_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedFrom(state, default);
+
+ //Assert
+ Assert.True(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+ public void SetState_GivenStateIsChangeToFinalState_ShouldCompleteTask(ProducerState initialState, ProducerState wantedState)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedTo(wantedState, default);
+ _ = sut.SetState(ProducerState.Closed);
+
+ //Assert
+ Assert.True(task.IsCompleted);
+ }
+
+ [Theory]
+ [InlineData(ProducerState.Connected, ProducerState.Disconnected, ProducerState.Closed)]
+ [InlineData(ProducerState.Disconnected, ProducerState.Connected, ProducerState.Closed)]
+ public void SetState_GivenStateIsChangedToNotWanted_ShouldNotCompleteTask(ProducerState initialState, ProducerState newState, ProducerState wantedState)
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+ //Act
+ var task = sut.StateChangedTo(wantedState, default);
+ _ = sut.SetState(newState);
+
+ //Assert
+ Assert.False(task.IsCompleted);
+ }
+
+ [Fact]
+ public async Task CancelToken_GivenTaskWasStillWaiting_ShouldCancelTask()
+ {
+ //Arrange
+ var sut = new StateManager<ProducerState>(ProducerState.Connected, ProducerState.Closed);
+ var cts = new CancellationTokenSource();
+ var task = sut.StateChangedFrom(ProducerState.Connected, cts.Token);
+
+ //Act
+ cts.Cancel();
+ var exception = await Record.ExceptionAsync(() => task);
+
+ //Assert
+ Assert.IsType<TaskCanceledException>(exception);
+
+ //Annihilate
+ cts.Dispose();
+ }
+ }
+}
diff --git a/src/DotPulsar.sln b/src/DotPulsar.sln
new file mode 100644
index 0000000..db0a03e
--- /dev/null
+++ b/src/DotPulsar.sln
@@ -0,0 +1,31 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.29209.62
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "DotPulsar\DotPulsar.csproj", "{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.Tests", "DotPulsar.Tests\DotPulsar.Tests.csproj", "{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
+ EndGlobalSection
+EndGlobal
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
new file mode 100644
index 0000000..9cb5830
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -0,0 +1,52 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A consumer abstraction
+ /// </summary>
+ public interface IConsumer : IStateChanged<ConsumerState>, IDisposable
+ {
+ /// <summary>
+ /// Acknowledge the consumption of a single message
+ /// </summary>
+ Task Acknowledge(Message message, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Acknowledge the consumption of a single message using the MessageId
+ /// </summary>
+ Task Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Acknowledge the consumption of all the messages in the topic up to and including the provided message
+ /// </summary>
+ Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Acknowledge the consumption of all the messages in the topic up to and including the provided MessageId
+ /// </summary>
+ Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Get the MessageId of the last message on the topic
+ /// </summary>
+ Task<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Receives a single message
+ /// </summary>
+ Task<Message> Receive(CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this consumer to a specific MessageId
+ /// </summary>
+ Task Seek(MessageId messageId, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Unsubscribe the consumer
+ /// </summary>
+ Task Unsubscribe(CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
new file mode 100644
index 0000000..338b5e8
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -0,0 +1,48 @@
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A consumer building abstraction
+ /// </summary>
+ public interface IConsumerBuilder
+ {
+ /// <summary>
+ /// Set the consumer name. This is optional.
+ /// </summary>
+ IConsumerBuilder ConsumerName(string name);
+
+ /// <summary>
+ /// Set initial position for the subscription. Default is 'Latest'.
+ /// </summary>
+ IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition);
+
+ /// <summary>
+ /// Set the priority level for the shared subscription consumer. Default is 0.
+ /// </summary>
+ IConsumerBuilder PriorityLevel(int priorityLevel);
+
+ /// <summary>
+ /// Number of messages that will be prefetched. Default is 1000.
+ /// </summary>
+ IConsumerBuilder MessagePrefetchCount(uint count);
+
+ /// <summary>
+ /// Set the subscription name for this consumer. This is required.
+ /// </summary>
+ IConsumerBuilder SubscriptionName(string name);
+
+ /// <summary>
+ /// Set the subscription type for this consumer. Default is 'Exclusive'.
+ /// </summary>
+ IConsumerBuilder SubscriptionType(SubscriptionType type);
+
+ /// <summary>
+ /// Set the topic for this consumer. This is required.
+ /// </summary>
+ IConsumerBuilder Topic(string topic);
+
+ /// <summary>
+ /// Create the consumer
+ /// </summary>
+ IConsumer Create();
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
new file mode 100644
index 0000000..14792ba
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A message building abstraction
+ /// </summary>
+ public interface IMessageBuilder
+ {
+ /// <summary>
+ /// Set the event time of the message
+ /// </summary>
+ IMessageBuilder EventTime(ulong eventTime);
+
+ /// <summary>
+ /// Add/Set a property key/value on the message
+ /// </summary>
+ IMessageBuilder Property(string key, string value);
+
+ /// <summary>
+ /// Set the sequence id of the message
+ /// </summary>
+ IMessageBuilder SequenceId(ulong sequenceId);
+
+ /// <summary>
+ /// Set the consumer name
+ /// </summary>
+ Task<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
new file mode 100644
index 0000000..b59b75b
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A producer abstraction
+ /// </summary>
+ public interface IProducer : IStateChanged<ProducerState>, IDisposable
+ {
+ /// <summary>
+ /// Sends a message
+ /// </summary>
+ Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Sends a message and metadata
+ /// </summary>
+ Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
new file mode 100644
index 0000000..00edff5
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -0,0 +1,28 @@
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A producer building abstraction
+ /// </summary>
+ public interface IProducerBuilder
+ {
+ /// <summary>
+ /// Set the producer name. This is optional.
+ /// </summary>
+ IProducerBuilder ProducerName(string name);
+
+ /// <summary>
+ /// Set the initial sequence id. Default is 0.
+ /// </summary>
+ IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+
+ /// <summary>
+ /// Set the topic for this producer. This is required.
+ /// </summary>
+ IProducerBuilder Topic(string topic);
+
+ /// <summary>
+ /// Create the producer
+ /// </summary>
+ IProducer Create();
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
new file mode 100644
index 0000000..9e9d9c3
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -0,0 +1,25 @@
+using System;
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A pulsar client abstraction
+ /// </summary>
+ public interface IPulsarClient : IDisposable
+ {
+ /// <summary>
+ /// Create a producer
+ /// </summary>
+ IProducer CreateProducer(ProducerOptions options);
+
+ /// <summary>
+ /// Create a consumer
+ /// </summary>
+ IConsumer CreateConsumer(ConsumerOptions options);
+
+ /// <summary>
+ /// Create a reader
+ /// </summary>
+ IReader CreateReader(ReaderOptions options);
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
new file mode 100644
index 0000000..8edafd5
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -0,0 +1,20 @@
+using System;
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A pulsar client building abstraction
+ /// </summary>
+ public interface IPulsarClientBuilder
+ {
+ /// <summary>
+ /// The service URL for the Pulsar cluster
+ /// </summary>
+ IPulsarClientBuilder ServiceUrl(Uri uri);
+
+ /// <summary>
+ /// Create the client
+ /// </summary>
+ IPulsarClient Build();
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
new file mode 100644
index 0000000..70813fa
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A reader abstraction
+ /// </summary>
+ public interface IReader : IStateChanged<ReaderState>, IDisposable
+ {
+ /// <summary>
+ /// Receives a single message
+ /// </summary>
+ Task<Message> Receive(CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IReaderBuilder.cs b/src/DotPulsar/Abstractions/IReaderBuilder.cs
new file mode 100644
index 0000000..3e57577
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IReaderBuilder.cs
@@ -0,0 +1,33 @@
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A reader building abstraction
+ /// </summary>
+ public interface IReaderBuilder
+ {
+ /// <summary>
+ /// Set the reader name. This is optional.
+ /// </summary>
+ IReaderBuilder ReaderName(string name);
+
+ /// <summary>
+ /// Number of messages that will be prefetched. Default is 1000.
+ /// </summary>
+ IReaderBuilder MessagePrefetchCount(uint count);
+
+ /// <summary>
+ /// The initial reader position is set to the specified message id. This is required.
+ /// </summary>
+ IReaderBuilder StartMessageId(MessageId messageId);
+
+ /// <summary>
+ /// Set the topic for this reader. This is required.
+ /// </summary>
+ IReaderBuilder Topic(string topic);
+
+ /// <summary>
+ /// Create the reader
+ /// </summary>
+ IReader Create();
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Abstractions/IStateChanged.cs
new file mode 100644
index 0000000..fec32f3
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IStateChanged.cs
@@ -0,0 +1,49 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+ /// <summary>
+ /// A state change monitoring abstraction
+ /// </summary>
+ public interface IStateChanged<TState>
+ {
+ /// <summary>
+ /// Wait for the state to change to a specific state
+ /// </summary>
+ /// <returns>
+ /// The current state
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete
+ /// </remarks>
+ Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state
+ /// </summary>
+ /// <returns>
+ /// The current state
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete
+ /// </remarks>
+ Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Ask whether the current state is final, meaning that it will never change
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will never change
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not
+ /// </returns>
+ bool IsFinalState(TState state);
+ }
+}
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
new file mode 100644
index 0000000..65b7105
--- /dev/null
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -0,0 +1,20 @@
+namespace DotPulsar
+{
+ public sealed class ConsumerOptions
+ {
+ public ConsumerOptions()
+ {
+ InitialPosition = SubscriptionInitialPosition.Latest;
+ MessagePrefetchCount = 1000;
+ SubscriptionType = SubscriptionType.Exclusive;
+ }
+
+ public string ConsumerName { get; set; }
+ public SubscriptionInitialPosition InitialPosition { get; set; }
+ public int PriorityLevel { get; set; }
+ public uint MessagePrefetchCount { get; set; }
+ public string SubscriptionName { get; set; }
+ public SubscriptionType SubscriptionType { get; set; }
+ public string Topic { get; set; }
+ }
+}
diff --git a/src/DotPulsar/ConsumerState.cs b/src/DotPulsar/ConsumerState.cs
new file mode 100644
index 0000000..35dc4fd
--- /dev/null
+++ b/src/DotPulsar/ConsumerState.cs
@@ -0,0 +1,12 @@
+namespace DotPulsar
+{
+ public enum ConsumerState : byte
+ {
+ Active,
+ Closed,
+ Disconnected,
+ Faulted,
+ Inactive,
+ ReachedEndOfTopic
+ }
+}
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
new file mode 100644
index 0000000..1fbacaf
--- /dev/null
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -0,0 +1,26 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>netcoreapp2.2</TargetFramework>
+ <Version>0.1.0</Version>
+ <AssemblyVersion>$(Version)</AssemblyVersion>
+ <FileVersion>$(Version)</FileVersion>
+ <PackageReleaseNotes>Alpha release</PackageReleaseNotes>
+ <Description>Native .NET/C# client library for Apache Pulsar</Description>
+ <Authors>dblank</Authors>
+ <Company>Danske Commodities A/S</Company>
+ <Copyright>$(Company)</Copyright>
+ <PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
+ <NoWarn>1701;1702;1591</NoWarn>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
+ <DocumentationFile>\DotPulsar.xml</DocumentationFile>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <PackageReference Include="protobuf-net" Version="2.4.0" />
+ <PackageReference Include="System.IO.Pipelines" Version="4.5.3" />
+ </ItemGroup>
+
+</Project>
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs b/src/DotPulsar/Exceptions/AuthenticationException.cs
new file mode 100644
index 0000000..6e9e098
--- /dev/null
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class AuthenticationException : DotPulsarException
+ {
+ public AuthenticationException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/AuthorizationException.cs b/src/DotPulsar/Exceptions/AuthorizationException.cs
new file mode 100644
index 0000000..72f0dbb
--- /dev/null
+++ b/src/DotPulsar/Exceptions/AuthorizationException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class AuthorizationException : DotPulsarException
+ {
+ public AuthorizationException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ChecksumException.cs b/src/DotPulsar/Exceptions/ChecksumException.cs
new file mode 100644
index 0000000..4068590
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ChecksumException.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ChecksumException : DotPulsarException
+ {
+ public ChecksumException(string message) : base(message) { }
+
+ public ChecksumException(uint expectedChecksum, uint actualChecksum) : base($"Checksum mismatch. Excepted {expectedChecksum} but was actually {actualChecksum}") { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ConsumerAssignException.cs b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
new file mode 100644
index 0000000..eb4c1f7
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ConsumerAssignException : DotPulsarException
+ {
+ public ConsumerAssignException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ConsumerBusyException.cs b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
new file mode 100644
index 0000000..aaa1069
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ConsumerBusyException : DotPulsarException
+ {
+ public ConsumerBusyException(string message) : base(message) { }
+ }
+}
\ No newline at end of file
diff --git a/src/DotPulsar/Exceptions/ConsumerClosedException.cs b/src/DotPulsar/Exceptions/ConsumerClosedException.cs
new file mode 100644
index 0000000..5c714f0
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ConsumerClosedException : DotPulsarException
+ {
+ public ConsumerClosedException() : base("Consumer has closed") { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/DotPulsarException.cs b/src/DotPulsar/Exceptions/DotPulsarException.cs
new file mode 100644
index 0000000..a57d1a0
--- /dev/null
+++ b/src/DotPulsar/Exceptions/DotPulsarException.cs
@@ -0,0 +1,11 @@
+using System;
+
+namespace DotPulsar.Exceptions
+{
+ public abstract class DotPulsarException : Exception
+ {
+ public DotPulsarException(string message) : base(message) { }
+
+ public DotPulsarException(string message, Exception innerException) : base(message, innerException) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
new file mode 100644
index 0000000..6deb8ce
--- /dev/null
+++ b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class IncompatibleSchemaException : DotPulsarException
+ {
+ public IncompatibleSchemaException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/InvalidTopicNameException.cs b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
new file mode 100644
index 0000000..784fb2e
--- /dev/null
+++ b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class InvalidTopicNameException : DotPulsarException
+ {
+ public InvalidTopicNameException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/MetadataException.cs b/src/DotPulsar/Exceptions/MetadataException.cs
new file mode 100644
index 0000000..96a65fd
--- /dev/null
+++ b/src/DotPulsar/Exceptions/MetadataException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class MetadataException : DotPulsarException
+ {
+ public MetadataException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/PersistenceException.cs b/src/DotPulsar/Exceptions/PersistenceException.cs
new file mode 100644
index 0000000..918667b
--- /dev/null
+++ b/src/DotPulsar/Exceptions/PersistenceException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class PersistenceException : DotPulsarException
+ {
+ public PersistenceException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
new file mode 100644
index 0000000..c72bc0b
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ProducerBlockedQuotaExceededException : DotPulsarException
+ {
+ public ProducerBlockedQuotaExceededException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ProducerBusyException.cs b/src/DotPulsar/Exceptions/ProducerBusyException.cs
new file mode 100644
index 0000000..a465ba3
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerBusyException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ProducerBusyException : DotPulsarException
+ {
+ public ProducerBusyException(string message) : base(message) { }
+ }
+}
\ No newline at end of file
diff --git a/src/DotPulsar/Exceptions/ProducerClosedException.cs b/src/DotPulsar/Exceptions/ProducerClosedException.cs
new file mode 100644
index 0000000..6fa5e4f
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ProducerClosedException : DotPulsarException
+ {
+ public ProducerClosedException() : base("Producer has closed") { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/PulsarClientClosedException.cs b/src/DotPulsar/Exceptions/PulsarClientClosedException.cs
new file mode 100644
index 0000000..988a4b9
--- /dev/null
+++ b/src/DotPulsar/Exceptions/PulsarClientClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class PulsarClientClosedException : DotPulsarException
+ {
+ public PulsarClientClosedException() : base("Client has closed") { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/ReaderClosedException.cs b/src/DotPulsar/Exceptions/ReaderClosedException.cs
new file mode 100644
index 0000000..0d08ed8
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ReaderClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class ReaderClosedException : DotPulsarException
+ {
+ public ReaderClosedException() : base("Reader has closed") { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
new file mode 100644
index 0000000..da3eaba
--- /dev/null
+++ b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class SubscriptionNotFoundException : DotPulsarException
+ {
+ public SubscriptionNotFoundException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/TopicNotFoundException.cs b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
new file mode 100644
index 0000000..d10a36c
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class TopicNotFoundException : DotPulsarException
+ {
+ public TopicNotFoundException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/TopicTerminatedException.cs b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
new file mode 100644
index 0000000..2222040
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class TopicTerminatedException : DotPulsarException
+ {
+ public TopicTerminatedException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/UnknownException.cs b/src/DotPulsar/Exceptions/UnknownException.cs
new file mode 100644
index 0000000..bd2772c
--- /dev/null
+++ b/src/DotPulsar/Exceptions/UnknownException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class UnknownException : DotPulsarException
+ {
+ public UnknownException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/UnsupportedVersionException.cs b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
new file mode 100644
index 0000000..cdcb833
--- /dev/null
+++ b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+ public sealed class UnsupportedVersionException : DotPulsarException
+ {
+ public UnsupportedVersionException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
new file mode 100644
index 0000000..c482522
--- /dev/null
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Internal;
+
+namespace DotPulsar.Extensions
+{
+ public static class ProducerExtensions
+ {
+ public static IMessageBuilder NewMessage(this IProducer producer) => new MessageBuilder(producer);
+ }
+}
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
new file mode 100644
index 0000000..025f207
--- /dev/null
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -0,0 +1,12 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Internal;
+
+namespace DotPulsar.Extensions
+{
+ public static class PulsarClientExtensions
+ {
+ public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient) => new ProducerBuilder(pulsarClient);
+ public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient) => new ConsumerBuilder(pulsarClient);
+ public static IReaderBuilder NewReader(this IPulsarClient pulsarClient) => new ReaderBuilder(pulsarClient);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerProxy.cs b/src/DotPulsar/Internal/Abstractions/IConsumerProxy.cs
new file mode 100644
index 0000000..4025088
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerProxy.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IConsumerProxy : IEnqueue<MessagePackage>, IDequeue<MessagePackage>
+ {
+ void Active();
+ void Inactive();
+ void Disconnected();
+ void ReachedEndOfTopic();
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerStream.cs b/src/DotPulsar/Internal/Abstractions/IConsumerStream.cs
new file mode 100644
index 0000000..f3443ed
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerStream.cs
@@ -0,0 +1,16 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IConsumerStream : IDisposable
+ {
+ Task Send(CommandAck command);
+ Task<CommandSuccess> Send(CommandUnsubscribe command);
+ Task<CommandSuccess> Send(CommandSeek command);
+ Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command);
+ Task<Message> Receive(CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerStreamFactory.cs b/src/DotPulsar/Internal/Abstractions/IConsumerStreamFactory.cs
new file mode 100644
index 0000000..b8767ff
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerStreamFactory.cs
@@ -0,0 +1,10 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IConsumerStreamFactory
+ {
+ Task<IConsumerStream> CreateStream(IConsumerProxy proxy, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IDequeue.cs b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
new file mode 100644
index 0000000..a456fed
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
@@ -0,0 +1,10 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IDequeue<T>
+ {
+ Task<T> Dequeue(CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IEnqueue.cs b/src/DotPulsar/Internal/Abstractions/IEnqueue.cs
new file mode 100644
index 0000000..100485d
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IEnqueue.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IEnqueue<T>
+ {
+ void Enqueue(T item);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IFaultStrategy.cs b/src/DotPulsar/Internal/Abstractions/IFaultStrategy.cs
new file mode 100644
index 0000000..b3e141e
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IFaultStrategy.cs
@@ -0,0 +1,10 @@
+using System;
+
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IFaultStrategy
+ {
+ FaultAction DetermineFaultAction(Exception exception);
+ TimeSpan TimeToWait { get; }
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerProxy.cs b/src/DotPulsar/Internal/Abstractions/IProducerProxy.cs
new file mode 100644
index 0000000..e6823f4
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IProducerProxy.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IProducerProxy
+ {
+ void Disconnected();
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerStream.cs b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
new file mode 100644
index 0000000..a9e94c2
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
@@ -0,0 +1,11 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IProducerStream : IDisposable
+ {
+ Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerStreamFactory.cs b/src/DotPulsar/Internal/Abstractions/IProducerStreamFactory.cs
new file mode 100644
index 0000000..d0a5f06
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IProducerStreamFactory.cs
@@ -0,0 +1,10 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+ public interface IProducerStreamFactory
+ {
+ Task<IProducerStream> CreateStream(IProducerProxy proxy, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
new file mode 100644
index 0000000..b7d0b4f
--- /dev/null
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -0,0 +1,116 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class AsyncLock : IDisposable
+ {
+ private readonly LinkedList<CancelableCompletionSource<IDisposable>> _pending;
+ private readonly SemaphoreSlim _semaphoreSlim;
+ private readonly Releaser _releaser;
+ private readonly Task<IDisposable> _completedTask;
+ private bool _isDisposed;
+
+ public AsyncLock()
+ {
+ _pending = new LinkedList<CancelableCompletionSource<IDisposable>>();
+ _semaphoreSlim = new SemaphoreSlim(1, 1);
+ _releaser = new Releaser(Release);
+ _completedTask = Task.FromResult((IDisposable)_releaser);
+ _isDisposed = false;
+ }
+
+ public Task<IDisposable> Lock(CancellationToken cancellationToken = default)
+ {
+ LinkedListNode<CancelableCompletionSource<IDisposable>> node = null;
+
+ lock (_pending)
+ {
+ if (_isDisposed)
+ throw new ObjectDisposedException(nameof(AsyncLock));
+
+ if (_semaphoreSlim.CurrentCount == 1) //Lock is free
+ {
+ _semaphoreSlim.Wait(); //Will never block
+ return _completedTask;
+ }
+
+ //Lock was not free
+ var ccs = new CancelableCompletionSource<IDisposable>();
+ node = _pending.AddLast(ccs);
+ }
+
+ cancellationToken.Register(() => Cancel(node));
+
+ return node.Value.Task;
+ }
+
+ public void Dispose()
+ {
+ lock (_pending)
+ {
+ if (_isDisposed)
+ return;
+
+ _isDisposed = true;
+
+ foreach (var pending in _pending)
+ {
+ pending.SetException(new ObjectDisposedException(nameof(AsyncLock)));
+ pending.Dispose();
+ }
+
+ _pending.Clear();
+ }
+
+ _semaphoreSlim.Wait(); //Wait for possible lock-holder to finish
+ _semaphoreSlim.Release();
+ _semaphoreSlim.Dispose();
+ }
+
+ private void Cancel(LinkedListNode<CancelableCompletionSource<IDisposable>> node)
+ {
+ lock (_pending)
+ {
+ try
+ {
+ _pending.Remove(node);
+ node.Value.Dispose();
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+ }
+
+ private void Release()
+ {
+ lock (_pending)
+ {
+ if (_pending.Count > 0)
+ {
+ var node = _pending.First;
+ node.Value.SetResult(_releaser);
+ node.Value.Dispose();
+ _pending.RemoveFirst();
+ return;
+ }
+
+ if (_semaphoreSlim.CurrentCount == 0)
+ _semaphoreSlim.Release();
+ }
+ }
+
+ private class Releaser : IDisposable
+ {
+ private readonly Action _release;
+
+ public Releaser(Action release) => _release = release;
+
+ public void Dispose() => _release();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
new file mode 100644
index 0000000..4dbf084
--- /dev/null
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -0,0 +1,78 @@
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable
+ {
+ private readonly object _lock;
+ private readonly Queue<T> _queue;
+ private readonly LinkedList<CancelableCompletionSource<T>> _pendingDequeues;
+
+ public AsyncQueue()
+ {
+ _lock = new object();
+ _queue = new Queue<T>();
+ _pendingDequeues = new LinkedList<CancelableCompletionSource<T>>();
+ }
+
+ public void Enqueue(T item)
+ {
+ lock (_lock)
+ {
+ if (_pendingDequeues.Count > 0)
+ {
+ var tcs = _pendingDequeues.First;
+ _pendingDequeues.RemoveFirst();
+ tcs.Value.SetResult(item);
+ }
+ else
+ _queue.Enqueue(item);
+ }
+ }
+
+ public Task<T> Dequeue(CancellationToken cancellationToken = default)
+ {
+ LinkedListNode<CancelableCompletionSource<T>> node = null;
+
+ lock (_lock)
+ {
+ if (_queue.Count > 0)
+ return Task.FromResult(_queue.Dequeue());
+
+ node = _pendingDequeues.AddLast(new CancelableCompletionSource<T>());
+ }
+
+ node.Value.SetupCancellation(() => Cancel(node), cancellationToken);
+ return node.Value.Task;
+ }
+
+ public void Dispose()
+ {
+ lock (_lock)
+ {
+ foreach (var pendingDequeue in _pendingDequeues)
+ pendingDequeue.Dispose();
+
+ _pendingDequeues.Clear();
+ _queue.Clear();
+ }
+ }
+
+ private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node)
+ {
+ lock (_lock)
+ {
+ try
+ {
+ node.Value.Dispose();
+ _pendingDequeues.Remove(node);
+ }
+ catch { }
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Awaitor.cs b/src/DotPulsar/Internal/Awaitor.cs
new file mode 100644
index 0000000..83e1143
--- /dev/null
+++ b/src/DotPulsar/Internal/Awaitor.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class Awaitor<T, Result> : IDisposable
+ {
+ private readonly Dictionary<T, TaskCompletionSource<Result>> _items;
+
+ public Awaitor() => _items = new Dictionary<T, TaskCompletionSource<Result>>();
+
+ public Task<Result> CreateTask(T item)
+ {
+ var tcs = new TaskCompletionSource<Result>(TaskCreationOptions.RunContinuationsAsynchronously);
+ _items.Add(item, tcs);
+ return tcs.Task;
+ }
+
+ public void SetResult(T item, Result result)
+ {
+ _items.Remove(item, out var tcs);
+ tcs.SetResult(result);
+ }
+
+ public void Dispose()
+ {
+ foreach (var item in _items.Values)
+ {
+ item.SetCanceled();
+ }
+
+ _items.Clear();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/CancelableCompletionSource.cs b/src/DotPulsar/Internal/CancelableCompletionSource.cs
new file mode 100644
index 0000000..ebf87d4
--- /dev/null
+++ b/src/DotPulsar/Internal/CancelableCompletionSource.cs
@@ -0,0 +1,28 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class CancelableCompletionSource<T> : IDisposable
+ {
+ private readonly TaskCompletionSource<T> _source;
+ private CancellationTokenRegistration? _registration;
+
+ public CancelableCompletionSource() => _source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ public void SetupCancellation(Action callback, CancellationToken token) => _registration = token.Register(() => callback());
+
+ public void SetResult(T result) => _ = _source.TrySetResult(result);
+
+ public void SetException(Exception exception) => _ = _source.TrySetException(exception);
+
+ public Task<T> Task => _source.Task;
+
+ public void Dispose()
+ {
+ _ = _source.TrySetCanceled();
+ _registration?.Dispose();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
new file mode 100644
index 0000000..b5eec21
--- /dev/null
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -0,0 +1,191 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Buffers;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class Connection : IDisposable
+ {
+ private readonly AsyncLock _lock;
+ private readonly ProducerManager _producerManager;
+ private readonly ConsumerManager _consumerManager;
+ private readonly RequestResponseHandler _requestResponseHandler;
+ private readonly PingPongHandler _pingPongHandler;
+ private readonly PulsarStream _stream;
+
+ public Connection(Stream stream)
+ {
+ _lock = new AsyncLock();
+ _producerManager = new ProducerManager();
+ _consumerManager = new ConsumerManager();
+ _requestResponseHandler = new RequestResponseHandler();
+ _pingPongHandler = new PingPongHandler(this);
+ _stream = new PulsarStream(stream, HandleCommand);
+ }
+
+ public Task IsClosed => _stream.IsClosed;
+
+ public async Task<bool> IsActive()
+ {
+ using (await _lock.Lock())
+ {
+ return _producerManager.HasProducers || _consumerManager.HasConsumers;
+ }
+ }
+
+ public async Task<ProducerResponse> Send(CommandProducer command, IProducerProxy proxy)
+ {
+ Task<BaseCommand> responseTask = null;
+ using (await _lock.Lock())
+ {
+ _producerManager.Outgoing(command, proxy);
+ var baseCommand = command.AsBaseCommand();
+ responseTask = _requestResponseHandler.Outgoing(baseCommand);
+ var sequence = Serializer.Serialize(baseCommand);
+ await _stream.Send(sequence);
+ }
+
+ var response = await responseTask;
+ if (response.CommandType == BaseCommand.Type.Error)
+ {
+ _producerManager.Remove(command.ProducerId);
+ response.Error.Throw();
+ }
+
+ return new ProducerResponse(command.ProducerId, response.ProducerSuccess.ProducerName);
+ }
+
+ public async Task<SubscribeResponse> Send(CommandSubscribe command, IConsumerProxy proxy)
+ {
+ Task<BaseCommand> responseTask = null;
+ using (await _lock.Lock())
+ {
+ _consumerManager.Outgoing(command, proxy);
+ var baseCommand = command.AsBaseCommand();
+ responseTask = _requestResponseHandler.Outgoing(baseCommand);
+ var sequence = Serializer.Serialize(baseCommand);
+ await _stream.Send(sequence);
+ }
+
+ var response = await responseTask;
+ if (response.CommandType == BaseCommand.Type.Error)
+ {
+ _consumerManager.Remove(command.ConsumerId);
+ response.Error.Throw();
+ }
+
+ return new SubscribeResponse(command.ConsumerId);
+ }
+
+ public async Task Send(CommandPing command) => await Send(command.AsBaseCommand());
+ public async Task Send(CommandPong command) => await Send(command.AsBaseCommand());
+ public async Task Send(CommandAck command) => await Send(command.AsBaseCommand());
+ public async Task Send(CommandFlow command) => await Send(command.AsBaseCommand());
+
+ public async Task<BaseCommand> Send(CommandUnsubscribe command)
+ {
+ var response = await SendRequestResponse(command.AsBaseCommand());
+ if (response.CommandType == BaseCommand.Type.Success)
+ _consumerManager.Remove(command.ConsumerId);
+ return response;
+ }
+
+ public async Task<BaseCommand> Send(CommandConnect command) => await SendRequestResponse(command.AsBaseCommand());
+ public async Task<BaseCommand> Send(CommandLookupTopic command) => await SendRequestResponse(command.AsBaseCommand());
+ public async Task<BaseCommand> Send(CommandSeek command) => await SendRequestResponse(command.AsBaseCommand());
+ public async Task<BaseCommand> Send(CommandGetLastMessageId command) => await SendRequestResponse(command.AsBaseCommand());
+
+ public async Task<BaseCommand> Send(CommandCloseProducer command)
+ {
+ var response = await SendRequestResponse(command.AsBaseCommand());
+ if (response.CommandType == BaseCommand.Type.Success)
+ _producerManager.Remove(command.ProducerId);
+ return response;
+ }
+
+ public async Task<BaseCommand> Send(CommandCloseConsumer command)
+ {
+ var response = await SendRequestResponse(command.AsBaseCommand());
+ if (response.CommandType == BaseCommand.Type.Success)
+ _consumerManager.Remove(command.ConsumerId);
+ return response;
+ }
+
+ public async Task<BaseCommand> Send(SendPackage command)
+ {
+ Task<BaseCommand> response = null;
+ using (await _lock.Lock())
+ {
+ var baseCommand = command.Command.AsBaseCommand();
+ response = _requestResponseHandler.Outgoing(baseCommand);
+ var sequence = Serializer.Serialize(baseCommand, command.Metadata, command.Payload);
+ await _stream.Send(sequence);
+ }
+ return await response;
+ }
+
+ private async Task<BaseCommand> SendRequestResponse(BaseCommand command)
+ {
+ Task<BaseCommand> response = null;
+ using (await _lock.Lock())
+ {
+ response = _requestResponseHandler.Outgoing(command);
+ var sequence = Serializer.Serialize(command);
+ await _stream.Send(sequence);
+ }
+ return await response;
+ }
+
+ private async Task Send(BaseCommand command)
+ {
+ using (await _lock.Lock())
+ {
+ var sequence = Serializer.Serialize(command);
+ await _stream.Send(sequence);
+ }
+ }
+
+ private void HandleCommand(uint commandSize, ReadOnlySequence<byte> sequence)
+ {
+ var command = Serializer.Deserialize<BaseCommand>(sequence.Slice(0, commandSize));
+
+ switch (command.CommandType)
+ {
+ case BaseCommand.Type.Message:
+ _consumerManager.Incoming(new MessagePackage(command.Message, sequence.Slice(commandSize)));
+ return;
+ case BaseCommand.Type.CloseConsumer:
+ _consumerManager.Incoming(command.CloseConsumer);
+ return;
+ case BaseCommand.Type.ActiveConsumerChange:
+ _consumerManager.Incoming(command.ActiveConsumerChange);
+ return;
+ case BaseCommand.Type.ReachedEndOfTopic:
+ _consumerManager.Incoming(command.ReachedEndOfTopic);
+ return;
+ case BaseCommand.Type.CloseProducer:
+ _producerManager.Incoming(command.CloseProducer);
+ return;
+ case BaseCommand.Type.Ping:
+ _pingPongHandler.Incoming(command.Ping);
+ return;
+ default:
+ _requestResponseHandler.Incoming(command);
+ return;
+ }
+ }
+
+ public void Dispose()
+ {
+ _lock.Dispose();
+ _consumerManager.Dispose();
+ _producerManager.Dispose();
+ _requestResponseHandler.Dispose();
+ _stream.Dispose();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
new file mode 100644
index 0000000..de3ac0b
--- /dev/null
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -0,0 +1,143 @@
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Collections.Concurrent;
+using System.ComponentModel;
+using System.Linq;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ConnectionPool : IDisposable
+ {
+ private readonly AsyncLock _lock;
+ private readonly int _protocolVersion;
+ private readonly string _clientVersion;
+ private readonly Uri _serviceUrl;
+ private readonly ConcurrentDictionary<Uri, Connection> _connections;
+ private readonly CancellationTokenSource _cancellationTokenSource;
+ private readonly Task _closeInactiveConnections;
+
+ public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl)
+ {
+ _lock = new AsyncLock();
+ _protocolVersion = protocolVersion;
+ _clientVersion = clientVersion;
+ _serviceUrl = serviceUrl;
+ _connections = new ConcurrentDictionary<Uri, Connection>();
+ _cancellationTokenSource = new CancellationTokenSource();
+ _closeInactiveConnections = CloseInactiveConnections(TimeSpan.FromSeconds(60), _cancellationTokenSource.Token);
+ }
+
+ public void Dispose() //While we wait for IAsyncDisposable
+ {
+ _cancellationTokenSource.Cancel();
+ _closeInactiveConnections.Wait();
+
+ _lock.Dispose();
+
+ foreach (var serviceUrl in _connections.Keys.ToArray())
+ {
+ Deregister(serviceUrl);
+ }
+ }
+
+ public async Task<Connection> FindConnectionForTopic(string topic, CancellationToken cancellationToken)
+ {
+ var connection = await CreateConnection(_serviceUrl, cancellationToken);
+
+ var authoritative = false;
+
+ while (true)
+ {
+ var lookup = new CommandLookupTopic
+ {
+ Topic = topic,
+ Authoritative = authoritative
+ };
+ var response = await connection.Send(lookup);
+ response.Expect(BaseCommand.Type.LookupResponse);
+
+ switch (response.LookupTopicResponse.Response)
+ {
+ case CommandLookupTopicResponse.LookupType.Connect:
+ return connection;
+ case CommandLookupTopicResponse.LookupType.Redirect:
+ authoritative = response.LookupTopicResponse.Authoritative;
+ connection = await CreateConnection(new Uri(response.LookupTopicResponse.BrokerServiceUrl), cancellationToken);
+ continue;
+ case CommandLookupTopicResponse.LookupType.Failed:
+ response.LookupTopicResponse.Throw();
+ break;
+ default:
+ throw new InvalidEnumArgumentException("LookupType", (int)response.LookupTopicResponse.Response, typeof(CommandLookupTopicResponse.LookupType));
+ }
+ }
+ }
+
+ private async Task<Connection> CreateConnection(Uri serviceUrl, CancellationToken cancellationToken)
+ {
+ using (await _lock.Lock(cancellationToken))
+ {
+ if (_connections.TryGetValue(serviceUrl, out Connection connection))
+ return connection;
+
+ var tcpClient = new TcpClient();
+ await tcpClient.ConnectAsync(serviceUrl.Host, serviceUrl.Port);
+
+ connection = new Connection(tcpClient.GetStream());
+ Register(serviceUrl, connection);
+
+ var connect = new CommandConnect
+ {
+ ProtocolVersion = _protocolVersion,
+ ClientVersion = _clientVersion
+ };
+
+ var response = await connection.Send(connect);
+ response.Expect(BaseCommand.Type.Connected);
+
+ return connection;
+ }
+ }
+
+ private void Register(Uri serviceUrl, Connection connection)
+ {
+ _connections[serviceUrl] = connection;
+ connection.IsClosed.ContinueWith(t => Deregister(serviceUrl));
+ }
+
+ private void Deregister(Uri serviceUrl)
+ {
+ if (_connections.TryRemove(serviceUrl, out Connection connection))
+ connection.Dispose();
+ }
+
+ private async Task CloseInactiveConnections(TimeSpan interval, CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(interval, cancellationToken);
+
+ using (await _lock.Lock(cancellationToken))
+ {
+ var serviceUrls = _connections.Keys;
+ foreach (var serviceUrl in serviceUrls)
+ {
+ var connection = _connections[serviceUrl];
+ if (connection == null)
+ continue;
+ if (!await connection.IsActive())
+ Deregister(serviceUrl);
+ }
+ }
+ }
+ catch { }
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
new file mode 100644
index 0000000..3317313
--- /dev/null
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -0,0 +1,156 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class Consumer : IConsumer
+ {
+ private readonly Executor _executor;
+ private readonly CommandAck _cachedCommandAck;
+ private readonly IConsumerStreamFactory _streamFactory;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly bool _setProxyState;
+ private readonly StateManager<ConsumerState> _stateManager;
+ private readonly CancellationTokenSource _connectTokenSource;
+ private readonly Task _connectTask;
+ private Action _throwIfClosedOrFaulted;
+ private IConsumerStream Stream { get; set; }
+
+ public Consumer(IConsumerStreamFactory streamFactory, IFaultStrategy faultStrategy, bool setProxyState)
+ {
+ _executor = new Executor(ExecutorOnException);
+ _cachedCommandAck = new CommandAck();
+ _stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+ _streamFactory = streamFactory;
+ _faultStrategy = faultStrategy;
+ _setProxyState = setProxyState;
+ _connectTokenSource = new CancellationTokenSource();
+ Stream = new NotReadyStream();
+ _connectTask = Connect(_connectTokenSource.Token);
+ _throwIfClosedOrFaulted = () => { };
+ }
+
+ public async Task<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+ public async Task<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+ public bool IsFinalState() => _stateManager.IsFinalState();
+ public bool IsFinalState(ConsumerState state) => _stateManager.IsFinalState(state);
+
+ public void Dispose()
+ {
+ _executor.Dispose();
+ _connectTokenSource.Cancel();
+ _connectTask.Wait();
+ }
+
+ public async Task<Message> Receive(CancellationToken cancellationToken) => await _executor.Execute(() => Stream.Receive(cancellationToken), cancellationToken);
+
+ public async Task Acknowledge(Message message, CancellationToken cancellationToken)
+ => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken);
+
+ public async Task Acknowledge(MessageId messageId, CancellationToken cancellationToken)
+ => await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken);
+
+ public async Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
+ => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
+
+ public async Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
+ => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
+
+ public async Task Unsubscribe(CancellationToken cancellationToken)
+ {
+ _ = await _executor.Execute(() => Stream.Send(new CommandUnsubscribe()), cancellationToken);
+ HasClosed();
+ }
+
+ public async Task Seek(MessageId messageId, CancellationToken cancellationToken)
+ {
+ var seek = new CommandSeek { MessageId = messageId.Data };
+ _ = await _executor.Execute(() => Stream.Send(seek), cancellationToken);
+ return;
+ }
+
+ public async Task<MessageId> GetLastMessageId(CancellationToken cancellationToken)
+ {
+ var response = await _executor.Execute(() => Stream.Send(new CommandGetLastMessageId()), cancellationToken);
+ return new MessageId(response.LastMessageId);
+ }
+
+ private async Task Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
+ {
+ await _executor.Execute(() =>
+ {
+ _cachedCommandAck.Type = ackType;
+ _cachedCommandAck.MessageIds.Clear();
+ _cachedCommandAck.MessageIds.Add(messageIdData);
+ return Stream.Send(_cachedCommandAck);
+ }, cancellationToken);
+ }
+
+ private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
+ {
+ _throwIfClosedOrFaulted();
+
+ switch (_faultStrategy.DetermineFaultAction(exception))
+ {
+ case FaultAction.Retry:
+ await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+ break;
+ case FaultAction.Relookup:
+ await _stateManager.StateChangedFrom(ConsumerState.Disconnected, cancellationToken);
+ break;
+ case FaultAction.Fault:
+ HasFaulted(exception);
+ break;
+ }
+
+ _throwIfClosedOrFaulted();
+ }
+
+ private void HasFaulted(Exception exception)
+ {
+ _throwIfClosedOrFaulted = () => throw exception;
+ _stateManager.SetState(ConsumerState.Faulted);
+ }
+
+ private void HasClosed()
+ {
+ _throwIfClosedOrFaulted = () => throw new ConsumerClosedException();
+ _stateManager.SetState(ConsumerState.Closed);
+ }
+
+ private async Task Connect(CancellationToken cancellationToken)
+ {
+ try
+ {
+ while (true)
+ {
+ using (var proxy = new ConsumerProxy(_stateManager, new AsyncQueue<MessagePackage>()))
+ using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
+ {
+ if (_setProxyState)
+ proxy.Active();
+ else
+ await _stateManager.StateChangedFrom(ConsumerState.Disconnected, cancellationToken);
+
+ await _stateManager.StateChangedTo(ConsumerState.Disconnected, cancellationToken);
+ if (_stateManager.IsFinalState())
+ return;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ HasClosed();
+ }
+ catch (Exception exception)
+ {
+ HasFaulted(exception);
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
new file mode 100644
index 0000000..522240f
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -0,0 +1,60 @@
+using DotPulsar.Abstractions;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ConsumerBuilder : IConsumerBuilder
+ {
+ private readonly IPulsarClient _pulsarClient;
+ private readonly ConsumerOptions _options;
+
+ public ConsumerBuilder(IPulsarClient pulsarClient)
+ {
+ _pulsarClient = pulsarClient;
+ _options = new ConsumerOptions();
+ }
+
+ public IConsumerBuilder ConsumerName(string name)
+ {
+ _options.ConsumerName = name;
+ return this;
+ }
+
+ public IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition)
+ {
+ _options.InitialPosition = initialPosition;
+ return this;
+ }
+
+ public IConsumerBuilder PriorityLevel(int priorityLevel)
+ {
+ _options.PriorityLevel = priorityLevel;
+ return this;
+ }
+
+ public IConsumerBuilder MessagePrefetchCount(uint count)
+ {
+ _options.MessagePrefetchCount = count;
+ return this;
+ }
+
+ public IConsumerBuilder SubscriptionName(string name)
+ {
+ _options.SubscriptionName = name;
+ return this;
+ }
+
+ public IConsumerBuilder SubscriptionType(SubscriptionType type)
+ {
+ _options.SubscriptionType = type;
+ return this;
+ }
+
+ public IConsumerBuilder Topic(string topic)
+ {
+ _options.Topic = topic;
+ return this;
+ }
+
+ public IConsumer Create() => _pulsarClient.CreateConsumer(_options);
+ }
+}
diff --git a/src/DotPulsar/Internal/ConsumerManager.cs b/src/DotPulsar/Internal/ConsumerManager.cs
new file mode 100644
index 0000000..c461283
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerManager.cs
@@ -0,0 +1,61 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ConsumerManager : IDisposable
+ {
+ private readonly IdLookup<IConsumerProxy> _proxies;
+
+ public ConsumerManager() => _proxies = new IdLookup<IConsumerProxy>();
+
+ public bool HasConsumers => !_proxies.IsEmpty();
+
+ public void Outgoing(CommandSubscribe subscribe, IConsumerProxy proxy) => subscribe.ConsumerId = _proxies.Add(proxy);
+
+ public void Dispose()
+ {
+ foreach (var id in _proxies.AllIds())
+ {
+ RemoveConsumer(id);
+ }
+ }
+
+ public void Incoming(MessagePackage package)
+ {
+ var consumerId = package.Command.ConsumerId;
+ var proxy = _proxies[consumerId];
+ proxy?.Enqueue(package);
+ }
+
+ public void Incoming(CommandCloseConsumer command) => RemoveConsumer(command.ConsumerId);
+
+ public void Incoming(CommandActiveConsumerChange command)
+ {
+ var proxy = _proxies[command.ConsumerId];
+ if (proxy == null) return;
+
+ if (command.IsActive)
+ proxy.Active();
+ else
+ proxy.Inactive();
+ }
+
+ public void Incoming(CommandReachedEndOfTopic command)
+ {
+ var proxy = _proxies[command.ConsumerId];
+ proxy?.ReachedEndOfTopic();
+ }
+
+ public void Remove(ulong consumerId) => _proxies.Remove(consumerId);
+
+ private void RemoveConsumer(ulong consumerId)
+ {
+ var proxy = _proxies[consumerId];
+ if (proxy == null) return;
+ proxy.Disconnected();
+ _proxies.Remove(consumerId);
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ConsumerProxy.cs b/src/DotPulsar/Internal/ConsumerProxy.cs
new file mode 100644
index 0000000..8d87a48
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerProxy.cs
@@ -0,0 +1,54 @@
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ConsumerProxy : IConsumerProxy, IDisposable
+ {
+ private readonly object _lock;
+ private readonly StateManager<ConsumerState> _stateManager;
+ private readonly AsyncQueue<MessagePackage> _queue;
+ private bool _hasDisconnected;
+
+ public ConsumerProxy(StateManager<ConsumerState> stateManager, AsyncQueue<MessagePackage> queue)
+ {
+ _lock = new object();
+ _stateManager = stateManager;
+ _queue = queue;
+ _hasDisconnected = false;
+ }
+
+ public void Active() => SetState(ConsumerState.Active);
+ public void Inactive() => SetState(ConsumerState.Inactive);
+ public void ReachedEndOfTopic() => SetState(ConsumerState.ReachedEndOfTopic);
+
+ public void Disconnected()
+ {
+ lock (_lock)
+ {
+ if (_hasDisconnected)
+ return;
+
+ _stateManager.SetState(ConsumerState.Disconnected);
+ _hasDisconnected = true;
+ }
+ }
+
+
+ public void Enqueue(MessagePackage package) => _queue.Enqueue(package);
+ public async Task<MessagePackage> Dequeue(CancellationToken cancellationToken) => await _queue.Dequeue(cancellationToken);
+
+ private void SetState(ConsumerState state)
+ {
+ lock (_lock)
+ {
+ if (!_hasDisconnected)
+ _stateManager.SetState(state);
+ }
+ }
+
+ public void Dispose() => _queue.Dispose();
+ }
+}
diff --git a/src/DotPulsar/Internal/ConsumerStream.cs b/src/DotPulsar/Internal/ConsumerStream.cs
new file mode 100644
index 0000000..0dea1fd
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerStream.cs
@@ -0,0 +1,151 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ConsumerStream : IConsumerStream
+ {
+ private readonly ulong _id;
+ private readonly IDequeue<MessagePackage> _dequeue;
+ private readonly Connection _connection;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly IConsumerProxy _proxy;
+ private readonly CommandFlow _commandFlow;
+ private uint _sendWhenZero;
+ private bool _firstBatch;
+
+ public ConsumerStream(ulong id, uint messagePrefetchCount, IDequeue<MessagePackage> dequeue, Connection connection, IFaultStrategy faultStrategy, IConsumerProxy proxy)
+ {
+ _id = id;
+ _dequeue = dequeue;
+ _connection = connection;
+ _faultStrategy = faultStrategy;
+ _proxy = proxy;
+ _commandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount };
+ _sendWhenZero = 0;
+ _firstBatch = true;
+ }
+
+ public async Task<Message> Receive(CancellationToken cancellationToken)
+ {
+ while (true)
+ {
+ if (_sendWhenZero == 0) //TODO should sending the flow command be handled on other thread and thereby not slow down the consumer?
+ {
+ await _connection.Send(_commandFlow);
+
+ if (_firstBatch)
+ {
+ _commandFlow.MessagePermits = (uint)Math.Ceiling(_commandFlow.MessagePermits * 0.5);
+ _firstBatch = false;
+ }
+
+ _sendWhenZero = _commandFlow.MessagePermits;
+ }
+
+ var messagePackage = await _dequeue.Dequeue(cancellationToken);
+ _sendWhenZero--;
+
+ try
+ {
+ return Serializer.Deserialize(messagePackage);
+ }
+ catch (ChecksumException)
+ {
+ var ack = new CommandAck
+ {
+ Type = CommandAck.AckType.Individual,
+ validation_error = CommandAck.ValidationError.ChecksumMismatch
+ };
+ ack.MessageIds.Add(messagePackage.Command.MessageId);
+ await Send(ack);
+ }
+ }
+ }
+
+ public async Task Send(CommandAck command)
+ {
+ try
+ {
+ command.ConsumerId = _id;
+ await _connection.Send(command);
+ }
+ catch (Exception exception)
+ {
+ OnException(exception);
+ throw;
+ }
+ }
+
+ public async Task<CommandSuccess> Send(CommandUnsubscribe command)
+ {
+ try
+ {
+ command.ConsumerId = _id;
+ var response = await _connection.Send(command);
+ response.Expect(BaseCommand.Type.Success);
+ return response.Success;
+ }
+ catch (Exception exception)
+ {
+ OnException(exception);
+ throw;
+ }
+ }
+
+ public async Task<CommandSuccess> Send(CommandSeek command)
+ {
+ try
+ {
+ command.ConsumerId = _id;
+ var response = await _connection.Send(command);
+ response.Expect(BaseCommand.Type.Success);
+ return response.Success;
+ }
+ catch (Exception exception)
+ {
+ OnException(exception);
+ throw;
+ }
+ }
+
+ public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command)
+ {
+ try
+ {
+ command.ConsumerId = _id;
+ var response = await _connection.Send(command);
+ response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
+ return response.GetLastMessageIdResponse;
+ }
+ catch (Exception exception)
+ {
+ OnException(exception);
+ throw;
+ }
+ }
+
+ public void Dispose()
+ {
+ try
+ {
+ _connection.Send(new CommandCloseConsumer { ConsumerId = _id }).Wait();
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+
+ private void OnException(Exception exception)
+ {
+ if (_faultStrategy.DetermineFaultAction(exception) == FaultAction.Relookup)
+ _proxy.Disconnected();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ConsumerStreamFactory.cs b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
new file mode 100644
index 0000000..d8fd673
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
@@ -0,0 +1,81 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ConsumerStreamFactory : IConsumerStreamFactory
+ {
+ private readonly ConnectionPool _connectionTool;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly CommandSubscribe _subscribe;
+ private readonly uint _messagePrefetchCount;
+
+ public ConsumerStreamFactory(ConnectionPool connectionManager, ConsumerOptions options, IFaultStrategy faultStrategy)
+ {
+ _connectionTool = connectionManager;
+ _faultStrategy = faultStrategy;
+ _messagePrefetchCount = options.MessagePrefetchCount;
+
+ _subscribe = new CommandSubscribe
+ {
+ ConsumerName = options.ConsumerName,
+ initialPosition = (CommandSubscribe.InitialPosition)options.InitialPosition,
+ PriorityLevel = options.PriorityLevel,
+ Subscription = options.SubscriptionName,
+ Topic = options.Topic,
+ Type = (CommandSubscribe.SubType)options.SubscriptionType
+ };
+ }
+
+ public ConsumerStreamFactory(ConnectionPool connectionManager, ReaderOptions options, IFaultStrategy faultStrategy)
+ {
+ _connectionTool = connectionManager;
+ _faultStrategy = faultStrategy;
+ _messagePrefetchCount = options.MessagePrefetchCount;
+
+ _subscribe = new CommandSubscribe
+ {
+ ConsumerName = options.ReaderName,
+ Durable = false,
+ StartMessageId = options.StartMessageId.Data,
+ Subscription = "Reader-" + Guid.NewGuid().ToString("N"),
+ Topic = options.Topic
+ };
+ }
+
+ public async Task<IConsumerStream> CreateStream(IConsumerProxy proxy, CancellationToken cancellationToken)
+ {
+ while (true)
+ {
+ try
+ {
+ var connection = await _connectionTool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
+ var response = await connection.Send(_subscribe, proxy);
+ return new ConsumerStream(response.ConsumerId, _messagePrefetchCount, proxy, connection, _faultStrategy, proxy);
+ }
+ catch (OperationCanceledException)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ throw;
+ else
+ await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+ }
+ catch (Exception exception)
+ {
+ switch (_faultStrategy.DetermineFaultAction(exception))
+ {
+ case FaultAction.Relookup:
+ case FaultAction.Retry:
+ await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+ continue;
+ }
+
+ throw;
+ }
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Crc32C.cs b/src/DotPulsar/Internal/Crc32C.cs
new file mode 100644
index 0000000..eaee280
--- /dev/null
+++ b/src/DotPulsar/Internal/Crc32C.cs
@@ -0,0 +1,77 @@
+using System.Buffers;
+
+namespace DotPulsar.Internal
+{
+ public static class Crc32C
+ {
+ private const uint Generator = 0x82F63B78u;
+
+ private static readonly uint[] Lookup;
+
+ static Crc32C()
+ {
+ Lookup = new uint[16 * 256];
+
+ for (uint i = 0; i < 256; i++)
+ {
+ var entry = i;
+ for (var j = 0; j < 16; j++)
+ {
+ for (var k = 0; k < 8; k++)
+ {
+ entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : (entry >> 1);
+ }
+ Lookup[(j * 256) + i] = entry;
+ }
+ }
+ }
+
+ public static uint Calculate(ReadOnlySequence<byte> sequence)
+ {
+ var block = new uint[16];
+ var checksum = uint.MaxValue;
+ var remaningBytes = sequence.Length;
+ var readingBlock = remaningBytes >= 16;
+ var offset = 15;
+
+ foreach (var memory in sequence)
+ {
+ var span = memory.Span;
+ for (var i = 0; i < span.Length; ++i)
+ {
+ var currentByte = span[i];
+
+ if (!readingBlock)
+ {
+ checksum = Lookup[(byte)(checksum ^ currentByte)] ^ checksum >> 8;
+ continue;
+ }
+
+ var offSetBase = offset * 256;
+
+ if (offset > 11)
+ block[offset] = Lookup[offSetBase + ((byte)(checksum >> (8 * (15 - offset))) ^ currentByte)];
+ else
+ block[offset] = Lookup[offSetBase + currentByte];
+
+ --remaningBytes;
+
+ if (offset == 0)
+ {
+ offset = 15;
+ readingBlock = remaningBytes >= 16;
+ checksum = 0;
+ for (var j = 0; j < block.Length; ++j)
+ checksum ^= block[j];
+ }
+ else
+ {
+ --offset;
+ }
+ }
+ }
+
+ return checksum ^ uint.MaxValue;
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
new file mode 100644
index 0000000..328cc70
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class ConsumerNotFoundException : DotPulsarException
+ {
+ public ConsumerNotFoundException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
new file mode 100644
index 0000000..ca43151
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class ServiceNotReadyException : DotPulsarException
+ {
+ public ServiceNotReadyException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/StreamNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/StreamNotReadyException.cs
new file mode 100644
index 0000000..5618e19
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/StreamNotReadyException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class StreamNotReadyException : DotPulsarException
+ {
+ public StreamNotReadyException() : base("The service is not ready yet") { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
new file mode 100644
index 0000000..d44620f
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class TooManyRequestsException : DotPulsarException
+ {
+ public TooManyRequestsException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
new file mode 100644
index 0000000..d783deb
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
@@ -0,0 +1,11 @@
+using DotPulsar.Exceptions;
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+ public sealed class UnexpectedResponseException : DotPulsarException
+ {
+ public UnexpectedResponseException(string message) : base(message) { }
+ public UnexpectedResponseException(string message, Exception innerException) : base(message, innerException) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Executor.cs b/src/DotPulsar/Internal/Executor.cs
new file mode 100644
index 0000000..1f5b5b6
--- /dev/null
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -0,0 +1,102 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class Executor : IDisposable
+ {
+ private readonly AsyncLock _lock;
+ private readonly Func<Exception, CancellationToken, Task> _onException;
+
+ public Executor(Func<Exception, CancellationToken, Task> onException)
+ {
+ _lock = new AsyncLock();
+ _onException = onException;
+ }
+
+ public void Dispose() => _lock.Dispose();
+
+ public async Task Execute(Action action, CancellationToken cancellationToken)
+ {
+ using (await _lock.Lock(cancellationToken))
+ {
+ while (true)
+ {
+ try
+ {
+ action();
+ return;
+ }
+ catch (Exception exception)
+ {
+ await _onException(exception, cancellationToken);
+ }
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+ }
+ }
+
+ public async Task Execute(Func<Task> func, CancellationToken cancellationToken)
+ {
+ using (await _lock.Lock(cancellationToken))
+ {
+ while (true)
+ {
+ try
+ {
+ await func();
+ return;
+ }
+ catch (Exception exception)
+ {
+ await _onException(exception, cancellationToken);
+ }
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+ }
+ }
+
+ public async Task<TResult> Execute<TResult>(Func<TResult> func, CancellationToken cancellationToken)
+ {
+ using (await _lock.Lock(cancellationToken))
+ {
+ while (true)
+ {
+ try
+ {
+ return func();
+ }
+ catch (Exception exception)
+ {
+ await _onException(exception, cancellationToken);
+ }
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+ }
+ }
+
+ public async Task<TResult> Execute<TResult>(Func<Task<TResult>> func, CancellationToken cancellationToken)
+ {
+ using (await _lock.Lock(cancellationToken))
+ {
+ while (true)
+ {
+ try
+ {
+ return await func();
+ }
+ catch (Exception exception)
+ {
+ await _onException(exception, cancellationToken);
+ }
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
new file mode 100644
index 0000000..29d743c
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -0,0 +1,193 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Exceptions;
+using DotPulsar.Internal.PulsarApi;
+
+namespace DotPulsar.Internal.Extensions
+{
+ public static class CommandExtensions
+ {
+ public static void Expect(this BaseCommand command, params BaseCommand.Type[] types)
+ {
+ var actual = command.CommandType;
+
+ foreach (var type in types)
+ {
+ if (type == actual)
+ return;
+ }
+
+ switch (actual)
+ {
+ case BaseCommand.Type.Error:
+ command.Error.Throw();
+ return;
+ case BaseCommand.Type.SendError:
+ command.SendError.Throw();
+ return;
+ }
+
+ throw new UnexpectedResponseException($"Expected '{string.Join(",", types)}' but got '{actual}'");
+ }
+
+ public static void Throw(this CommandSendError command) => Throw(command.Error, command.Message);
+
+ public static void Throw(this CommandLookupTopicResponse command) => Throw(command.Error, command.Message);
+
+ public static void Throw(this CommandError error) => Throw(error.Error, error.Message);
+
+ private static void Throw(ServerError error, string message)
+ {
+ switch (error)
+ {
+ case ServerError.AuthenticationError: throw new AuthenticationException(message);
+ case ServerError.AuthorizationError: throw new AuthorizationException(message);
+ case ServerError.ChecksumError: throw new ChecksumException(message);
+ case ServerError.ConsumerAssignError: throw new ConsumerAssignException(message);
+ case ServerError.ConsumerBusy: throw new ConsumerBusyException(message);
+ case ServerError.ConsumerNotFound: throw new ConsumerNotFoundException(message);
+ case ServerError.IncompatibleSchema: throw new IncompatibleSchemaException(message);
+ case ServerError.InvalidTopicName: throw new InvalidTopicNameException(message);
+ case ServerError.MetadataError: throw new MetadataException(message);
+ case ServerError.PersistenceError: throw new PersistenceException(message);
+ case ServerError.ProducerBlockedQuotaExceededError:
+ case ServerError.ProducerBlockedQuotaExceededException:
+ throw new ProducerBlockedQuotaExceededException(message + ". Error code: " + error);
+ case ServerError.ProducerBusy: throw new ProducerBusyException(message);
+ case ServerError.ServiceNotReady: throw new ServiceNotReadyException(message);
+ case ServerError.SubscriptionNotFound: throw new SubscriptionNotFoundException(message);
+ case ServerError.TooManyRequests: throw new TooManyRequestsException(message);
+ case ServerError.TopicNotFound: throw new TopicNotFoundException(message);
+ case ServerError.TopicTerminatedError: throw new TopicTerminatedException(message);
+ case ServerError.UnsupportedVersionError: throw new UnsupportedVersionException(message);
+ case ServerError.UnknownError:
+ default: throw new UnknownException(message + ". Error code: " + error);
+ }
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandAck command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Ack,
+ Ack = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandConnect command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Connect,
+ Connect = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandPing command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Ping,
+ Ping = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandPong command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Pong,
+ Pong = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandProducer command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Producer,
+ Producer = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandGetLastMessageId command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.GetLastMessageId,
+ GetLastMessageId = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandUnsubscribe command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Unsubscribe,
+ Unsubscribe = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandSubscribe command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Subscribe,
+ Subscribe = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandLookupTopic command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Lookup,
+ LookupTopic = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandSend command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Send,
+ Send = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandFlow command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Flow,
+ Flow = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandCloseProducer command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.CloseProducer,
+ CloseProducer = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandCloseConsumer command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.CloseConsumer,
+ CloseConsumer = command
+ };
+ }
+
+ public static BaseCommand AsBaseCommand(this CommandSeek command)
+ {
+ return new BaseCommand
+ {
+ CommandType = BaseCommand.Type.Seek,
+ Seek = command
+ };
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
new file mode 100644
index 0000000..e477974
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -0,0 +1,82 @@
+using System;
+using System.Buffers;
+
+namespace DotPulsar.Internal.Extensions
+{
+ public static class ReadOnlySequenceExtensions
+ {
+ public static bool StartsWith<T>(this ReadOnlySequence<T> sequence, ReadOnlyMemory<T> target) where T : IEquatable<T>
+ {
+ if (target.Length > sequence.Length)
+ return false;
+
+ var targetIndex = 0;
+ var targetSpan = target.Span;
+
+ foreach (var memory in sequence)
+ {
+ var span = memory.Span;
+ for (var i = 0; i < span.Length; ++i)
+ {
+ if (!span[i].Equals(targetSpan[targetIndex]))
+ return false;
+
+ ++targetIndex;
+ if (targetIndex == targetSpan.Length)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, int start, bool isBigEndian)
+ {
+ if (sequence.Length < (4 + start))
+ throw new ArgumentOutOfRangeException(nameof(start), start, "Sequence must be at least 4 bytes long from 'start' to end");
+
+ var reverse = isBigEndian != BitConverter.IsLittleEndian;
+ var union = new UIntUnion();
+ var read = 0;
+
+ foreach (var memory in sequence)
+ {
+ if (start > memory.Length)
+ {
+ start -= memory.Length;
+ continue;
+ }
+
+ var span = memory.Span;
+ for (var i = start; i < span.Length; ++i, ++read)
+ {
+ switch (read)
+ {
+ case 0:
+ if (reverse) union.B0 = span[i];
+ else union.B3 = span[i];
+ continue;
+ case 1:
+ if (reverse) union.B1 = span[i];
+ else union.B2 = span[i];
+ continue;
+ case 2:
+ if (reverse) union.B2 = span[i];
+ else union.B1 = span[i];
+ continue;
+ case 3:
+ if (reverse) union.B3 = span[i];
+ else union.B0 = span[i];
+ break;
+ }
+ }
+
+ if (read == 3)
+ break;
+ start = 0;
+ }
+
+ return union.UInt;
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/FaultAction.cs b/src/DotPulsar/Internal/FaultAction.cs
new file mode 100644
index 0000000..263567a
--- /dev/null
+++ b/src/DotPulsar/Internal/FaultAction.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar.Internal
+{
+ public enum FaultAction : byte
+ {
+ Retry,
+ Relookup,
+ Fault
+ }
+}
diff --git a/src/DotPulsar/Internal/FaultStrategy.cs b/src/DotPulsar/Internal/FaultStrategy.cs
new file mode 100644
index 0000000..cfd91a0
--- /dev/null
+++ b/src/DotPulsar/Internal/FaultStrategy.cs
@@ -0,0 +1,30 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
+using System;
+
+namespace DotPulsar.Internal
+{
+ public sealed class FaultStrategy : IFaultStrategy
+ {
+ public FaultStrategy(int timeToWaitInMilliseconds)
+ {
+ TimeToWait = TimeSpan.FromMilliseconds(timeToWaitInMilliseconds);
+ }
+
+ public TimeSpan TimeToWait { get; }
+
+ public FaultAction DetermineFaultAction(Exception exception)
+ {
+ switch (exception)
+ {
+ case TooManyRequestsException _: return FaultAction.Retry;
+ case StreamNotReadyException _: return FaultAction.Relookup;
+ case ServiceNotReadyException _: return FaultAction.Relookup;
+ case DotPulsarException _: return FaultAction.Fault;
+ }
+
+ return FaultAction.Relookup;
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/IdLookup.cs b/src/DotPulsar/Internal/IdLookup.cs
new file mode 100644
index 0000000..433d9e2
--- /dev/null
+++ b/src/DotPulsar/Internal/IdLookup.cs
@@ -0,0 +1,59 @@
+using System.Collections.Generic;
+
+namespace DotPulsar.Internal
+{
+ public sealed class IdLookup<T> where T : class
+ {
+ private T[] _items;
+
+ public IdLookup() => _items = new T[0];
+
+ public bool IsEmpty()
+ {
+ for (var i = 0; i < _items.Length; ++i)
+ {
+ if (_items[i] != null)
+ return false;
+ }
+
+ return true;
+ }
+
+ public ulong[] AllIds()
+ {
+ var activeIds = new List<ulong>();
+ for (var i = 0; i < _items.Length; ++i)
+ {
+ if (_items[i] != null)
+ activeIds.Add((ulong)i);
+ }
+ return activeIds.ToArray();
+ }
+
+ public ulong Add(T item)
+ {
+ for (int i = 0; i < _items.Length; ++i)
+ {
+ if (_items[i] != null)
+ continue;
+
+ _items[i] = item;
+ return (ulong)i;
+ }
+
+ var newArray = new T[_items.Length + 1];
+ _items.CopyTo(newArray, 0);
+ var id = newArray.Length - 1;
+ newArray[id] = item;
+ _items = newArray;
+ return (ulong)id;
+ }
+
+ public void Remove(ulong id) => _items[(int)id] = null;
+
+ public T this[ulong id]
+ {
+ get => _items[(int)id];
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
new file mode 100644
index 0000000..394a3f6
--- /dev/null
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -0,0 +1,39 @@
+using DotPulsar.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class MessageBuilder : IMessageBuilder
+ {
+ private readonly IProducer _producer;
+ private readonly MessageMetadata _metadata;
+
+ public MessageBuilder(IProducer producer)
+ {
+ _producer = producer;
+ _metadata = new MessageMetadata();
+ }
+
+ public IMessageBuilder EventTime(ulong eventTime)
+ {
+ _metadata.EventTime = eventTime;
+ return this;
+ }
+
+ public IMessageBuilder Property(string key, string value)
+ {
+ _metadata[key] = value;
+ return this;
+ }
+
+ public IMessageBuilder SequenceId(ulong sequenceId)
+ {
+ _metadata.SequenceId = sequenceId;
+ return this;
+ }
+
+ public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) => await _producer.Send(_metadata, data, cancellationToken);
+ }
+}
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
new file mode 100644
index 0000000..2aaf738
--- /dev/null
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -0,0 +1,17 @@
+using DotPulsar.Internal.PulsarApi;
+using System.Buffers;
+
+namespace DotPulsar.Internal
+{
+ public sealed class MessagePackage
+ {
+ public MessagePackage(CommandMessage command, ReadOnlySequence<byte> data)
+ {
+ Command = command;
+ Data = data;
+ }
+
+ public CommandMessage Command { get; }
+ public ReadOnlySequence<byte> Data { get; }
+ }
+}
diff --git a/src/DotPulsar/Internal/NotReadyStream.cs b/src/DotPulsar/Internal/NotReadyStream.cs
new file mode 100644
index 0000000..0b4f888
--- /dev/null
+++ b/src/DotPulsar/Internal/NotReadyStream.cs
@@ -0,0 +1,28 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class NotReadyStream : IConsumerStream, IProducerStream
+ {
+ public void Dispose() { }
+
+ public Task<Message> Receive(CancellationToken cancellationToken) => throw GetException();
+
+ public Task Send(CommandAck command) => throw GetException();
+
+ public Task<CommandSuccess> Send(CommandUnsubscribe command) => throw GetException();
+
+ public Task<CommandSuccess> Send(CommandSeek command) => throw GetException();
+
+ public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command) => throw GetException();
+
+ public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload) => throw GetException();
+
+ private Exception GetException() => new StreamNotReadyException();
+ }
+}
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
new file mode 100644
index 0000000..e2367b4
--- /dev/null
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -0,0 +1,21 @@
+using DotPulsar.Internal.PulsarApi;
+
+namespace DotPulsar.Internal
+{
+ public sealed class PingPongHandler
+ {
+ private readonly Connection _connection;
+ private readonly CommandPong _pong;
+
+ public PingPongHandler(Connection connection)
+ {
+ _connection = connection;
+ _pong = new CommandPong();
+ }
+
+ public void Incoming(CommandPing ping)
+ {
+ _ = _connection.Send(_pong);
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
new file mode 100644
index 0000000..d37d351
--- /dev/null
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -0,0 +1,116 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class Producer : IProducer
+ {
+ private readonly Executor _executor;
+ private readonly IProducerStreamFactory _streamFactory;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly StateManager<ProducerState> _stateManager;
+ private readonly CancellationTokenSource _connectTokenSource;
+ private readonly Task _connectTask;
+ private Action _throwIfClosedOrFaulted;
+ private IProducerStream Stream { get; set; }
+
+ public Producer(IProducerStreamFactory streamFactory, IFaultStrategy faultStrategy)
+ {
+ _executor = new Executor(ExecutorOnException);
+ _streamFactory = streamFactory;
+ _faultStrategy = faultStrategy;
+ _stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+ _connectTokenSource = new CancellationTokenSource();
+ Stream = new NotReadyStream();
+ _connectTask = Connect(_connectTokenSource.Token);
+ _throwIfClosedOrFaulted = () => { };
+ }
+
+ public async Task<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+ public async Task<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+ public bool IsFinalState() => _stateManager.IsFinalState();
+ public bool IsFinalState(ProducerState state) => _stateManager.IsFinalState(state);
+
+ public void Dispose()
+ {
+ _executor.Dispose();
+ _connectTokenSource.Cancel();
+ _connectTask.Wait();
+ }
+
+ public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
+ => await Send(new MessageMetadata(), data, cancellationToken);
+
+ public async Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
+ => await Send(metadata.Metadata, data, cancellationToken);
+
+ private async Task<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload, CancellationToken cancellationToken)
+ {
+ var response = await _executor.Execute(() => Stream.Send(metadata, payload), cancellationToken);
+ return new MessageId(response.MessageId);
+ }
+
+ private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
+ {
+ _throwIfClosedOrFaulted();
+
+ switch (_faultStrategy.DetermineFaultAction(exception))
+ {
+ case FaultAction.Retry:
+ await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+ break;
+ case FaultAction.Relookup:
+ await _stateManager.StateChangedTo(ProducerState.Connected, cancellationToken);
+ break;
+ case FaultAction.Fault:
+ HasFaulted(exception);
+ break;
+ }
+
+ _throwIfClosedOrFaulted();
+ }
+
+ private void HasFaulted(Exception exception)
+ {
+ _throwIfClosedOrFaulted = () => throw exception;
+ _stateManager.SetState(ProducerState.Faulted);
+ }
+
+ private void HasClosed()
+ {
+ _throwIfClosedOrFaulted = () => throw new ProducerClosedException();
+ _stateManager.SetState(ProducerState.Closed);
+ }
+
+ private async Task Connect(CancellationToken cancellationToken)
+ {
+ try
+ {
+ while (true)
+ {
+ var proxy = new ProducerProxy(_stateManager);
+
+ using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
+ {
+ proxy.Connected();
+ await _stateManager.StateChangedFrom(ProducerState.Connected, cancellationToken);
+ if (_stateManager.IsFinalState())
+ return;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ HasClosed();
+ }
+ catch (Exception exception)
+ {
+ HasFaulted(exception);
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
new file mode 100644
index 0000000..654086c
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -0,0 +1,36 @@
+using DotPulsar.Abstractions;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ProducerBuilder : IProducerBuilder
+ {
+ private readonly IPulsarClient _pulsarClient;
+ private readonly ProducerOptions _options;
+
+ public ProducerBuilder(IPulsarClient pulsarClient)
+ {
+ _pulsarClient = pulsarClient;
+ _options = new ProducerOptions();
+ }
+
+ public IProducerBuilder ProducerName(string name)
+ {
+ _options.ProducerName = name;
+ return this;
+ }
+
+ public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+ {
+ _options.InitialSequenceId = initialSequenceId;
+ return this;
+ }
+
+ public IProducerBuilder Topic(string topic)
+ {
+ _options.Topic = topic;
+ return this;
+ }
+
+ public IProducer Create() => _pulsarClient.CreateProducer(_options);
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerManager.cs b/src/DotPulsar/Internal/ProducerManager.cs
new file mode 100644
index 0000000..2bd6494
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerManager.cs
@@ -0,0 +1,37 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ProducerManager : IDisposable
+ {
+ private readonly IdLookup<IProducerProxy> _proxies;
+
+ public ProducerManager() => _proxies = new IdLookup<IProducerProxy>();
+
+ public bool HasProducers => !_proxies.IsEmpty();
+
+ public void Outgoing(CommandProducer producer, IProducerProxy proxy) => producer.ProducerId = _proxies.Add(proxy);
+
+ public void Remove(ulong producerId) => _proxies.Remove(producerId);
+
+ public void Incoming(CommandCloseProducer command) => RemoveProducer(command.ProducerId);
+
+ public void Dispose()
+ {
+ foreach (var id in _proxies.AllIds())
+ {
+ RemoveProducer(id);
+ }
+ }
+
+ private void RemoveProducer(ulong producerId)
+ {
+ var stateManager = _proxies[producerId];
+ if (stateManager == null) return;
+ stateManager.Disconnected();
+ _proxies.Remove(producerId);
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerProxy.cs b/src/DotPulsar/Internal/ProducerProxy.cs
new file mode 100644
index 0000000..f5711e5
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerProxy.cs
@@ -0,0 +1,39 @@
+using DotPulsar.Internal.Abstractions;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ProducerProxy : IProducerProxy
+ {
+ private readonly object _lock;
+ private readonly StateManager<ProducerState> _stateManager;
+ private bool _hasDisconnected;
+
+ public ProducerProxy(StateManager<ProducerState> stateManager)
+ {
+ _lock = new object();
+ _stateManager = stateManager;
+ _hasDisconnected = false;
+ }
+
+ public void Connected()
+ {
+ lock (_lock)
+ {
+ if (!_hasDisconnected)
+ _stateManager.SetState(ProducerState.Connected);
+ }
+ }
+
+ public void Disconnected()
+ {
+ lock (_lock)
+ {
+ if (_hasDisconnected)
+ return;
+
+ _stateManager.SetState(ProducerState.Disconnected);
+ _hasDisconnected = true;
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs b/src/DotPulsar/Internal/ProducerResponse.cs
new file mode 100644
index 0000000..458d186
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerResponse.cs
@@ -0,0 +1,14 @@
+namespace DotPulsar.Internal
+{
+ public sealed class ProducerResponse
+ {
+ public ProducerResponse(ulong producerId, string producerName)
+ {
+ ProducerId = producerId;
+ ProducerName = producerName;
+ }
+
+ public ulong ProducerId { get; }
+ public string ProducerName { get; }
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerStream.cs b/src/DotPulsar/Internal/ProducerStream.cs
new file mode 100644
index 0000000..7b296e0
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerStream.cs
@@ -0,0 +1,69 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ProducerStream : IProducerStream
+ {
+ private readonly SendPackage _cachedSendPackage;
+ private readonly ulong _id;
+ private readonly string _name;
+ private readonly SequenceId _sequenceId;
+ private readonly Connection _connection;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly IProducerProxy _proxy;
+
+ public ProducerStream(ulong id, string name, SequenceId sequenceId, Connection connection, IFaultStrategy faultStrategy, IProducerProxy proxy)
+ {
+ _cachedSendPackage = new SendPackage(new CommandSend { ProducerId = id, NumMessages = 1 });
+ _id = id;
+ _name = name;
+ _sequenceId = sequenceId;
+ _connection = connection;
+ _faultStrategy = faultStrategy;
+ _proxy = proxy;
+ }
+
+ public void Dispose()
+ {
+ try
+ {
+ _connection.Send(new CommandCloseProducer { ProducerId = _id }).Wait();
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+
+ public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload)
+ {
+ try
+ {
+ metadata.PublishTime = (ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ metadata.ProducerName = _name;
+
+ if (metadata.SequenceId == 0)
+ metadata.SequenceId = _sequenceId.Current;
+
+ _cachedSendPackage.Command.SequenceId = metadata.SequenceId;
+ _cachedSendPackage.Metadata = metadata;
+ _cachedSendPackage.Payload = payload;
+
+ var response = await _connection.Send(_cachedSendPackage);
+ response.Expect(BaseCommand.Type.SendReceipt); //TODO find out if we should increment on SendError
+ _sequenceId.Increment();
+ return response.SendReceipt;
+ }
+ catch (Exception exception)
+ {
+ if (_faultStrategy.DetermineFaultAction(exception) == FaultAction.Relookup)
+ _proxy.Disconnected();
+ throw;
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerStreamFactory.cs b/src/DotPulsar/Internal/ProducerStreamFactory.cs
new file mode 100644
index 0000000..5c3f138
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerStreamFactory.cs
@@ -0,0 +1,62 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ProducerStreamFactory : IProducerStreamFactory
+ {
+ private readonly ConnectionPool _connectionTool;
+ private readonly ProducerOptions _options;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly SequenceId _sequenceId;
+
+ public ProducerStreamFactory(ConnectionPool connectionManager, ProducerOptions options, IFaultStrategy faultStrategy)
+ {
+ _connectionTool = connectionManager;
+ _options = options;
+ _faultStrategy = faultStrategy;
+ _sequenceId = new SequenceId(options.InitialSequenceId);
+ }
+
+ public async Task<IProducerStream> CreateStream(IProducerProxy proxy, CancellationToken cancellationToken)
+ {
+ var commandProducer = new CommandProducer
+ {
+ ProducerName = _options.ProducerName,
+ Topic = _options.Topic
+ };
+
+ while (true)
+ {
+ try
+ {
+ var connection = await _connectionTool.FindConnectionForTopic(_options.Topic, cancellationToken);
+ var response = await connection.Send(commandProducer, proxy);
+ return new ProducerStream(response.ProducerId, response.ProducerName, _sequenceId, connection, _faultStrategy, proxy);
+ }
+ catch (OperationCanceledException)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ throw;
+ else
+ await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+ }
+ catch (Exception exception)
+ {
+ switch (_faultStrategy.DetermineFaultAction(exception))
+ {
+ case FaultAction.Relookup:
+ case FaultAction.Retry:
+ await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+ continue;
+ }
+
+ throw;
+ }
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
new file mode 100644
index 0000000..7560c72
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
@@ -0,0 +1,1953 @@
+namespace DotPulsar.Internal.PulsarApi
+{
+ // This file was generated by a tool; you should avoid making direct changes.
+ // Consider using 'partial classes' to extend these types
+ // Input: PulsarApi.proto (Last edit: ba24d73)
+
+#pragma warning disable CS1591, CS0612, CS3021, IDE1006
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class Schema : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"name", IsRequired = true)]
+ public string Name { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"schema_data", IsRequired = true)]
+ public byte[] SchemaData { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, IsRequired = true)]
+ public SchemaType Type { get; set; }
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"properties")]
+ public global::System.Collections.Generic.List<KeyValue> Properties { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum SchemaType
+ {
+ None = 0,
+ String = 1,
+ Json = 2,
+ Protobuf = 3,
+ Avro = 4,
+ Bool = 5,
+ Int8 = 6,
+ Int16 = 7,
+ Int32 = 8,
+ Int64 = 9,
+ Float = 10,
+ Double = 11,
+ Date = 12,
+ Time = 13,
+ Timestamp = 14,
+ KeyValue = 15,
+ }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class MessageIdData : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true)]
+ public ulong LedgerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true)]
+ public ulong EntryId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"partition")]
+ [global::System.ComponentModel.DefaultValue(-1)]
+ public int Partition
+ {
+ get { return __pbn__Partition ?? -1; }
+ set { __pbn__Partition = value; }
+ }
+ public bool ShouldSerializePartition() => __pbn__Partition != null;
+ public void ResetPartition() => __pbn__Partition = null;
+ private int? __pbn__Partition;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"batch_index")]
+ [global::System.ComponentModel.DefaultValue(-1)]
+ public int BatchIndex
+ {
+ get { return __pbn__BatchIndex ?? -1; }
+ set { __pbn__BatchIndex = value; }
+ }
+ public bool ShouldSerializeBatchIndex() => __pbn__BatchIndex != null;
+ public void ResetBatchIndex() => __pbn__BatchIndex = null;
+ private int? __pbn__BatchIndex;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class KeyValue : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"key", IsRequired = true)]
+ public string Key { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"value", IsRequired = true)]
+ public string Value { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class KeyLongValue : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"key", IsRequired = true)]
+ public string Key { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"value", IsRequired = true)]
+ public ulong Value { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class EncryptionKeys : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"key", IsRequired = true)]
+ public string Key { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"value", IsRequired = true)]
+ public byte[] Value { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"metadata")]
+ public global::System.Collections.Generic.List<KeyValue> Metadatas { get; } = new global::System.Collections.Generic.List<KeyValue>();
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class MessageMetadata : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"producer_name", IsRequired = true)]
+ public string ProducerName { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+ public ulong SequenceId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"publish_time", IsRequired = true)]
+ public ulong PublishTime { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"properties")]
+ public global::System.Collections.Generic.List<KeyValue> Properties { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"replicated_from")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ReplicatedFrom
+ {
+ get { return __pbn__ReplicatedFrom ?? ""; }
+ set { __pbn__ReplicatedFrom = value; }
+ }
+ public bool ShouldSerializeReplicatedFrom() => __pbn__ReplicatedFrom != null;
+ public void ResetReplicatedFrom() => __pbn__ReplicatedFrom = null;
+ private string __pbn__ReplicatedFrom;
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"partition_key")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string PartitionKey
+ {
+ get { return __pbn__PartitionKey ?? ""; }
+ set { __pbn__PartitionKey = value; }
+ }
+ public bool ShouldSerializePartitionKey() => __pbn__PartitionKey != null;
+ public void ResetPartitionKey() => __pbn__PartitionKey = null;
+ private string __pbn__PartitionKey;
+
+ [global::ProtoBuf.ProtoMember(7, Name = @"replicate_to")]
+ public global::System.Collections.Generic.List<string> ReplicateToes { get; } = new global::System.Collections.Generic.List<string>();
+
+ [global::ProtoBuf.ProtoMember(8, Name = @"compression")]
+ [global::System.ComponentModel.DefaultValue(CompressionType.None)]
+ public CompressionType Compression
+ {
+ get { return __pbn__Compression ?? CompressionType.None; }
+ set { __pbn__Compression = value; }
+ }
+ public bool ShouldSerializeCompression() => __pbn__Compression != null;
+ public void ResetCompression() => __pbn__Compression = null;
+ private CompressionType? __pbn__Compression;
+
+ [global::ProtoBuf.ProtoMember(9, Name = @"uncompressed_size")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public uint UncompressedSize
+ {
+ get { return __pbn__UncompressedSize ?? 0; }
+ set { __pbn__UncompressedSize = value; }
+ }
+ public bool ShouldSerializeUncompressedSize() => __pbn__UncompressedSize != null;
+ public void ResetUncompressedSize() => __pbn__UncompressedSize = null;
+ private uint? __pbn__UncompressedSize;
+
+ [global::ProtoBuf.ProtoMember(11, Name = @"num_messages_in_batch")]
+ [global::System.ComponentModel.DefaultValue(1)]
+ public int NumMessagesInBatch
+ {
+ get { return __pbn__NumMessagesInBatch ?? 1; }
+ set { __pbn__NumMessagesInBatch = value; }
+ }
+ public bool ShouldSerializeNumMessagesInBatch() => __pbn__NumMessagesInBatch != null;
+ public void ResetNumMessagesInBatch() => __pbn__NumMessagesInBatch = null;
+ private int? __pbn__NumMessagesInBatch;
+
+ [global::ProtoBuf.ProtoMember(12, Name = @"event_time")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public ulong EventTime
+ {
+ get { return __pbn__EventTime ?? 0; }
+ set { __pbn__EventTime = value; }
+ }
+ public bool ShouldSerializeEventTime() => __pbn__EventTime != null;
+ public void ResetEventTime() => __pbn__EventTime = null;
+ private ulong? __pbn__EventTime;
+
+ [global::ProtoBuf.ProtoMember(13, Name = @"encryption_keys")]
+ public global::System.Collections.Generic.List<EncryptionKeys> EncryptionKeys { get; } = new global::System.Collections.Generic.List<EncryptionKeys>();
+
+ [global::ProtoBuf.ProtoMember(14, Name = @"encryption_algo")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string EncryptionAlgo
+ {
+ get { return __pbn__EncryptionAlgo ?? ""; }
+ set { __pbn__EncryptionAlgo = value; }
+ }
+ public bool ShouldSerializeEncryptionAlgo() => __pbn__EncryptionAlgo != null;
+ public void ResetEncryptionAlgo() => __pbn__EncryptionAlgo = null;
+ private string __pbn__EncryptionAlgo;
+
+ [global::ProtoBuf.ProtoMember(15, Name = @"encryption_param")]
+ public byte[] EncryptionParam
+ {
+ get { return __pbn__EncryptionParam; }
+ set { __pbn__EncryptionParam = value; }
+ }
+ public bool ShouldSerializeEncryptionParam() => __pbn__EncryptionParam != null;
+ public void ResetEncryptionParam() => __pbn__EncryptionParam = null;
+ private byte[] __pbn__EncryptionParam;
+
+ [global::ProtoBuf.ProtoMember(16, Name = @"schema_version")]
+ public byte[] SchemaVersion
+ {
+ get { return __pbn__SchemaVersion; }
+ set { __pbn__SchemaVersion = value; }
+ }
+ public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+ public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+ private byte[] __pbn__SchemaVersion;
+
+ [global::ProtoBuf.ProtoMember(17, Name = @"partition_key_b64_encoded")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool PartitionKeyB64Encoded
+ {
+ get { return __pbn__PartitionKeyB64Encoded ?? false; }
+ set { __pbn__PartitionKeyB64Encoded = value; }
+ }
+ public bool ShouldSerializePartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded != null;
+ public void ResetPartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded = null;
+ private bool? __pbn__PartitionKeyB64Encoded;
+
+ [global::ProtoBuf.ProtoMember(18, Name = @"ordering_key")]
+ public byte[] OrderingKey
+ {
+ get { return __pbn__OrderingKey; }
+ set { __pbn__OrderingKey = value; }
+ }
+ public bool ShouldSerializeOrderingKey() => __pbn__OrderingKey != null;
+ public void ResetOrderingKey() => __pbn__OrderingKey = null;
+ private byte[] __pbn__OrderingKey;
+
+ [global::ProtoBuf.ProtoMember(19, Name = @"deliver_at_time")]
+ public long DeliverAtTime
+ {
+ get { return __pbn__DeliverAtTime.GetValueOrDefault(); }
+ set { __pbn__DeliverAtTime = value; }
+ }
+ public bool ShouldSerializeDeliverAtTime() => __pbn__DeliverAtTime != null;
+ public void ResetDeliverAtTime() => __pbn__DeliverAtTime = null;
+ private long? __pbn__DeliverAtTime;
+
+ [global::ProtoBuf.ProtoMember(20, Name = @"marker_type")]
+ public int MarkerType
+ {
+ get { return __pbn__MarkerType.GetValueOrDefault(); }
+ set { __pbn__MarkerType = value; }
+ }
+ public bool ShouldSerializeMarkerType() => __pbn__MarkerType != null;
+ public void ResetMarkerType() => __pbn__MarkerType = null;
+ private int? __pbn__MarkerType;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class SingleMessageMetadata : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"properties")]
+ public global::System.Collections.Generic.List<KeyValue> Properties { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"partition_key")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string PartitionKey
+ {
+ get { return __pbn__PartitionKey ?? ""; }
+ set { __pbn__PartitionKey = value; }
+ }
+ public bool ShouldSerializePartitionKey() => __pbn__PartitionKey != null;
+ public void ResetPartitionKey() => __pbn__PartitionKey = null;
+ private string __pbn__PartitionKey;
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"payload_size", IsRequired = true)]
+ public int PayloadSize { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"compacted_out")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool CompactedOut
+ {
+ get { return __pbn__CompactedOut ?? false; }
+ set { __pbn__CompactedOut = value; }
+ }
+ public bool ShouldSerializeCompactedOut() => __pbn__CompactedOut != null;
+ public void ResetCompactedOut() => __pbn__CompactedOut = null;
+ private bool? __pbn__CompactedOut;
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"event_time")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public ulong EventTime
+ {
+ get { return __pbn__EventTime ?? 0; }
+ set { __pbn__EventTime = value; }
+ }
+ public bool ShouldSerializeEventTime() => __pbn__EventTime != null;
+ public void ResetEventTime() => __pbn__EventTime = null;
+ private ulong? __pbn__EventTime;
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"partition_key_b64_encoded")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool PartitionKeyB64Encoded
+ {
+ get { return __pbn__PartitionKeyB64Encoded ?? false; }
+ set { __pbn__PartitionKeyB64Encoded = value; }
+ }
+ public bool ShouldSerializePartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded != null;
+ public void ResetPartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded = null;
+ private bool? __pbn__PartitionKeyB64Encoded;
+
+ [global::ProtoBuf.ProtoMember(7, Name = @"ordering_key")]
+ public byte[] OrderingKey
+ {
+ get { return __pbn__OrderingKey; }
+ set { __pbn__OrderingKey = value; }
+ }
+ public bool ShouldSerializeOrderingKey() => __pbn__OrderingKey != null;
+ public void ResetOrderingKey() => __pbn__OrderingKey = null;
+ private byte[] __pbn__OrderingKey;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandConnect : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"client_version", IsRequired = true)]
+ public string ClientVersion { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"auth_method")]
+ [global::System.ComponentModel.DefaultValue(AuthMethod.AuthMethodNone)]
+ public AuthMethod AuthMethod
+ {
+ get { return __pbn__AuthMethod ?? AuthMethod.AuthMethodNone; }
+ set { __pbn__AuthMethod = value; }
+ }
+ public bool ShouldSerializeAuthMethod() => __pbn__AuthMethod != null;
+ public void ResetAuthMethod() => __pbn__AuthMethod = null;
+ private AuthMethod? __pbn__AuthMethod;
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"auth_method_name")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string AuthMethodName
+ {
+ get { return __pbn__AuthMethodName ?? ""; }
+ set { __pbn__AuthMethodName = value; }
+ }
+ public bool ShouldSerializeAuthMethodName() => __pbn__AuthMethodName != null;
+ public void ResetAuthMethodName() => __pbn__AuthMethodName = null;
+ private string __pbn__AuthMethodName;
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"auth_data")]
+ public byte[] AuthData
+ {
+ get { return __pbn__AuthData; }
+ set { __pbn__AuthData = value; }
+ }
+ public bool ShouldSerializeAuthData() => __pbn__AuthData != null;
+ public void ResetAuthData() => __pbn__AuthData = null;
+ private byte[] __pbn__AuthData;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"protocol_version")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public int ProtocolVersion
+ {
+ get { return __pbn__ProtocolVersion ?? 0; }
+ set { __pbn__ProtocolVersion = value; }
+ }
+ public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+ public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+ private int? __pbn__ProtocolVersion;
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"proxy_to_broker_url")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ProxyToBrokerUrl
+ {
+ get { return __pbn__ProxyToBrokerUrl ?? ""; }
+ set { __pbn__ProxyToBrokerUrl = value; }
+ }
+ public bool ShouldSerializeProxyToBrokerUrl() => __pbn__ProxyToBrokerUrl != null;
+ public void ResetProxyToBrokerUrl() => __pbn__ProxyToBrokerUrl = null;
+ private string __pbn__ProxyToBrokerUrl;
+
+ [global::ProtoBuf.ProtoMember(7, Name = @"original_principal")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalPrincipal
+ {
+ get { return __pbn__OriginalPrincipal ?? ""; }
+ set { __pbn__OriginalPrincipal = value; }
+ }
+ public bool ShouldSerializeOriginalPrincipal() => __pbn__OriginalPrincipal != null;
+ public void ResetOriginalPrincipal() => __pbn__OriginalPrincipal = null;
+ private string __pbn__OriginalPrincipal;
+
+ [global::ProtoBuf.ProtoMember(8, Name = @"original_auth_data")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalAuthData
+ {
+ get { return __pbn__OriginalAuthData ?? ""; }
+ set { __pbn__OriginalAuthData = value; }
+ }
+ public bool ShouldSerializeOriginalAuthData() => __pbn__OriginalAuthData != null;
+ public void ResetOriginalAuthData() => __pbn__OriginalAuthData = null;
+ private string __pbn__OriginalAuthData;
+
+ [global::ProtoBuf.ProtoMember(9, Name = @"original_auth_method")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalAuthMethod
+ {
+ get { return __pbn__OriginalAuthMethod ?? ""; }
+ set { __pbn__OriginalAuthMethod = value; }
+ }
+ public bool ShouldSerializeOriginalAuthMethod() => __pbn__OriginalAuthMethod != null;
+ public void ResetOriginalAuthMethod() => __pbn__OriginalAuthMethod = null;
+ private string __pbn__OriginalAuthMethod;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandConnected : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"server_version", IsRequired = true)]
+ public string ServerVersion { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"protocol_version")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public int ProtocolVersion
+ {
+ get { return __pbn__ProtocolVersion ?? 0; }
+ set { __pbn__ProtocolVersion = value; }
+ }
+ public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+ public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+ private int? __pbn__ProtocolVersion;
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"max_message_size")]
+ public int MaxMessageSize
+ {
+ get { return __pbn__MaxMessageSize.GetValueOrDefault(); }
+ set { __pbn__MaxMessageSize = value; }
+ }
+ public bool ShouldSerializeMaxMessageSize() => __pbn__MaxMessageSize != null;
+ public void ResetMaxMessageSize() => __pbn__MaxMessageSize = null;
+ private int? __pbn__MaxMessageSize;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandAuthResponse : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"client_version")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ClientVersion
+ {
+ get { return __pbn__ClientVersion ?? ""; }
+ set { __pbn__ClientVersion = value; }
+ }
+ public bool ShouldSerializeClientVersion() => __pbn__ClientVersion != null;
+ public void ResetClientVersion() => __pbn__ClientVersion = null;
+ private string __pbn__ClientVersion;
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"response")]
+ public AuthData Response { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"protocol_version")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public int ProtocolVersion
+ {
+ get { return __pbn__ProtocolVersion ?? 0; }
+ set { __pbn__ProtocolVersion = value; }
+ }
+ public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+ public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+ private int? __pbn__ProtocolVersion;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandAuthChallenge : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"server_version")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ServerVersion
+ {
+ get { return __pbn__ServerVersion ?? ""; }
+ set { __pbn__ServerVersion = value; }
+ }
+ public bool ShouldSerializeServerVersion() => __pbn__ServerVersion != null;
+ public void ResetServerVersion() => __pbn__ServerVersion = null;
+ private string __pbn__ServerVersion;
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"challenge")]
+ public AuthData Challenge { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"protocol_version")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public int ProtocolVersion
+ {
+ get { return __pbn__ProtocolVersion ?? 0; }
+ set { __pbn__ProtocolVersion = value; }
+ }
+ public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+ public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+ private int? __pbn__ProtocolVersion;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class AuthData : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"auth_method_name")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string AuthMethodName
+ {
+ get { return __pbn__AuthMethodName ?? ""; }
+ set { __pbn__AuthMethodName = value; }
+ }
+ public bool ShouldSerializeAuthMethodName() => __pbn__AuthMethodName != null;
+ public void ResetAuthMethodName() => __pbn__AuthMethodName = null;
+ private string __pbn__AuthMethodName;
+
+ [global::ProtoBuf.ProtoMember(2)]
+ public byte[] auth_data
+ {
+ get { return __pbn__auth_data; }
+ set { __pbn__auth_data = value; }
+ }
+ public bool ShouldSerializeauth_data() => __pbn__auth_data != null;
+ public void Resetauth_data() => __pbn__auth_data = null;
+ private byte[] __pbn__auth_data;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandSubscribe : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+ public string Topic { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"subscription", IsRequired = true)]
+ public string Subscription { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, IsRequired = true)]
+ public SubType Type { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"consumer_name")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ConsumerName
+ {
+ get { return __pbn__ConsumerName ?? ""; }
+ set { __pbn__ConsumerName = value; }
+ }
+ public bool ShouldSerializeConsumerName() => __pbn__ConsumerName != null;
+ public void ResetConsumerName() => __pbn__ConsumerName = null;
+ private string __pbn__ConsumerName;
+
+ [global::ProtoBuf.ProtoMember(7, Name = @"priority_level")]
+ public int PriorityLevel
+ {
+ get { return __pbn__PriorityLevel.GetValueOrDefault(); }
+ set { __pbn__PriorityLevel = value; }
+ }
+ public bool ShouldSerializePriorityLevel() => __pbn__PriorityLevel != null;
+ public void ResetPriorityLevel() => __pbn__PriorityLevel = null;
+ private int? __pbn__PriorityLevel;
+
+ [global::ProtoBuf.ProtoMember(8, Name = @"durable")]
+ [global::System.ComponentModel.DefaultValue(true)]
+ public bool Durable
+ {
+ get { return __pbn__Durable ?? true; }
+ set { __pbn__Durable = value; }
+ }
+ public bool ShouldSerializeDurable() => __pbn__Durable != null;
+ public void ResetDurable() => __pbn__Durable = null;
+ private bool? __pbn__Durable;
+
+ [global::ProtoBuf.ProtoMember(9, Name = @"start_message_id")]
+ public MessageIdData StartMessageId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(10, Name = @"metadata")]
+ public global::System.Collections.Generic.List<KeyValue> Metadatas { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+ [global::ProtoBuf.ProtoMember(11, Name = @"read_compacted")]
+ public bool ReadCompacted
+ {
+ get { return __pbn__ReadCompacted.GetValueOrDefault(); }
+ set { __pbn__ReadCompacted = value; }
+ }
+ public bool ShouldSerializeReadCompacted() => __pbn__ReadCompacted != null;
+ public void ResetReadCompacted() => __pbn__ReadCompacted = null;
+ private bool? __pbn__ReadCompacted;
+
+ [global::ProtoBuf.ProtoMember(12, Name = @"schema")]
+ public Schema Schema { get; set; }
+
+ [global::ProtoBuf.ProtoMember(13)]
+ [global::System.ComponentModel.DefaultValue(InitialPosition.Latest)]
+ public InitialPosition initialPosition
+ {
+ get { return __pbn__initialPosition ?? InitialPosition.Latest; }
+ set { __pbn__initialPosition = value; }
+ }
+ public bool ShouldSerializeinitialPosition() => __pbn__initialPosition != null;
+ public void ResetinitialPosition() => __pbn__initialPosition = null;
+ private InitialPosition? __pbn__initialPosition;
+
+ [global::ProtoBuf.ProtoMember(14, Name = @"replicate_subscription_state")]
+ public bool ReplicateSubscriptionState
+ {
+ get { return __pbn__ReplicateSubscriptionState.GetValueOrDefault(); }
+ set { __pbn__ReplicateSubscriptionState = value; }
+ }
+ public bool ShouldSerializeReplicateSubscriptionState() => __pbn__ReplicateSubscriptionState != null;
+ public void ResetReplicateSubscriptionState() => __pbn__ReplicateSubscriptionState = null;
+ private bool? __pbn__ReplicateSubscriptionState;
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum SubType
+ {
+ Exclusive = 0,
+ Shared = 1,
+ Failover = 2,
+ [global::ProtoBuf.ProtoEnum(Name = @"Key_Shared")]
+ KeyShared = 3,
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum InitialPosition
+ {
+ Latest = 0,
+ Earliest = 1,
+ }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandPartitionedTopicMetadata : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+ public string Topic { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"original_principal")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalPrincipal
+ {
+ get { return __pbn__OriginalPrincipal ?? ""; }
+ set { __pbn__OriginalPrincipal = value; }
+ }
+ public bool ShouldSerializeOriginalPrincipal() => __pbn__OriginalPrincipal != null;
+ public void ResetOriginalPrincipal() => __pbn__OriginalPrincipal = null;
+ private string __pbn__OriginalPrincipal;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"original_auth_data")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalAuthData
+ {
+ get { return __pbn__OriginalAuthData ?? ""; }
+ set { __pbn__OriginalAuthData = value; }
+ }
+ public bool ShouldSerializeOriginalAuthData() => __pbn__OriginalAuthData != null;
+ public void ResetOriginalAuthData() => __pbn__OriginalAuthData = null;
+ private string __pbn__OriginalAuthData;
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"original_auth_method")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalAuthMethod
+ {
+ get { return __pbn__OriginalAuthMethod ?? ""; }
+ set { __pbn__OriginalAuthMethod = value; }
+ }
+ public bool ShouldSerializeOriginalAuthMethod() => __pbn__OriginalAuthMethod != null;
+ public void ResetOriginalAuthMethod() => __pbn__OriginalAuthMethod = null;
+ private string __pbn__OriginalAuthMethod;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandPartitionedTopicMetadataResponse : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"partitions")]
+ public uint Partitions
+ {
+ get { return __pbn__Partitions.GetValueOrDefault(); }
+ set { __pbn__Partitions = value; }
+ }
+ public bool ShouldSerializePartitions() => __pbn__Partitions != null;
+ public void ResetPartitions() => __pbn__Partitions = null;
+ private uint? __pbn__Partitions;
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"response")]
+ [global::System.ComponentModel.DefaultValue(LookupType.Success)]
+ public LookupType Response
+ {
+ get { return __pbn__Response ?? LookupType.Success; }
+ set { __pbn__Response = value; }
+ }
+ public bool ShouldSerializeResponse() => __pbn__Response != null;
+ public void ResetResponse() => __pbn__Response = null;
+ private LookupType? __pbn__Response;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"error")]
+ [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+ public ServerError Error
+ {
+ get { return __pbn__Error ?? ServerError.UnknownError; }
+ set { __pbn__Error = value; }
+ }
+ public bool ShouldSerializeError() => __pbn__Error != null;
+ public void ResetError() => __pbn__Error = null;
+ private ServerError? __pbn__Error;
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"message")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string Message
+ {
+ get { return __pbn__Message ?? ""; }
+ set { __pbn__Message = value; }
+ }
+ public bool ShouldSerializeMessage() => __pbn__Message != null;
+ public void ResetMessage() => __pbn__Message = null;
+ private string __pbn__Message;
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum LookupType
+ {
+ Success = 0,
+ Failed = 1,
+ }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandLookupTopic : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+ public string Topic { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"authoritative")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool Authoritative
+ {
+ get { return __pbn__Authoritative ?? false; }
+ set { __pbn__Authoritative = value; }
+ }
+ public bool ShouldSerializeAuthoritative() => __pbn__Authoritative != null;
+ public void ResetAuthoritative() => __pbn__Authoritative = null;
+ private bool? __pbn__Authoritative;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"original_principal")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalPrincipal
+ {
+ get { return __pbn__OriginalPrincipal ?? ""; }
+ set { __pbn__OriginalPrincipal = value; }
+ }
+ public bool ShouldSerializeOriginalPrincipal() => __pbn__OriginalPrincipal != null;
+ public void ResetOriginalPrincipal() => __pbn__OriginalPrincipal = null;
+ private string __pbn__OriginalPrincipal;
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"original_auth_data")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalAuthData
+ {
+ get { return __pbn__OriginalAuthData ?? ""; }
+ set { __pbn__OriginalAuthData = value; }
+ }
+ public bool ShouldSerializeOriginalAuthData() => __pbn__OriginalAuthData != null;
+ public void ResetOriginalAuthData() => __pbn__OriginalAuthData = null;
+ private string __pbn__OriginalAuthData;
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"original_auth_method")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string OriginalAuthMethod
+ {
+ get { return __pbn__OriginalAuthMethod ?? ""; }
+ set { __pbn__OriginalAuthMethod = value; }
+ }
+ public bool ShouldSerializeOriginalAuthMethod() => __pbn__OriginalAuthMethod != null;
+ public void ResetOriginalAuthMethod() => __pbn__OriginalAuthMethod = null;
+ private string __pbn__OriginalAuthMethod;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandLookupTopicResponse : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string BrokerServiceUrl
+ {
+ get { return __pbn__brokerServiceUrl ?? ""; }
+ set { __pbn__brokerServiceUrl = value; }
+ }
+ public bool ShouldSerializebrokerServiceUrl() => __pbn__brokerServiceUrl != null;
+ public void ResetbrokerServiceUrl() => __pbn__brokerServiceUrl = null;
+ private string __pbn__brokerServiceUrl;
+
+ [global::ProtoBuf.ProtoMember(2)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string BrokerServiceUrlTls
+ {
+ get { return __pbn__brokerServiceUrlTls ?? ""; }
+ set { __pbn__brokerServiceUrlTls = value; }
+ }
+ public bool ShouldSerializebrokerServiceUrlTls() => __pbn__brokerServiceUrlTls != null;
+ public void ResetbrokerServiceUrlTls() => __pbn__brokerServiceUrlTls = null;
+ private string __pbn__brokerServiceUrlTls;
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"response")]
+ [global::System.ComponentModel.DefaultValue(LookupType.Redirect)]
+ public LookupType Response
+ {
+ get { return __pbn__Response ?? LookupType.Redirect; }
+ set { __pbn__Response = value; }
+ }
+ public bool ShouldSerializeResponse() => __pbn__Response != null;
+ public void ResetResponse() => __pbn__Response = null;
+ private LookupType? __pbn__Response;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"authoritative")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool Authoritative
+ {
+ get { return __pbn__Authoritative ?? false; }
+ set { __pbn__Authoritative = value; }
+ }
+ public bool ShouldSerializeAuthoritative() => __pbn__Authoritative != null;
+ public void ResetAuthoritative() => __pbn__Authoritative = null;
+ private bool? __pbn__Authoritative;
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"error")]
+ [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+ public ServerError Error
+ {
+ get { return __pbn__Error ?? ServerError.UnknownError; }
+ set { __pbn__Error = value; }
+ }
+ public bool ShouldSerializeError() => __pbn__Error != null;
+ public void ResetError() => __pbn__Error = null;
+ private ServerError? __pbn__Error;
+
+ [global::ProtoBuf.ProtoMember(7, Name = @"message")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string Message
+ {
+ get { return __pbn__Message ?? ""; }
+ set { __pbn__Message = value; }
+ }
+ public bool ShouldSerializeMessage() => __pbn__Message != null;
+ public void ResetMessage() => __pbn__Message = null;
+ private string __pbn__Message;
+
+ [global::ProtoBuf.ProtoMember(8, Name = @"proxy_through_service_url")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool ProxyThroughServiceUrl
+ {
+ get { return __pbn__ProxyThroughServiceUrl ?? false; }
+ set { __pbn__ProxyThroughServiceUrl = value; }
+ }
+ public bool ShouldSerializeProxyThroughServiceUrl() => __pbn__ProxyThroughServiceUrl != null;
+ public void ResetProxyThroughServiceUrl() => __pbn__ProxyThroughServiceUrl = null;
+ private bool? __pbn__ProxyThroughServiceUrl;
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum LookupType
+ {
+ Redirect = 0,
+ Connect = 1,
+ Failed = 2,
+ }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandProducer : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+ public string Topic { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"producer_id", IsRequired = true)]
+ public ulong ProducerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"producer_name")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ProducerName
+ {
+ get { return __pbn__ProducerName ?? ""; }
+ set { __pbn__ProducerName = value; }
+ }
+ public bool ShouldSerializeProducerName() => __pbn__ProducerName != null;
+ public void ResetProducerName() => __pbn__ProducerName = null;
+ private string __pbn__ProducerName;
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"encrypted")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool Encrypted
+ {
+ get { return __pbn__Encrypted ?? false; }
+ set { __pbn__Encrypted = value; }
+ }
+ public bool ShouldSerializeEncrypted() => __pbn__Encrypted != null;
+ public void ResetEncrypted() => __pbn__Encrypted = null;
+ private bool? __pbn__Encrypted;
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"metadata")]
+ public global::System.Collections.Generic.List<KeyValue> Metadatas { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+ [global::ProtoBuf.ProtoMember(7, Name = @"schema")]
+ public Schema Schema { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandSend : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+ public ulong ProducerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+ public ulong SequenceId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"num_messages")]
+ [global::System.ComponentModel.DefaultValue(1)]
+ public int NumMessages
+ {
+ get { return __pbn__NumMessages ?? 1; }
+ set { __pbn__NumMessages = value; }
+ }
+ public bool ShouldSerializeNumMessages() => __pbn__NumMessages != null;
+ public void ResetNumMessages() => __pbn__NumMessages = null;
+ private int? __pbn__NumMessages;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandSendReceipt : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+ public ulong ProducerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+ public ulong SequenceId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"message_id")]
+ public MessageIdData MessageId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandSendError : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+ public ulong ProducerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+ public ulong SequenceId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"error", IsRequired = true)]
+ public ServerError Error { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"message", IsRequired = true)]
+ public string Message { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandMessage : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"message_id", IsRequired = true)]
+ public MessageIdData MessageId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"redelivery_count")]
+ [global::System.ComponentModel.DefaultValue(0)]
+ public uint RedeliveryCount
+ {
+ get { return __pbn__RedeliveryCount ?? 0; }
+ set { __pbn__RedeliveryCount = value; }
+ }
+ public bool ShouldSerializeRedeliveryCount() => __pbn__RedeliveryCount != null;
+ public void ResetRedeliveryCount() => __pbn__RedeliveryCount = null;
+ private uint? __pbn__RedeliveryCount;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandAck : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true)]
+ public AckType Type { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"message_id")]
+ public global::System.Collections.Generic.List<MessageIdData> MessageIds { get; } = new global::System.Collections.Generic.List<MessageIdData>(1);
+
+ [global::ProtoBuf.ProtoMember(4)]
+ [global::System.ComponentModel.DefaultValue(ValidationError.UncompressedSizeCorruption)]
+ public ValidationError validation_error
+ {
+ get { return __pbn__validation_error ?? ValidationError.UncompressedSizeCorruption; }
+ set { __pbn__validation_error = value; }
+ }
+ public bool ShouldSerializevalidation_error() => __pbn__validation_error != null;
+ public void Resetvalidation_error() => __pbn__validation_error = null;
+ private ValidationError? __pbn__validation_error;
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"properties")]
+ public global::System.Collections.Generic.List<KeyLongValue> Properties { get; } = new global::System.Collections.Generic.List<KeyLongValue>();
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum AckType
+ {
+ Individual = 0,
+ Cumulative = 1,
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum ValidationError
+ {
+ UncompressedSizeCorruption = 0,
+ DecompressionError = 1,
+ ChecksumMismatch = 2,
+ BatchDeSerializeError = 3,
+ DecryptionError = 4,
+ }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandActiveConsumerChange : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"is_active")]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool IsActive
+ {
+ get { return __pbn__IsActive ?? false; }
+ set { __pbn__IsActive = value; }
+ }
+ public bool ShouldSerializeIsActive() => __pbn__IsActive != null;
+ public void ResetIsActive() => __pbn__IsActive = null;
+ private bool? __pbn__IsActive;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandFlow : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true)]
+ public uint MessagePermits { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandUnsubscribe : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandSeek : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"message_id")]
+ public MessageIdData MessageId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"message_publish_time")]
+ public ulong MessagePublishTime
+ {
+ get { return __pbn__MessagePublishTime.GetValueOrDefault(); }
+ set { __pbn__MessagePublishTime = value; }
+ }
+ public bool ShouldSerializeMessagePublishTime() => __pbn__MessagePublishTime != null;
+ public void ResetMessagePublishTime() => __pbn__MessagePublishTime = null;
+ private ulong? __pbn__MessagePublishTime;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandReachedEndOfTopic : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandCloseProducer : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+ public ulong ProducerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandCloseConsumer : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandRedeliverUnacknowledgedMessages : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"message_ids")]
+ public global::System.Collections.Generic.List<MessageIdData> MessageIds { get; } = new global::System.Collections.Generic.List<MessageIdData>();
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandSuccess : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"schema")]
+ public Schema Schema { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandProducerSuccess : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"producer_name", IsRequired = true)]
+ public string ProducerName { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"last_sequence_id")]
+ [global::System.ComponentModel.DefaultValue(-1)]
+ public long LastSequenceId
+ {
+ get { return __pbn__LastSequenceId ?? -1; }
+ set { __pbn__LastSequenceId = value; }
+ }
+ public bool ShouldSerializeLastSequenceId() => __pbn__LastSequenceId != null;
+ public void ResetLastSequenceId() => __pbn__LastSequenceId = null;
+ private long? __pbn__LastSequenceId;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"schema_version")]
+ public byte[] SchemaVersion
+ {
+ get { return __pbn__SchemaVersion; }
+ set { __pbn__SchemaVersion = value; }
+ }
+ public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+ public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+ private byte[] __pbn__SchemaVersion;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandError : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"error", IsRequired = true)]
+ public ServerError Error { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"message", IsRequired = true)]
+ public string Message { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandPing : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandPong : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandConsumerStats : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandConsumerStatsResponse : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"error_code")]
+ [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+ public ServerError ErrorCode
+ {
+ get { return __pbn__ErrorCode ?? ServerError.UnknownError; }
+ set { __pbn__ErrorCode = value; }
+ }
+ public bool ShouldSerializeErrorCode() => __pbn__ErrorCode != null;
+ public void ResetErrorCode() => __pbn__ErrorCode = null;
+ private ServerError? __pbn__ErrorCode;
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"error_message")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ErrorMessage
+ {
+ get { return __pbn__ErrorMessage ?? ""; }
+ set { __pbn__ErrorMessage = value; }
+ }
+ public bool ShouldSerializeErrorMessage() => __pbn__ErrorMessage != null;
+ public void ResetErrorMessage() => __pbn__ErrorMessage = null;
+ private string __pbn__ErrorMessage;
+
+ [global::ProtoBuf.ProtoMember(4)]
+ public double MsgRateOut
+ {
+ get { return __pbn__msgRateOut.GetValueOrDefault(); }
+ set { __pbn__msgRateOut = value; }
+ }
+ public bool ShouldSerializemsgRateOut() => __pbn__msgRateOut != null;
+ public void ResetmsgRateOut() => __pbn__msgRateOut = null;
+ private double? __pbn__msgRateOut;
+
+ [global::ProtoBuf.ProtoMember(5)]
+ public double MsgThroughputOut
+ {
+ get { return __pbn__msgThroughputOut.GetValueOrDefault(); }
+ set { __pbn__msgThroughputOut = value; }
+ }
+ public bool ShouldSerializemsgThroughputOut() => __pbn__msgThroughputOut != null;
+ public void ResetmsgThroughputOut() => __pbn__msgThroughputOut = null;
+ private double? __pbn__msgThroughputOut;
+
+ [global::ProtoBuf.ProtoMember(6)]
+ public double MsgRateRedeliver
+ {
+ get { return __pbn__msgRateRedeliver.GetValueOrDefault(); }
+ set { __pbn__msgRateRedeliver = value; }
+ }
+ public bool ShouldSerializemsgRateRedeliver() => __pbn__msgRateRedeliver != null;
+ public void ResetmsgRateRedeliver() => __pbn__msgRateRedeliver = null;
+ private double? __pbn__msgRateRedeliver;
+
+ [global::ProtoBuf.ProtoMember(7)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ConsumerName
+ {
+ get { return __pbn__consumerName ?? ""; }
+ set { __pbn__consumerName = value; }
+ }
+ public bool ShouldSerializeconsumerName() => __pbn__consumerName != null;
+ public void ResetconsumerName() => __pbn__consumerName = null;
+ private string __pbn__consumerName;
+
+ [global::ProtoBuf.ProtoMember(8)]
+ public ulong AvailablePermits
+ {
+ get { return __pbn__availablePermits.GetValueOrDefault(); }
+ set { __pbn__availablePermits = value; }
+ }
+ public bool ShouldSerializeavailablePermits() => __pbn__availablePermits != null;
+ public void ResetavailablePermits() => __pbn__availablePermits = null;
+ private ulong? __pbn__availablePermits;
+
+ [global::ProtoBuf.ProtoMember(9)]
+ public ulong UnackedMessages
+ {
+ get { return __pbn__unackedMessages.GetValueOrDefault(); }
+ set { __pbn__unackedMessages = value; }
+ }
+ public bool ShouldSerializeunackedMessages() => __pbn__unackedMessages != null;
+ public void ResetunackedMessages() => __pbn__unackedMessages = null;
+ private ulong? __pbn__unackedMessages;
+
+ [global::ProtoBuf.ProtoMember(10)]
+ public bool BlockedConsumerOnUnackedMsgs
+ {
+ get { return __pbn__blockedConsumerOnUnackedMsgs.GetValueOrDefault(); }
+ set { __pbn__blockedConsumerOnUnackedMsgs = value; }
+ }
+ public bool ShouldSerializeblockedConsumerOnUnackedMsgs() => __pbn__blockedConsumerOnUnackedMsgs != null;
+ public void ResetblockedConsumerOnUnackedMsgs() => __pbn__blockedConsumerOnUnackedMsgs = null;
+ private bool? __pbn__blockedConsumerOnUnackedMsgs;
+
+ [global::ProtoBuf.ProtoMember(11, Name = @"address")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string Address
+ {
+ get { return __pbn__Address ?? ""; }
+ set { __pbn__Address = value; }
+ }
+ public bool ShouldSerializeAddress() => __pbn__Address != null;
+ public void ResetAddress() => __pbn__Address = null;
+ private string __pbn__Address;
+
+ [global::ProtoBuf.ProtoMember(12)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ConnectedSince
+ {
+ get { return __pbn__connectedSince ?? ""; }
+ set { __pbn__connectedSince = value; }
+ }
+ public bool ShouldSerializeconnectedSince() => __pbn__connectedSince != null;
+ public void ResetconnectedSince() => __pbn__connectedSince = null;
+ private string __pbn__connectedSince;
+
+ [global::ProtoBuf.ProtoMember(13, Name = @"type")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string Type
+ {
+ get { return __pbn__Type ?? ""; }
+ set { __pbn__Type = value; }
+ }
+ public bool ShouldSerializeType() => __pbn__Type != null;
+ public void ResetType() => __pbn__Type = null;
+ private string __pbn__Type;
+
+ [global::ProtoBuf.ProtoMember(14)]
+ public double MsgRateExpired
+ {
+ get { return __pbn__msgRateExpired.GetValueOrDefault(); }
+ set { __pbn__msgRateExpired = value; }
+ }
+ public bool ShouldSerializemsgRateExpired() => __pbn__msgRateExpired != null;
+ public void ResetmsgRateExpired() => __pbn__msgRateExpired = null;
+ private double? __pbn__msgRateExpired;
+
+ [global::ProtoBuf.ProtoMember(15)]
+ public ulong MsgBacklog
+ {
+ get { return __pbn__msgBacklog.GetValueOrDefault(); }
+ set { __pbn__msgBacklog = value; }
+ }
+ public bool ShouldSerializemsgBacklog() => __pbn__msgBacklog != null;
+ public void ResetmsgBacklog() => __pbn__msgBacklog = null;
+ private ulong? __pbn__msgBacklog;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandGetLastMessageId : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+ public ulong ConsumerId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandGetLastMessageIdResponse : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"last_message_id", IsRequired = true)]
+ public MessageIdData LastMessageId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandGetTopicsOfNamespace : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"namespace", IsRequired = true)]
+ public string Namespace { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3)]
+ [global::System.ComponentModel.DefaultValue(Mode.Persistent)]
+ public Mode mode
+ {
+ get { return __pbn__mode ?? Mode.Persistent; }
+ set { __pbn__mode = value; }
+ }
+ public bool ShouldSerializemode() => __pbn__mode != null;
+ public void Resetmode() => __pbn__mode = null;
+ private Mode? __pbn__mode;
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum Mode
+ {
+ [global::ProtoBuf.ProtoEnum(Name = @"PERSISTENT")]
+ Persistent = 0,
+ [global::ProtoBuf.ProtoEnum(Name = @"NON_PERSISTENT")]
+ NonPersistent = 1,
+ [global::ProtoBuf.ProtoEnum(Name = @"ALL")]
+ All = 2,
+ }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandGetTopicsOfNamespaceResponse : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"topics")]
+ public global::System.Collections.Generic.List<string> Topics { get; } = new global::System.Collections.Generic.List<string>();
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandGetSchema : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"topic", IsRequired = true)]
+ public string Topic { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"schema_version")]
+ public byte[] SchemaVersion
+ {
+ get { return __pbn__SchemaVersion; }
+ set { __pbn__SchemaVersion = value; }
+ }
+ public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+ public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+ private byte[] __pbn__SchemaVersion;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class CommandGetSchemaResponse : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+ public ulong RequestId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"error_code")]
+ [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+ public ServerError ErrorCode
+ {
+ get { return __pbn__ErrorCode ?? ServerError.UnknownError; }
+ set { __pbn__ErrorCode = value; }
+ }
+ public bool ShouldSerializeErrorCode() => __pbn__ErrorCode != null;
+ public void ResetErrorCode() => __pbn__ErrorCode = null;
+ private ServerError? __pbn__ErrorCode;
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"error_message")]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string ErrorMessage
+ {
+ get { return __pbn__ErrorMessage ?? ""; }
+ set { __pbn__ErrorMessage = value; }
+ }
+ public bool ShouldSerializeErrorMessage() => __pbn__ErrorMessage != null;
+ public void ResetErrorMessage() => __pbn__ErrorMessage = null;
+ private string __pbn__ErrorMessage;
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"schema")]
+ public Schema Schema { get; set; }
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"schema_version")]
+ public byte[] SchemaVersion
+ {
+ get { return __pbn__SchemaVersion; }
+ set { __pbn__SchemaVersion = value; }
+ }
+ public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+ public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+ private byte[] __pbn__SchemaVersion;
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public partial class BaseCommand : global::ProtoBuf.IExtensible
+ {
+ private global::ProtoBuf.IExtension __pbn__extensionData;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true)]
+ public Type CommandType { get; set; } = Type.Connect;
+
+ [global::ProtoBuf.ProtoMember(2, Name = @"connect")]
+ public CommandConnect Connect { get; set; }
+
+ [global::ProtoBuf.ProtoMember(3, Name = @"connected")]
+ public CommandConnected Connected { get; set; }
+
+ [global::ProtoBuf.ProtoMember(4, Name = @"subscribe")]
+ public CommandSubscribe Subscribe { get; set; }
+
+ [global::ProtoBuf.ProtoMember(5, Name = @"producer")]
+ public CommandProducer Producer { get; set; }
+
+ [global::ProtoBuf.ProtoMember(6, Name = @"send")]
+ public CommandSend Send { get; set; }
+
+ [global::ProtoBuf.ProtoMember(7, Name = @"send_receipt")]
+ public CommandSendReceipt SendReceipt { get; set; }
+
+ [global::ProtoBuf.ProtoMember(8, Name = @"send_error")]
+ public CommandSendError SendError { get; set; }
+
+ [global::ProtoBuf.ProtoMember(9, Name = @"message")]
+ public CommandMessage Message { get; set; }
+
+ [global::ProtoBuf.ProtoMember(10, Name = @"ack")]
+ public CommandAck Ack { get; set; }
+
+ [global::ProtoBuf.ProtoMember(11, Name = @"flow")]
+ public CommandFlow Flow { get; set; }
+
+ [global::ProtoBuf.ProtoMember(12, Name = @"unsubscribe")]
+ public CommandUnsubscribe Unsubscribe { get; set; }
+
+ [global::ProtoBuf.ProtoMember(13, Name = @"success")]
+ public CommandSuccess Success { get; set; }
+
+ [global::ProtoBuf.ProtoMember(14, Name = @"error")]
+ public CommandError Error { get; set; }
+
+ [global::ProtoBuf.ProtoMember(15, Name = @"close_producer")]
+ public CommandCloseProducer CloseProducer { get; set; }
+
+ [global::ProtoBuf.ProtoMember(16, Name = @"close_consumer")]
+ public CommandCloseConsumer CloseConsumer { get; set; }
+
+ [global::ProtoBuf.ProtoMember(17, Name = @"producer_success")]
+ public CommandProducerSuccess ProducerSuccess { get; set; }
+
+ [global::ProtoBuf.ProtoMember(18, Name = @"ping")]
+ public CommandPing Ping { get; set; }
+
+ [global::ProtoBuf.ProtoMember(19, Name = @"pong")]
+ public CommandPong Pong { get; set; }
+
+ [global::ProtoBuf.ProtoMember(20)]
+ public CommandRedeliverUnacknowledgedMessages RedeliverUnacknowledgedMessages { get; set; }
+
+ [global::ProtoBuf.ProtoMember(21)]
+ public CommandPartitionedTopicMetadata PartitionMetadata { get; set; }
+
+ [global::ProtoBuf.ProtoMember(22)]
+ public CommandPartitionedTopicMetadataResponse PartitionMetadataResponse { get; set; }
+
+ [global::ProtoBuf.ProtoMember(23)]
+ public CommandLookupTopic LookupTopic { get; set; }
+
+ [global::ProtoBuf.ProtoMember(24)]
+ public CommandLookupTopicResponse LookupTopicResponse { get; set; }
+
+ [global::ProtoBuf.ProtoMember(25)]
+ public CommandConsumerStats ConsumerStats { get; set; }
+
+ [global::ProtoBuf.ProtoMember(26)]
+ public CommandConsumerStatsResponse ConsumerStatsResponse { get; set; }
+
+ [global::ProtoBuf.ProtoMember(27)]
+ public CommandReachedEndOfTopic ReachedEndOfTopic { get; set; }
+
+ [global::ProtoBuf.ProtoMember(28, Name = @"seek")]
+ public CommandSeek Seek { get; set; }
+
+ [global::ProtoBuf.ProtoMember(29)]
+ public CommandGetLastMessageId GetLastMessageId { get; set; }
+
+ [global::ProtoBuf.ProtoMember(30)]
+ public CommandGetLastMessageIdResponse GetLastMessageIdResponse { get; set; }
+
+ [global::ProtoBuf.ProtoMember(31, Name = @"active_consumer_change")]
+ public CommandActiveConsumerChange ActiveConsumerChange { get; set; }
+
+ [global::ProtoBuf.ProtoMember(32)]
+ public CommandGetTopicsOfNamespace GetTopicsOfNamespace { get; set; }
+
+ [global::ProtoBuf.ProtoMember(33)]
+ public CommandGetTopicsOfNamespaceResponse GetTopicsOfNamespaceResponse { get; set; }
+
+ [global::ProtoBuf.ProtoMember(34)]
+ public CommandGetSchema GetSchema { get; set; }
+
+ [global::ProtoBuf.ProtoMember(35)]
+ public CommandGetSchemaResponse GetSchemaResponse { get; set; }
+
+ [global::ProtoBuf.ProtoMember(36)]
+ public CommandAuthChallenge AuthChallenge { get; set; }
+
+ [global::ProtoBuf.ProtoMember(37)]
+ public CommandAuthResponse AuthResponse { get; set; }
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum Type
+ {
+ [global::ProtoBuf.ProtoEnum(Name = @"CONNECT")]
+ Connect = 2,
+ [global::ProtoBuf.ProtoEnum(Name = @"CONNECTED")]
+ Connected = 3,
+ [global::ProtoBuf.ProtoEnum(Name = @"SUBSCRIBE")]
+ Subscribe = 4,
+ [global::ProtoBuf.ProtoEnum(Name = @"PRODUCER")]
+ Producer = 5,
+ [global::ProtoBuf.ProtoEnum(Name = @"SEND")]
+ Send = 6,
+ [global::ProtoBuf.ProtoEnum(Name = @"SEND_RECEIPT")]
+ SendReceipt = 7,
+ [global::ProtoBuf.ProtoEnum(Name = @"SEND_ERROR")]
+ SendError = 8,
+ [global::ProtoBuf.ProtoEnum(Name = @"MESSAGE")]
+ Message = 9,
+ [global::ProtoBuf.ProtoEnum(Name = @"ACK")]
+ Ack = 10,
+ [global::ProtoBuf.ProtoEnum(Name = @"FLOW")]
+ Flow = 11,
+ [global::ProtoBuf.ProtoEnum(Name = @"UNSUBSCRIBE")]
+ Unsubscribe = 12,
+ [global::ProtoBuf.ProtoEnum(Name = @"SUCCESS")]
+ Success = 13,
+ [global::ProtoBuf.ProtoEnum(Name = @"ERROR")]
+ Error = 14,
+ [global::ProtoBuf.ProtoEnum(Name = @"CLOSE_PRODUCER")]
+ CloseProducer = 15,
+ [global::ProtoBuf.ProtoEnum(Name = @"CLOSE_CONSUMER")]
+ CloseConsumer = 16,
+ [global::ProtoBuf.ProtoEnum(Name = @"PRODUCER_SUCCESS")]
+ ProducerSuccess = 17,
+ [global::ProtoBuf.ProtoEnum(Name = @"PING")]
+ Ping = 18,
+ [global::ProtoBuf.ProtoEnum(Name = @"PONG")]
+ Pong = 19,
+ [global::ProtoBuf.ProtoEnum(Name = @"REDELIVER_UNACKNOWLEDGED_MESSAGES")]
+ RedeliverUnacknowledgedMessages = 20,
+ [global::ProtoBuf.ProtoEnum(Name = @"PARTITIONED_METADATA")]
+ PartitionedMetadata = 21,
+ [global::ProtoBuf.ProtoEnum(Name = @"PARTITIONED_METADATA_RESPONSE")]
+ PartitionedMetadataResponse = 22,
+ [global::ProtoBuf.ProtoEnum(Name = @"LOOKUP")]
+ Lookup = 23,
+ [global::ProtoBuf.ProtoEnum(Name = @"LOOKUP_RESPONSE")]
+ LookupResponse = 24,
+ [global::ProtoBuf.ProtoEnum(Name = @"CONSUMER_STATS")]
+ ConsumerStats = 25,
+ [global::ProtoBuf.ProtoEnum(Name = @"CONSUMER_STATS_RESPONSE")]
+ ConsumerStatsResponse = 26,
+ [global::ProtoBuf.ProtoEnum(Name = @"REACHED_END_OF_TOPIC")]
+ ReachedEndOfTopic = 27,
+ [global::ProtoBuf.ProtoEnum(Name = @"SEEK")]
+ Seek = 28,
+ [global::ProtoBuf.ProtoEnum(Name = @"GET_LAST_MESSAGE_ID")]
+ GetLastMessageId = 29,
+ [global::ProtoBuf.ProtoEnum(Name = @"GET_LAST_MESSAGE_ID_RESPONSE")]
+ GetLastMessageIdResponse = 30,
+ [global::ProtoBuf.ProtoEnum(Name = @"ACTIVE_CONSUMER_CHANGE")]
+ ActiveConsumerChange = 31,
+ [global::ProtoBuf.ProtoEnum(Name = @"GET_TOPICS_OF_NAMESPACE")]
+ GetTopicsOfNamespace = 32,
+ [global::ProtoBuf.ProtoEnum(Name = @"GET_TOPICS_OF_NAMESPACE_RESPONSE")]
+ GetTopicsOfNamespaceResponse = 33,
+ [global::ProtoBuf.ProtoEnum(Name = @"GET_SCHEMA")]
+ GetSchema = 34,
+ [global::ProtoBuf.ProtoEnum(Name = @"GET_SCHEMA_RESPONSE")]
+ GetSchemaResponse = 35,
+ [global::ProtoBuf.ProtoEnum(Name = @"AUTH_CHALLENGE")]
+ AuthChallenge = 36,
+ [global::ProtoBuf.ProtoEnum(Name = @"AUTH_RESPONSE")]
+ AuthResponse = 37,
+ }
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum CompressionType
+ {
+ [global::ProtoBuf.ProtoEnum(Name = @"NONE")]
+ None = 0,
+ [global::ProtoBuf.ProtoEnum(Name = @"LZ4")]
+ Lz4 = 1,
+ [global::ProtoBuf.ProtoEnum(Name = @"ZLIB")]
+ Zlib = 2,
+ [global::ProtoBuf.ProtoEnum(Name = @"ZSTD")]
+ Zstd = 3,
+ [global::ProtoBuf.ProtoEnum(Name = @"SNAPPY")]
+ Snappy = 4,
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum ServerError
+ {
+ UnknownError = 0,
+ MetadataError = 1,
+ PersistenceError = 2,
+ AuthenticationError = 3,
+ AuthorizationError = 4,
+ ConsumerBusy = 5,
+ ServiceNotReady = 6,
+ ProducerBlockedQuotaExceededError = 7,
+ ProducerBlockedQuotaExceededException = 8,
+ ChecksumError = 9,
+ UnsupportedVersionError = 10,
+ TopicNotFound = 11,
+ SubscriptionNotFound = 12,
+ ConsumerNotFound = 13,
+ TooManyRequests = 14,
+ TopicTerminatedError = 15,
+ ProducerBusy = 16,
+ InvalidTopicName = 17,
+ IncompatibleSchema = 18,
+ ConsumerAssignError = 19,
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum AuthMethod
+ {
+ AuthMethodNone = 0,
+ AuthMethodYcaV1 = 1,
+ AuthMethodAthens = 2,
+ }
+
+ [global::ProtoBuf.ProtoContract()]
+ public enum ProtocolVersion
+ {
+ [global::ProtoBuf.ProtoEnum(Name = @"v0")]
+ V0 = 0,
+ [global::ProtoBuf.ProtoEnum(Name = @"v1")]
+ V1 = 1,
+ [global::ProtoBuf.ProtoEnum(Name = @"v2")]
+ V2 = 2,
+ [global::ProtoBuf.ProtoEnum(Name = @"v3")]
+ V3 = 3,
+ [global::ProtoBuf.ProtoEnum(Name = @"v4")]
+ V4 = 4,
+ [global::ProtoBuf.ProtoEnum(Name = @"v5")]
+ V5 = 5,
+ [global::ProtoBuf.ProtoEnum(Name = @"v6")]
+ V6 = 6,
+ [global::ProtoBuf.ProtoEnum(Name = @"v7")]
+ V7 = 7,
+ [global::ProtoBuf.ProtoEnum(Name = @"v8")]
+ V8 = 8,
+ [global::ProtoBuf.ProtoEnum(Name = @"v9")]
+ V9 = 9,
+ [global::ProtoBuf.ProtoEnum(Name = @"v10")]
+ V10 = 10,
+ [global::ProtoBuf.ProtoEnum(Name = @"v11")]
+ V11 = 11,
+ [global::ProtoBuf.ProtoEnum(Name = @"v12")]
+ V12 = 12,
+ [global::ProtoBuf.ProtoEnum(Name = @"v13")]
+ V13 = 13,
+ [global::ProtoBuf.ProtoEnum(Name = @"v14")]
+ V14 = 14,
+ }
+}
+
+#pragma warning restore CS1591, CS0612, CS3021, IDE1006
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
new file mode 100644
index 0000000..31241e8
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -0,0 +1,35 @@
+using DotPulsar.Abstractions;
+using System;
+using System.Reflection;
+
+namespace DotPulsar.Internal
+{
+ public sealed class PulsarClientBuilder : IPulsarClientBuilder
+ {
+ private static readonly int ProtocolVersion;
+ private static readonly string ClientVersion;
+
+ static PulsarClientBuilder()
+ {
+ ProtocolVersion = 12;
+ var assemblyName = Assembly.GetCallingAssembly().GetName();
+ ClientVersion = assemblyName.Name + " " + assemblyName.Version.ToString(3);
+ }
+
+ private Uri _serviceUrl;
+
+ public PulsarClientBuilder() => _serviceUrl = new Uri("pulsar://localhost:6650");
+
+ public IPulsarClientBuilder ServiceUrl(Uri uri)
+ {
+ _serviceUrl = uri;
+ return this;
+ }
+
+ public IPulsarClient Build()
+ {
+ var connectionPool = new ConnectionPool(ProtocolVersion, ClientVersion, _serviceUrl);
+ return new PulsarClient(connectionPool);
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
new file mode 100644
index 0000000..ed18b48
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -0,0 +1,118 @@
+using DotPulsar.Internal.Extensions;
+using System;
+using System.Buffers;
+using System.IO;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class PulsarStream : IDisposable
+ {
+ private readonly Stream _stream;
+ private readonly Action<uint, ReadOnlySequence<byte>> _handler;
+ private readonly CancellationTokenSource _tokenSource;
+
+ public PulsarStream(Stream stream, Action<uint, ReadOnlySequence<byte>> handler)
+ {
+ _stream = stream;
+ _handler = handler;
+ _tokenSource = new CancellationTokenSource();
+ var pipe = new Pipe();
+ var fill = FillPipe(_stream, pipe.Writer, _tokenSource.Token);
+ var read = ReadPipe(pipe.Reader, _tokenSource.Token);
+ IsClosed = Task.WhenAny(fill, read);
+ }
+
+ public Task IsClosed { get; }
+
+ public async Task Send(ReadOnlySequence<byte> sequence)
+ {
+ try
+ {
+ foreach (var segment in sequence)
+ {
+ await _stream.WriteAsync(segment);
+ }
+ }
+ catch
+ {
+ _tokenSource.Cancel();
+ throw;
+ }
+ }
+
+ public void Dispose()
+ {
+ _tokenSource.Cancel();
+ _stream.Dispose();
+ }
+
+ private async Task FillPipe(Stream stream, PipeWriter writer, CancellationToken cancellationToken)
+ {
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var memory = writer.GetMemory(84999); // LOH - 1 byte
+ var bytesRead = await stream.ReadAsync(memory, cancellationToken);
+ if (bytesRead == 0)
+ break;
+
+ writer.Advance(bytesRead);
+
+ var result = await writer.FlushAsync(cancellationToken);
+ if (result.IsCompleted)
+ break;
+ }
+ }
+ catch
+ {
+ _tokenSource.Cancel();
+ }
+
+ writer.Complete();
+ }
+
+ private async Task ReadPipe(PipeReader reader, CancellationToken cancellationToken)
+ {
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var result = await reader.ReadAsync(cancellationToken);
+ var buffer = result.Buffer;
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ if (buffer.Length < 4)
+ break;
+
+ var frameSize = buffer.ReadUInt32(0, true);
+ var totalSize = frameSize + 4;
+ if (buffer.Length < totalSize)
+ break;
+
+ var commandSize = buffer.ReadUInt32(4, true);
+
+ _handler(commandSize, buffer.Slice(8, totalSize - 8));
+
+ buffer = buffer.Slice(totalSize);
+ }
+
+ if (result.IsCompleted)
+ break;
+
+ reader.AdvanceTo(buffer.Start);
+ }
+ }
+ catch
+ {
+ _tokenSource.Cancel();
+ }
+
+ reader.Complete();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
new file mode 100644
index 0000000..a4427e0
--- /dev/null
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -0,0 +1,105 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class Reader : IReader
+ {
+ private readonly Executor _executor;
+ private readonly IConsumerStreamFactory _streamFactory;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly StateManager<ReaderState> _stateManager;
+ private readonly CancellationTokenSource _connectTokenSource;
+ private readonly Task _connectTask;
+ private Action _throwIfClosedOrFaulted;
+ private IConsumerStream Stream { get; set; }
+
+ public Reader(IConsumerStreamFactory streamFactory, IFaultStrategy faultStrategy)
+ {
+ _executor = new Executor(ExecutorOnException);
+ _stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
+ _streamFactory = streamFactory;
+ _faultStrategy = faultStrategy;
+ _connectTokenSource = new CancellationTokenSource();
+ Stream = new NotReadyStream();
+ _connectTask = Connect(_connectTokenSource.Token);
+ _throwIfClosedOrFaulted = () => { };
+ }
+
+ public async Task<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+ public async Task<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+ public bool IsFinalState() => _stateManager.IsFinalState();
+ public bool IsFinalState(ReaderState state) => _stateManager.IsFinalState(state);
+
+ public void Dispose()
+ {
+ _executor.Dispose();
+ _connectTokenSource.Cancel();
+ _connectTask.Wait();
+ }
+
+ public async Task<Message> Receive(CancellationToken cancellationToken) => await _executor.Execute(() => Stream.Receive(cancellationToken), cancellationToken);
+
+ private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
+ {
+ _throwIfClosedOrFaulted();
+
+ switch (_faultStrategy.DetermineFaultAction(exception))
+ {
+ case FaultAction.Retry:
+ await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+ break;
+ case FaultAction.Relookup:
+ await _stateManager.StateChangedTo(ReaderState.Connected, cancellationToken);
+ break;
+ case FaultAction.Fault:
+ HasFaulted(exception);
+ break;
+ }
+
+ _throwIfClosedOrFaulted();
+ }
+
+ private void HasFaulted(Exception exception)
+ {
+ _throwIfClosedOrFaulted = () => throw exception;
+ _stateManager.SetState(ReaderState.Faulted);
+ }
+
+ private void HasClosed()
+ {
+ _throwIfClosedOrFaulted = () => throw new ReaderClosedException();
+ _stateManager.SetState(ReaderState.Closed);
+ }
+
+ private async Task Connect(CancellationToken cancellationToken)
+ {
+ try
+ {
+ while (true)
+ {
+ using (var proxy = new ReaderProxy(_stateManager, new AsyncQueue<MessagePackage>()))
+ using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
+ {
+ proxy.Active();
+ await _stateManager.StateChangedFrom(ReaderState.Connected, cancellationToken);
+ if (_stateManager.IsFinalState())
+ return;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ HasClosed();
+ }
+ catch (Exception exception)
+ {
+ HasFaulted(exception);
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
new file mode 100644
index 0000000..85de19d
--- /dev/null
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -0,0 +1,42 @@
+using DotPulsar.Abstractions;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ReaderBuilder : IReaderBuilder
+ {
+ private readonly IPulsarClient _pulsarClient;
+ private readonly ReaderOptions _options;
+
+ public ReaderBuilder(IPulsarClient pulsarClient)
+ {
+ _pulsarClient = pulsarClient;
+ _options = new ReaderOptions();
+ }
+
+ public IReaderBuilder ReaderName(string name)
+ {
+ _options.ReaderName = name;
+ return this;
+ }
+
+ public IReaderBuilder MessagePrefetchCount(uint count)
+ {
+ _options.MessagePrefetchCount = count;
+ return this;
+ }
+
+ public IReaderBuilder StartMessageId(MessageId messageId)
+ {
+ _options.StartMessageId = messageId;
+ return this;
+ }
+
+ public IReaderBuilder Topic(string topic)
+ {
+ _options.Topic = topic;
+ return this;
+ }
+
+ public IReader Create() => _pulsarClient.CreateReader(_options);
+ }
+}
diff --git a/src/DotPulsar/Internal/ReaderProxy.cs b/src/DotPulsar/Internal/ReaderProxy.cs
new file mode 100644
index 0000000..eb93582
--- /dev/null
+++ b/src/DotPulsar/Internal/ReaderProxy.cs
@@ -0,0 +1,53 @@
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class ReaderProxy : IConsumerProxy, IDisposable
+ {
+ private readonly object _lock;
+ private readonly StateManager<ReaderState> _stateManager;
+ private readonly AsyncQueue<MessagePackage> _queue;
+ private bool _hasDisconnected;
+
+ public ReaderProxy(StateManager<ReaderState> stateManager, AsyncQueue<MessagePackage> queue)
+ {
+ _lock = new object();
+ _stateManager = stateManager;
+ _queue = queue;
+ _hasDisconnected = false;
+ }
+
+ public void Active() => SetState(ReaderState.Connected);
+ public void Inactive() => SetState(ReaderState.Connected);
+ public void ReachedEndOfTopic() => SetState(ReaderState.ReachedEndOfTopic);
+
+ public void Disconnected()
+ {
+ lock (_lock)
+ {
+ if (_hasDisconnected)
+ return;
+
+ _stateManager.SetState(ReaderState.Disconnected);
+ _hasDisconnected = true;
+ }
+ }
+
+ public void Enqueue(MessagePackage package) => _queue.Enqueue(package);
+ public async Task<MessagePackage> Dequeue(CancellationToken cancellationToken) => await _queue.Dequeue(cancellationToken);
+
+ private void SetState(ReaderState state)
+ {
+ lock (_lock)
+ {
+ if (!_hasDisconnected)
+ _stateManager.SetState(state);
+ }
+ }
+
+ public void Dispose() => _queue.Dispose();
+ }
+}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
new file mode 100644
index 0000000..8d11e95
--- /dev/null
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -0,0 +1,93 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class RequestResponseHandler : IDisposable
+ {
+ private readonly Awaitor<string, BaseCommand> _responses;
+ private ulong _requestId;
+
+ public RequestResponseHandler()
+ {
+ _responses = new Awaitor<string, BaseCommand>();
+ _requestId = 1;
+ }
+
+ public void Dispose() => _responses.Dispose();
+
+ public Task<BaseCommand> Outgoing(BaseCommand command)
+ {
+ SetRequestId(command);
+ return _responses.CreateTask(GetResponseIdentifier(command));
+ }
+
+ public void Incoming(BaseCommand command)
+ {
+ var identifier = GetResponseIdentifier(command);
+ if (identifier != null)
+ _responses.SetResult(identifier, command);
+ }
+
+ private void SetRequestId(BaseCommand cmd)
+ {
+ switch (cmd.CommandType)
+ {
+ case BaseCommand.Type.Seek:
+ cmd.Seek.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.Lookup:
+ cmd.LookupTopic.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.Error:
+ cmd.Error.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.Producer:
+ cmd.Producer.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.CloseProducer:
+ cmd.CloseProducer.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.Subscribe:
+ cmd.Subscribe.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.Unsubscribe:
+ cmd.Unsubscribe.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.CloseConsumer:
+ cmd.CloseConsumer.RequestId = _requestId++;
+ return;
+ case BaseCommand.Type.GetLastMessageId:
+ cmd.GetLastMessageId.RequestId = _requestId++;
+ return;
+ }
+ }
+
+ private static string GetResponseIdentifier(BaseCommand cmd)
+ {
+ switch (cmd.CommandType)
+ {
+ case BaseCommand.Type.Connect:
+ case BaseCommand.Type.Connected: return "Connected";
+ case BaseCommand.Type.Send: return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId.ToString();
+ case BaseCommand.Type.SendError: return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId.ToString();
+ case BaseCommand.Type.SendReceipt: return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId.ToString();
+ case BaseCommand.Type.Error: return cmd.Error.RequestId.ToString();
+ case BaseCommand.Type.Producer: return cmd.Producer.RequestId.ToString();
+ case BaseCommand.Type.ProducerSuccess: return cmd.ProducerSuccess.RequestId.ToString();
+ case BaseCommand.Type.CloseProducer: return cmd.CloseProducer.RequestId.ToString();
+ case BaseCommand.Type.Lookup: return cmd.LookupTopic.RequestId.ToString();
+ case BaseCommand.Type.LookupResponse: return cmd.LookupTopicResponse.RequestId.ToString();
+ case BaseCommand.Type.Unsubscribe: return cmd.Unsubscribe.RequestId.ToString();
+ case BaseCommand.Type.Subscribe: return cmd.Subscribe.RequestId.ToString();
+ case BaseCommand.Type.Success: return cmd.Success.RequestId.ToString();
+ case BaseCommand.Type.Seek: return cmd.Seek.RequestId.ToString();
+ case BaseCommand.Type.CloseConsumer: return cmd.CloseConsumer.RequestId.ToString();
+ case BaseCommand.Type.GetLastMessageId: return cmd.GetLastMessageId.RequestId.ToString();
+ case BaseCommand.Type.GetLastMessageIdResponse: return cmd.GetLastMessageIdResponse.RequestId.ToString();
+ default: throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type");
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
new file mode 100644
index 0000000..7d9fbfd
--- /dev/null
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -0,0 +1,14 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar.Internal
+{
+ public sealed class SendPackage
+ {
+ public SendPackage(CommandSend command) => Command = command;
+
+ public CommandSend Command { get; }
+ public PulsarApi.MessageMetadata Metadata { get; set; }
+ public ReadOnlyMemory<byte> Payload { get; set; }
+ }
+}
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
new file mode 100644
index 0000000..4b38e90
--- /dev/null
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace DotPulsar.Internal
+{
+ public sealed class SequenceBuilder<T>
+ {
+ private readonly LinkedList<ReadOnlyMemory<T>> _elements;
+
+ public SequenceBuilder() => _elements = new LinkedList<ReadOnlyMemory<T>>();
+
+ public SequenceBuilder<T> Prepend(ReadOnlyMemory<T> memory)
+ {
+ _elements.AddFirst(memory);
+ return this;
+ }
+
+ public SequenceBuilder<T> Append(ReadOnlyMemory<T> memory)
+ {
+ _elements.AddLast(memory);
+ return this;
+ }
+
+ public long Length => _elements.Sum(e => e.Length);
+
+ public ReadOnlySequence<T> Build()
+ {
+ Segment start = null;
+ Segment current = null;
+
+ foreach (var element in _elements)
+ {
+ if (current is null)
+ {
+ current = new Segment(element);
+ start = current;
+ }
+ else
+ current = current.CreateNext(element);
+ }
+
+ return new ReadOnlySequence<T>(start, 0, current, current.Memory.Length);
+ }
+
+ private sealed class Segment : ReadOnlySequenceSegment<T>
+ {
+ public Segment(ReadOnlyMemory<T> memory, long runningIndex = 0)
+ {
+ Memory = memory;
+ RunningIndex = runningIndex;
+ }
+
+ public Segment CreateNext(ReadOnlyMemory<T> memory)
+ {
+ var segment = new Segment(memory, RunningIndex + Memory.Length);
+ Next = segment;
+ return segment;
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
new file mode 100644
index 0000000..624f37a
--- /dev/null
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -0,0 +1,16 @@
+namespace DotPulsar.Internal
+{
+ public sealed class SequenceId
+ {
+ public SequenceId(ulong initialSequenceId)
+ {
+ Current = initialSequenceId;
+ if (initialSequenceId > 0)
+ Increment();
+ }
+
+ public ulong Current { get; private set; }
+
+ public void Increment() => ++Current;
+ }
+}
diff --git a/src/DotPulsar/Internal/Serializer.cs b/src/DotPulsar/Internal/Serializer.cs
new file mode 100644
index 0000000..3c2d03c
--- /dev/null
+++ b/src/DotPulsar/Internal/Serializer.cs
@@ -0,0 +1,84 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Buffers;
+using System.IO;
+
+namespace DotPulsar.Internal
+{
+ public static class Serializer
+ {
+ private static readonly byte[] MagicNumber = new byte[] { 0x0e, 0x01 };
+
+ public static T Deserialize<T>(ReadOnlySequence<byte> sequence)
+ {
+ using var ms = new MemoryStream(sequence.ToArray()); //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
+ return ProtoBuf.Serializer.Deserialize<T>(ms);
+ }
+
+ public static Message Deserialize(MessagePackage package)
+ {
+ var sequence = package.Data;
+ var magicNumberMatches = sequence.StartsWith(MagicNumber);
+ if (!magicNumberMatches)
+ throw new ChecksumException("Magic number don't match");
+ var expectedChecksum = sequence.ReadUInt32(2, true);
+ var actualChecksum = Crc32C.Calculate(sequence.Slice(6));
+ if (expectedChecksum != actualChecksum)
+ throw new ChecksumException(expectedChecksum, actualChecksum);
+ var metaSize = sequence.ReadUInt32(6, true);
+ var meta = Deserialize<PulsarApi.MessageMetadata>(sequence.Slice(10, metaSize));
+ var data = sequence.Slice(10 + metaSize);
+ return new Message(new MessageId(package.Command.MessageId), meta, data);
+ }
+
+ public static ReadOnlySequence<byte> Serialize(BaseCommand command)
+ {
+ var commandBytes = Serialize<BaseCommand>(command);
+ var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
+ var totalSizeBytes = ToBigEndianBytes((uint)commandBytes.Length + 4);
+
+ return new SequenceBuilder<byte>()
+ .Append(totalSizeBytes)
+ .Append(commandSizeBytes)
+ .Append(commandBytes)
+ .Build();
+ }
+
+ public static ReadOnlySequence<byte> Serialize(BaseCommand command, PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload)
+ {
+ var commandBytes = Serialize<BaseCommand>(command);
+ var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
+
+ var metadataBytes = Serialize(metadata);
+ var metadataSizeBytes = ToBigEndianBytes((uint)metadataBytes.Length);
+
+ var sb = new SequenceBuilder<byte>().Append(metadataSizeBytes).Append(metadataBytes).Append(payload);
+ var checksum = Crc32C.Calculate(sb.Build());
+
+ return sb.Prepend(ToBigEndianBytes(checksum))
+ .Prepend(MagicNumber)
+ .Prepend(commandBytes)
+ .Prepend(commandSizeBytes)
+ .Prepend(ToBigEndianBytes((uint)sb.Length))
+ .Build();
+ }
+
+ public static byte[] ToBigEndianBytes(uint integer)
+ {
+ var union = new UIntUnion(integer);
+ if (BitConverter.IsLittleEndian)
+ return new[] { union.B3, union.B2, union.B1, union.B0 };
+ else
+ return new[] { union.B0, union.B1, union.B2, union.B3 };
+ }
+
+ private static byte[] Serialize<T>(T item)
+ {
+ using var ms = new MemoryStream();
+ ProtoBuf.Serializer.Serialize(ms, item);
+ return ms.ToArray();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/StateChanged.cs b/src/DotPulsar/Internal/StateChanged.cs
new file mode 100644
index 0000000..3690f44
--- /dev/null
+++ b/src/DotPulsar/Internal/StateChanged.cs
@@ -0,0 +1,8 @@
+namespace DotPulsar.Internal
+{
+ public enum StateChanged : byte
+ {
+ To,
+ From
+ }
+}
diff --git a/src/DotPulsar/Internal/StateManager.cs b/src/DotPulsar/Internal/StateManager.cs
new file mode 100644
index 0000000..84557df
--- /dev/null
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -0,0 +1,75 @@
+using DotPulsar.Abstractions;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class StateManager<TState> : IStateChanged<TState>
+ {
+ private readonly object _lock;
+ private readonly StateTaskCollection<TState> _stateTasks;
+ private readonly TState[] _finalStates;
+
+ public StateManager(TState initialState, params TState[] finalStates)
+ {
+ _lock = new object();
+ _stateTasks = new StateTaskCollection<TState>();
+ _finalStates = finalStates;
+ CurrentState = initialState;
+ }
+
+ public TState CurrentState { get; private set; }
+
+ public TState SetState(TState state)
+ {
+ lock (_lock)
+ {
+ if (IsFinalState(CurrentState) || CurrentState.Equals(state))
+ return CurrentState;
+
+ var formerState = CurrentState;
+ CurrentState = state;
+
+ if (IsFinalState(CurrentState))
+ _stateTasks.CompleteAllTasks(CurrentState);
+ else
+ _stateTasks.CompleteTasksAwaiting(CurrentState);
+
+ return formerState;
+ }
+ }
+
+ public Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken)
+ {
+ lock (_lock)
+ {
+ if (IsFinalState(CurrentState) || CurrentState.Equals(state))
+ return Task.FromResult(CurrentState);
+ return _stateTasks.CreateTaskFor(state, StateChanged.To, cancellationToken);
+ }
+ }
+
+ public Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken)
+ {
+ lock (_lock)
+ {
+ if (IsFinalState(CurrentState) || !CurrentState.Equals(state))
+ return Task.FromResult(CurrentState);
+ return _stateTasks.CreateTaskFor(state, StateChanged.From, cancellationToken);
+ }
+ }
+
+ public bool IsFinalState(TState state)
+ {
+ for (var i = 0; i < _finalStates.Length; ++i)
+ {
+ if (_finalStates[i].Equals(state))
+ return true;
+ }
+
+ return false;
+ }
+
+ public bool IsFinalState() => IsFinalState(CurrentState);
+ }
+}
diff --git a/src/DotPulsar/Internal/StateTask.cs b/src/DotPulsar/Internal/StateTask.cs
new file mode 100644
index 0000000..aaa17be
--- /dev/null
+++ b/src/DotPulsar/Internal/StateTask.cs
@@ -0,0 +1,29 @@
+using System;
+
+namespace DotPulsar.Internal
+{
+ public sealed class StateTask<TState> : IDisposable
+ {
+ private readonly TState _state;
+ private readonly StateChanged _change;
+
+ public StateTask(TState state, StateChanged change)
+ {
+ _state = state;
+ _change = change;
+ CancelableCompletionSource = new CancelableCompletionSource<TState>();
+ }
+
+ public CancelableCompletionSource<TState> CancelableCompletionSource { get; }
+
+ public void Dispose() => CancelableCompletionSource.Dispose();
+
+ public bool IsAwaiting(TState state)
+ {
+ if (_change == StateChanged.To)
+ return _state.Equals(state);
+
+ return !_state.Equals(state);
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs
new file mode 100644
index 0000000..eaea664
--- /dev/null
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -0,0 +1,79 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+ public sealed class StateTaskCollection<TState>
+ {
+ private readonly object _lock;
+ private readonly LinkedList<StateTask<TState>> _awaitors;
+
+ public StateTaskCollection()
+ {
+ _lock = new object();
+ _awaitors = new LinkedList<StateTask<TState>>();
+ }
+
+ public Task<TState> CreateTaskFor(TState state, StateChanged changed, CancellationToken cancellationToken)
+ {
+ LinkedListNode<StateTask<TState>> node;
+
+ lock (_lock)
+ {
+ node = _awaitors.AddFirst(new StateTask<TState>(state, changed));
+ }
+
+ node.Value.CancelableCompletionSource.SetupCancellation(() => TaskWasCanceled(node), cancellationToken);
+ return node.Value.CancelableCompletionSource.Task;
+ }
+
+ public void CompleteTasksAwaiting(TState state)
+ {
+ lock (_lock)
+ {
+ var awaitor = _awaitors.First;
+ while (awaitor != null)
+ {
+ var next = awaitor.Next;
+ if (awaitor.Value.IsAwaiting(state))
+ {
+ _awaitors.Remove(awaitor);
+ awaitor.Value.CancelableCompletionSource.SetResult(state);
+ awaitor.Value.CancelableCompletionSource.Dispose();
+ }
+ awaitor = next;
+ }
+ }
+ }
+
+ public void CompleteAllTasks(TState state)
+ {
+ lock (_lock)
+ {
+ foreach (var awaitor in _awaitors)
+ {
+ awaitor.CancelableCompletionSource.SetResult(state);
+ awaitor.CancelableCompletionSource.Dispose();
+ }
+ _awaitors.Clear();
+ }
+ }
+
+ private void TaskWasCanceled(LinkedListNode<StateTask<TState>> node)
+ {
+ lock (_lock)
+ {
+ try
+ {
+ _awaitors.Remove(node);
+ node.Value.Dispose();
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/SubscribeResponse.cs b/src/DotPulsar/Internal/SubscribeResponse.cs
new file mode 100644
index 0000000..a401525
--- /dev/null
+++ b/src/DotPulsar/Internal/SubscribeResponse.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar.Internal
+{
+ public sealed class SubscribeResponse
+ {
+ public SubscribeResponse(ulong consumerId) => ConsumerId = consumerId;
+
+ public ulong ConsumerId { get; }
+ }
+}
diff --git a/src/DotPulsar/Internal/UIntUnion.cs b/src/DotPulsar/Internal/UIntUnion.cs
new file mode 100644
index 0000000..160bddd
--- /dev/null
+++ b/src/DotPulsar/Internal/UIntUnion.cs
@@ -0,0 +1,38 @@
+using System.Runtime.InteropServices;
+
+namespace DotPulsar.Internal
+{
+ [StructLayout(LayoutKind.Explicit)]
+ public struct UIntUnion
+ {
+ public UIntUnion(byte b0, byte b1, byte b2, byte b3)
+ {
+ UInt = 0;
+ B0 = b0;
+ B1 = b1;
+ B2 = b2;
+ B3 = b3;
+ }
+
+ public UIntUnion(uint value)
+ {
+ B0 = 0;
+ B1 = 0;
+ B2 = 0;
+ B3 = 0;
+ UInt = value;
+ }
+
+ [FieldOffset(0)]
+ public byte B0;
+ [FieldOffset(1)]
+ public byte B1;
+ [FieldOffset(2)]
+ public byte B2;
+ [FieldOffset(3)]
+ public byte B3;
+
+ [FieldOffset(0)]
+ public uint UInt;
+ }
+}
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
new file mode 100644
index 0000000..eb553b5
--- /dev/null
+++ b/src/DotPulsar/Message.cs
@@ -0,0 +1,35 @@
+using System.Buffers;
+using System.Collections.Immutable;
+
+namespace DotPulsar
+{
+ public sealed class Message
+ {
+ private readonly Internal.PulsarApi.MessageMetadata _messageMetadata;
+ private ImmutableDictionary<string, string> _properties;
+
+ internal Message(MessageId messageId, Internal.PulsarApi.MessageMetadata messageMetadata, ReadOnlySequence<byte> data)
+ {
+ MessageId = messageId;
+ _messageMetadata = messageMetadata;
+ Data = data;
+ }
+
+ public MessageId MessageId { get; }
+ public ReadOnlySequence<byte> Data { get; }
+ public ulong EventTime => _messageMetadata.EventTime;
+ public string ProducerName => _messageMetadata.ProducerName;
+ public ulong PublishTime => _messageMetadata.PublishTime;
+ public ulong SequenceId => _messageMetadata.SequenceId;
+
+ public ImmutableDictionary<string, string> Properties
+ {
+ get
+ {
+ if (_properties == null)
+ _properties = _messageMetadata.Properties.ToImmutableDictionary(p => p.Key, p => p.Value);
+ return _properties;
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
new file mode 100644
index 0000000..1c163af
--- /dev/null
+++ b/src/DotPulsar/MessageId.cs
@@ -0,0 +1,46 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar
+{
+ public sealed class MessageId : IEquatable<MessageId>
+ {
+ static MessageId()
+ {
+ Earliest = new MessageId(ulong.MaxValue, ulong.MaxValue, -1, -1);
+ Latest = new MessageId(long.MaxValue, long.MaxValue, -1, -1);
+ }
+
+ public static MessageId Earliest { get; }
+ public static MessageId Latest { get; }
+
+ internal MessageId(MessageIdData messageIdData) => Data = messageIdData;
+
+ public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex)
+ {
+ Data = new MessageIdData
+ {
+ LedgerId = ledgerId,
+ EntryId = entryId,
+ Partition = partition,
+ BatchIndex = batchIndex
+ };
+ }
+
+ internal MessageIdData Data { get; }
+
+ public ulong LedgerId => Data.LedgerId;
+ public ulong EntryId => Data.EntryId;
+ public int Partition => Data.Partition;
+ public int BatchIndex => Data.BatchIndex;
+
+ public override bool Equals(object o) => o is MessageId ? Equals((MessageId)o) : false;
+ public bool Equals(MessageId other) => LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
+
+ public static bool operator ==(MessageId x, MessageId y) => x.Equals(y);
+ public static bool operator !=(MessageId x, MessageId y) => !x.Equals(y);
+
+ public override int GetHashCode() => HashCode.Combine(LedgerId, EntryId, Partition, BatchIndex);
+ public override string ToString() => $"LedgerId: {LedgerId}, EntryId: {EntryId}, Partition: {Partition}, BatchIndex: {BatchIndex}";
+ }
+}
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
new file mode 100644
index 0000000..006abd2
--- /dev/null
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -0,0 +1,49 @@
+namespace DotPulsar
+{
+ public sealed class MessageMetadata
+ {
+ public MessageMetadata() => Metadata = new Internal.PulsarApi.MessageMetadata();
+
+ internal Internal.PulsarApi.MessageMetadata Metadata;
+
+ public ulong EventTime
+ {
+ get => Metadata.EventTime;
+ set => Metadata.EventTime = value;
+ }
+
+ public string this[string key]
+ {
+ get
+ {
+ for (var i = 0; i < Metadata.Properties.Count; ++i)
+ {
+ var keyValye = Metadata.Properties[i];
+ if (keyValye.Key == key)
+ return keyValye.Value;
+ }
+
+ return null;
+ }
+ set
+ {
+ for (var i = 0; i < Metadata.Properties.Count; ++i)
+ {
+ var keyValye = Metadata.Properties[i];
+ if (keyValye.Key != key)
+ continue;
+ keyValye.Value = value;
+ return;
+ }
+
+ Metadata.Properties.Add(new Internal.PulsarApi.KeyValue { Key = key, Value = value });
+ }
+ }
+
+ public ulong SequenceId
+ {
+ get => Metadata.SequenceId;
+ set => Metadata.SequenceId = value;
+ }
+ }
+}
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
new file mode 100644
index 0000000..c161b11
--- /dev/null
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar
+{
+ public sealed class ProducerOptions
+ {
+ public string ProducerName { get; set; }
+ public ulong InitialSequenceId { get; set; }
+ public string Topic { get; set; }
+ }
+}
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/ProducerState.cs
new file mode 100644
index 0000000..0326e88
--- /dev/null
+++ b/src/DotPulsar/ProducerState.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar
+{
+ public enum ProducerState : byte
+ {
+ Closed,
+ Connected,
+ Disconnected,
+ Faulted
+ }
+}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
new file mode 100644
index 0000000..7fa2219
--- /dev/null
+++ b/src/DotPulsar/PulsarClient.cs
@@ -0,0 +1,94 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal;
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Collections.Generic;
+
+namespace DotPulsar
+{
+ public sealed class PulsarClient : IPulsarClient
+ {
+ private readonly object _lock;
+ private readonly IFaultStrategy _faultStrategy;
+ private readonly LinkedList<IDisposable> _disposabels;
+ private readonly ConnectionPool _connectionPool;
+ private bool _isClosed;
+
+ internal PulsarClient(ConnectionPool connectionPool)
+ {
+ _lock = new object();
+ _faultStrategy = new FaultStrategy(3000);
+ _disposabels = new LinkedList<IDisposable>();
+ _connectionPool = connectionPool;
+ _isClosed = false;
+ }
+
+ public static IPulsarClientBuilder Builder() => new PulsarClientBuilder();
+
+ public IProducer CreateProducer(ProducerOptions options)
+ {
+ lock (_lock)
+ {
+ ThrowIfClosed();
+ var producer = new Producer(new ProducerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
+ _disposabels.AddFirst(producer);
+ producer.StateChangedTo(ProducerState.Closed, default).ContinueWith(t => Remove(producer));
+ return producer;
+ }
+ }
+
+ public IConsumer CreateConsumer(ConsumerOptions options)
+ {
+ lock (_lock)
+ {
+ ThrowIfClosed();
+ var consumer = new Consumer(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy, options.SubscriptionType != SubscriptionType.Failover);
+ _disposabels.AddFirst(consumer);
+ consumer.StateChangedTo(ConsumerState.Closed, default).ContinueWith(t => Remove(consumer));
+ return consumer;
+ }
+ }
+
+ public IReader CreateReader(ReaderOptions options)
+ {
+ lock (_lock)
+ {
+ ThrowIfClosed();
+ var reader = new Reader(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
+ _disposabels.AddFirst(reader);
+ reader.StateChangedTo(ReaderState.Closed, default).ContinueWith(t => Remove(reader));
+ return reader;
+ }
+ }
+
+ public void Dispose() //While we wait for IAsyncDisposable
+ {
+ lock (_lock)
+ {
+ ThrowIfClosed();
+ _isClosed = true;
+ foreach (var disposable in _disposabels)
+ {
+ disposable.Dispose();
+ }
+ }
+
+ _connectionPool.Dispose();
+ }
+
+ private void ThrowIfClosed()
+ {
+ if (_isClosed)
+ throw new PulsarClientClosedException();
+ }
+
+ private void Remove(IDisposable disposable)
+ {
+ lock (_lock)
+ {
+ _disposabels.Remove(disposable);
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/ReaderOptions.cs b/src/DotPulsar/ReaderOptions.cs
new file mode 100644
index 0000000..a5087d1
--- /dev/null
+++ b/src/DotPulsar/ReaderOptions.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar
+{
+ public sealed class ReaderOptions
+ {
+ public string ReaderName { get; set; }
+ public uint MessagePrefetchCount { get; set; }
+ public MessageId StartMessageId { get; set; }
+ public string Topic { get; set; }
+ }
+}
diff --git a/src/DotPulsar/ReaderState.cs b/src/DotPulsar/ReaderState.cs
new file mode 100644
index 0000000..32654e6
--- /dev/null
+++ b/src/DotPulsar/ReaderState.cs
@@ -0,0 +1,11 @@
+namespace DotPulsar
+{
+ public enum ReaderState : byte
+ {
+ Closed,
+ Connected,
+ Disconnected,
+ Faulted,
+ ReachedEndOfTopic
+ }
+}
diff --git a/src/DotPulsar/SubscriptionInitialPosition.cs b/src/DotPulsar/SubscriptionInitialPosition.cs
new file mode 100644
index 0000000..8c352d0
--- /dev/null
+++ b/src/DotPulsar/SubscriptionInitialPosition.cs
@@ -0,0 +1,8 @@
+namespace DotPulsar
+{
+ public enum SubscriptionInitialPosition : byte
+ {
+ Latest = 0,
+ Earliest = 1
+ }
+}
diff --git a/src/DotPulsar/SubscriptionType.cs b/src/DotPulsar/SubscriptionType.cs
new file mode 100644
index 0000000..aa815ce
--- /dev/null
+++ b/src/DotPulsar/SubscriptionType.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar
+{
+ public enum SubscriptionType : byte
+ {
+ Exclusive = 0,
+ Shared = 1,
+ Failover = 2
+ }
+}