Initial commit
diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..1ff0c42
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,63 @@
+###############################################################################
+# Set default behavior to automatically normalize line endings.
+###############################################################################
+* text=auto
+
+###############################################################################
+# Set default behavior for command prompt diff.
+#
+# This is need for earlier builds of msysgit that does not have it on by
+# default for csharp files.
+# Note: This is only used by command line
+###############################################################################
+#*.cs     diff=csharp
+
+###############################################################################
+# Set the merge driver for project and solution files
+#
+# Merging from the command prompt will add diff markers to the files if there
+# are conflicts (Merging from VS is not affected by the settings below, in VS
+# the diff markers are never inserted). Diff markers may cause the following 
+# file extensions to fail to load in VS. An alternative would be to treat
+# these files as binary and thus will always conflict and require user
+# intervention with every merge. To do so, just uncomment the entries below
+###############################################################################
+#*.sln       merge=binary
+#*.csproj    merge=binary
+#*.vbproj    merge=binary
+#*.vcxproj   merge=binary
+#*.vcproj    merge=binary
+#*.dbproj    merge=binary
+#*.fsproj    merge=binary
+#*.lsproj    merge=binary
+#*.wixproj   merge=binary
+#*.modelproj merge=binary
+#*.sqlproj   merge=binary
+#*.wwaproj   merge=binary
+
+###############################################################################
+# behavior for image files
+#
+# image files are treated as binary by default.
+###############################################################################
+#*.jpg   binary
+#*.png   binary
+#*.gif   binary
+
+###############################################################################
+# diff behavior for common document formats
+# 
+# Convert binary document formats to text before diffing them. This feature
+# is only available from the command line. Turn it on by uncommenting the 
+# entries below.
+###############################################################################
+#*.doc   diff=astextplain
+#*.DOC   diff=astextplain
+#*.docx  diff=astextplain
+#*.DOCX  diff=astextplain
+#*.dot   diff=astextplain
+#*.DOT   diff=astextplain
+#*.pdf   diff=astextplain
+#*.PDF   diff=astextplain
+#*.rtf   diff=astextplain
+#*.RTF   diff=astextplain
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6e2e10d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,261 @@
+## Ignore Visual Studio temporary files, build results, and
+## files generated by popular Visual Studio add-ons.
+
+# User-specific files
+*.suo
+*.user
+*.userosscache
+*.sln.docstates
+
+# User-specific files (MonoDevelop/Xamarin Studio)
+*.userprefs
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+bld/
+[Bb]in/
+[Oo]bj/
+[Ll]og/
+
+# Visual Studio 2015 cache/options directory
+.vs/
+# Uncomment if you have tasks that create the project's static files in wwwroot
+wwwroot/
+
+# MSTest test Results
+[Tt]est[Rr]esult*/
+[Bb]uild[Ll]og.*
+
+# NUNIT
+*.VisualState.xml
+TestResult.xml
+
+# Build Results of an ATL Project
+[Dd]ebugPS/
+[Rr]eleasePS/
+dlldata.c
+
+# DNX
+project.lock.json
+project.fragment.lock.json
+artifacts/
+
+*_i.c
+*_p.c
+*_i.h
+*.ilk
+*.meta
+*.obj
+*.pch
+*.pdb
+*.pgc
+*.pgd
+*.rsp
+*.sbr
+*.tlb
+*.tli
+*.tlh
+*.tmp
+*.tmp_proj
+*.log
+*.vspscc
+*.vssscc
+.builds
+*.pidb
+*.svclog
+*.scc
+
+# Chutzpah Test files
+_Chutzpah*
+
+# Visual C++ cache files
+ipch/
+*.aps
+*.ncb
+*.opendb
+*.opensdf
+*.sdf
+*.cachefile
+*.VC.db
+*.VC.VC.opendb
+
+# Visual Studio profiler
+*.psess
+*.vsp
+*.vspx
+*.sap
+
+# TFS 2012 Local Workspace
+$tf/
+
+# Guidance Automation Toolkit
+*.gpState
+
+# ReSharper is a .NET coding add-in
+_ReSharper*/
+*.[Rr]e[Ss]harper
+*.DotSettings.user
+
+# JustCode is a .NET coding add-in
+.JustCode
+
+# TeamCity is a build add-in
+_TeamCity*
+
+# DotCover is a Code Coverage Tool
+*.dotCover
+
+# NCrunch
+_NCrunch_*
+.*crunch*.local.xml
+nCrunchTemp_*
+
+# MightyMoose
+*.mm.*
+AutoTest.Net/
+
+# Web workbench (sass)
+.sass-cache/
+
+# Installshield output folder
+[Ee]xpress/
+
+# DocProject is a documentation generator add-in
+DocProject/buildhelp/
+DocProject/Help/*.HxT
+DocProject/Help/*.HxC
+DocProject/Help/*.hhc
+DocProject/Help/*.hhk
+DocProject/Help/*.hhp
+DocProject/Help/Html2
+DocProject/Help/html
+
+# Click-Once directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# TODO: Comment the next line if you want to checkin your web deploy settings
+# but database connection strings (with potential passwords) will be unencrypted
+#*.pubxml
+*.publishproj
+
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
+# NuGet Packages
+*.nupkg
+# The packages folder can be ignored because of Package Restore
+**/packages/*
+# except build/, which is used as an MSBuild target.
+!**/packages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/packages/repositories.config
+# NuGet v3's project.json files produces more ignoreable files
+*.nuget.props
+*.nuget.targets
+
+# Microsoft Azure Build Output
+csx/
+*.build.csdef
+
+# Microsoft Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directories and files
+AppPackages/
+BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!*.[Cc]ache/
+
+# Others
+ClientBin/
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.jfm
+*.pfx
+*.publishsettings
+node_modules/
+orleans.codegen.cs
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+
+# SQL Server files
+*.mdf
+*.ldf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+paket-files/
+
+# FAKE - F# Make
+.fake/
+
+# JetBrains Rider
+.idea/
+*.sln.iml
+
+# CodeRush
+.cr/
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..b3ae3d2
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright 2018 Daniel Blankensteiner
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..4bb5697
--- /dev/null
+++ b/README.md
@@ -0,0 +1,216 @@
+# DotPulsar
+
+Native .NET/C# client library for [Apache Pulsar](https://pulsar.apache.org/)
+
+## Getting Started
+
+DotPulsar is written entirely in C# and implement Apache Pulsar's [binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/). Other options was using the [C++ client library](https://pulsar.apache.org/docs/en/client-libraries-cpp/) (which is what the [Python client](https://pulsar.apache.org/docs/en/client-libraries-python/) and [Go client](https://pulsar.apache.org/docs/en/client-libraries-go/) do) or build on top of the [WebSocket API](https://pulsar.apache.org/docs/en/client-libraries-websocket/). We decided to implement the binary protocol in order to gain full control and maximize portability and performance.
+
+DotPulsar's API is strongly inspired by Apache Pulsar's official [Java client](https://pulsar.apache.org/docs/en/client-libraries-java/), but a 100% match is not goal.
+
+Let's see how to produce, consume and read messages.
+
+### Producing messages
+
+Producers can be created via the extension method show below, which follows the API from the Java client:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var producer = client.NewProducer().Topic("persistent://public/default/mytopic").Create();
+await producer.Send(System.Text.Encoding.UTF8.GetBytes("Hello World"));
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var producerOptions = new ProducerOptions
+{
+    ProducerName = "MyProducer",
+    Topic = "persistent://public/default/mytopic"
+};
+var producer = client.CreateProducer(producerOptions);
+```
+
+In the above you only specify the data being sent, but you can also specify metadata:
+
+```csharp
+var data = Encoding.UTF8.GetBytes("Hello World");
+var messageId = await producer.NewMessage()
+                              .Property("SomeKey", "SomeValue") //EventTime and SequenceId can also be set
+                              .Send(data);
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var data = Encoding.UTF8.GetBytes("Hello World");
+var metadata = new MessageMetadata(); //EventTime and SequenceId can be set via properties
+metadata["SomeKey"] = "SomeValue";
+var messageId = await producer.Send(metadata, data));
+```
+
+### Consuming messages
+
+Consumers can be created via the extension method show below, which follows the API from the Java client:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var consumer = client.NewConsumer()
+                     .SubscriptionName("MySubscription")
+                     .Topic("persistent://public/default/mytopic")
+                     .Create();
+var message = await consumer.Receive();
+Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray()));
+await consumer.Acknowledge(message);
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var consumerOptions = new ConsumerOptions
+{
+    SubscriptionName = "MySubscription",
+    Topic = "persistent://public/default/mytopic"
+};
+var consumer = client.CreateConsumer(consumerOptions);
+```
+
+### Reading messages
+
+Readers can be created via the extension method show below, which follows the API from the Java client:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var reader = client.NewReader()
+                   .StartMessageId(MessageId.Earliest)
+                   .Topic("persistent://public/default/mytopic")
+                   .Create();
+var message = await reader.Receive();
+Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray()));
+```
+
+If you are not a fan of extensions methods and builders, then there is another option:
+
+```csharp
+var client = PulsarClient.Builder().Build();
+var readerOptions = new ReaderOptions
+{
+    StartMessageId = MessageId.Earliest,
+    Topic = "persistent://public/default/mytopic"
+};
+var reader = client.CreateReader(readerOptions);
+```
+
+## Monitoring state
+
+Consumers, producers and readers all have states that can be monitored. Let's have a look at what states they can have.
+
+### Consumer states
+
+* Active (All is well)
+* Inactive (All is well. The subscription type is 'Failover' and you are not the active consumer)
+* Closed (The consumer or PulsarClient has been disposed)
+* Disconnected (The connection was lost and attempts are being made to reconnect)
+* Faulted (An unrecoverable error has occurred)
+* ReachedEndOfTopic (No more messages will be delivered)
+
+### Producer states
+
+* Closed (The producer or PulsarClient has been disposed)
+* Connected (All is well)
+* Disconnected (The connection was lost and attempts are being made to reconnect)
+* Faulted (An unrecoverable error has occurred)
+
+### Reader states
+
+* Closed (The reader or PulsarClient has been disposed)
+* Connected: (All is well)
+* Disconnected (The connection was lost and attempts are being made to reconnect)
+* Faulted (An unrecoverable error has occurred)
+* ReachedEndOfTopic (No more messages will be delivered)
+
+### How to
+
+Monitoring the state is easy, so let's see how to monitor when a consumer changes state:
+
+```csharp
+private static async Task MonitorConsumerState(IConsumer consumer, CancellationToken cancellationToken)
+{
+	var state = ConsumerState.Disconnected;
+
+	while (true)
+	{
+		state = await consumer.StateChangedFrom(state, cancellationToken);
+
+		switch (state)
+		{
+			case ConsumerState.Active:
+				Console.WriteLine("Consumer is active");
+				break;
+			case ConsumerState.Inactive:
+				Console.WriteLine("Consumer is inactive");
+				break;
+			case ConsumerState.Disconnected:
+				Console.WriteLine("Consumer is disconnected");
+				break;
+			case ConsumerState.Closed:
+				Console.WriteLine("Consumer has closed");
+				break;
+			case ConsumerState.ReachedEndOfTopic:
+				Console.WriteLine("Consumer has reached end of topic");
+				break;
+			case ConsumerState.Faulted:
+				Console.WriteLine("Consumer has faulted");
+				break;
+		}
+
+		if (consumer.IsFinalState(state))
+			return;
+	}
+}
+```
+
+Here the variable 'state' will contained to new state. You can both monitor going From (StateChangedFrom) and To (StateChangedTo) a state. 
+Some states are final, meaning the state can no longer change. For consumers 'Closed', 'Faulted' and 'ReachedEndOfTopic' are final states. When the consumer enter a final state, all monitoring tasks are completed. So if you e.g. are monitoring going to 'Diconnected' and the consumer is 'Closed', then you task will complete and return 'Closed'.
+
+## Built With
+
+* [protobuf-net](https://github.com/mgravell/protobuf-net) - Provides simple access to fast and efficient "Protocol Buffers" serialization from .NET applications
+* [System.IO.Pipelines](https://www.nuget.org/packages/System.IO.Pipelines/) - Single producer single consumer byte buffer management
+
+## Versioning
+
+We use [SemVer](http://semver.org/) for versioning. For the versions available, see the [tags on this repository](https://github.com/danske-commodities/dotpulsar/tags).
+
+## Authors
+
+* **Daniel Blankensteiner** - *Initial work* - [Danske Commodities](https://github.com/danske-commodities)
+
+See also the list of [contributors](https://github.com/danske-commodities/dotpulsar/contributors) who participated in this project.
+
+## License
+
+This project is licensed under the Apache License Version 2.0 - see the [LICENSE](LICENSE) file for details.
+
+## Roadmap
+
+1.0.0
+
+* Move to IAsyncDisposable and IAsyncEnumerable (will mean moving to .NET Standard 2.1)
+
+X.X.X //Future
+
+* Schemas
+* Authentication/Authorization (TLS Authentication, Athenz, Kerberos)
+* Partitioned topics
+* Topic compaction
+* Message compression (LZ4, ZLIB, ZSTD, SNAPPY)
+* Multi-topic subscriptions
+* Connection encryption
+* Message encryption
+* Batching
+* CommandConsumerStats/CommandConsumerStatsResponse
+* CommandGetTopicsOfNamespace/CommandGetTopicsOfNamespaceResponse
+* CommandPartitionedTopicMetadata/CommandPartitionedTopicMetadataResponse
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
new file mode 100644
index 0000000..13d206a
--- /dev/null
+++ b/src/Directory.Build.props
@@ -0,0 +1,7 @@
+<Project>
+
+  <PropertyGroup>
+    <LangVersion>8.0</LangVersion>
+  </PropertyGroup>
+
+</Project>
\ No newline at end of file
diff --git a/src/DotPulsar.Tests/DotPulsar.Tests.csproj b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
new file mode 100644
index 0000000..34b3de6
--- /dev/null
+++ b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -0,0 +1,21 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>netcoreapp2.2</TargetFramework>
+    <IsPackable>false</IsPackable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
+    <PackageReference Include="xunit" Version="2.4.1" />
+    <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
+      <PrivateAssets>all</PrivateAssets>
+      <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
+    </PackageReference>
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\DotPulsar\DotPulsar.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
new file mode 100644
index 0000000..b5ba9b1
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -0,0 +1,137 @@
+using DotPulsar.Internal;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+    public class AsyncLockTests
+    {
+        [Fact]
+        public void Lock_GivenLockIsFree_ShouldReturnCompletedTask()
+        {
+            //Arrange
+            var sut = new AsyncLock();
+
+            //Act
+            var actual = sut.Lock();
+
+            //Assert
+            Assert.True(actual.IsCompleted);
+
+            //Annihilate 
+            actual.Result.Dispose();
+            sut.Dispose();
+        }
+
+        [Fact]
+        public async Task Lock_GivenLockIsTaken_ShouldReturnIncompletedTask()
+        {
+            //Arrange
+            var sut = new AsyncLock();
+            var alreadyTaken = await sut.Lock();
+
+            //Act
+            var actual = sut.Lock();
+
+            //Assert
+            Assert.False(actual.IsCompleted);
+
+            //Annihilate
+            alreadyTaken.Dispose();
+            actual.Result.Dispose();
+            sut.Dispose();
+        }
+
+        [Fact]
+        public async Task Lock_GivenLockIsDisposed_ShouldThrowObjectDisposedException()
+        {
+            //Arrange
+            var sut = new AsyncLock();
+            sut.Dispose();
+
+            //Act
+            var exception = await Record.ExceptionAsync(() => sut.Lock());
+
+            //Assert
+            Assert.IsType<ObjectDisposedException>(exception);
+        }
+
+        [Fact]
+        public async Task Lock_GivenLockIsDisposedWhileAwaitingLock_ShouldThrowObjectDisposedException()
+        {
+            //Arrange
+            var sut = new AsyncLock();
+            var gotLock = await sut.Lock();
+            var awaiting = sut.Lock();
+            _ = Task.Run(() => sut.Dispose());
+
+            //Act
+            var exception = await Record.ExceptionAsync(() => awaiting);
+
+            //Assert
+            Assert.IsType<ObjectDisposedException>(exception);
+
+            //Annihilate
+            sut.Dispose();
+            gotLock.Dispose();
+        }
+
+        [Fact]
+        public async Task Lock_GivenLockIsTakenAndCancellationTokenIsActivated_ShouldThrowTaskCanceledException()
+        {
+            //Arrange
+            var cts = new CancellationTokenSource();
+            var sut = new AsyncLock();
+            var gotLock = await sut.Lock();
+            var awaiting = sut.Lock(cts.Token);
+
+            //Act
+            cts.Cancel();
+            var exception = await Record.ExceptionAsync(() => awaiting);
+
+            //Assert
+            Assert.IsType<TaskCanceledException>(exception);
+
+            //Annihilate
+            cts.Dispose();
+            gotLock.Dispose();
+            sut.Dispose();
+        }
+
+        [Fact]
+        public async Task Dispose_GivenLockIsDisposedWhileItIsTaken_ShouldNotCompleteBeforeItIsReleased()
+        {
+            //Arrange
+            var sut = new AsyncLock();
+            var gotLock = await sut.Lock();
+            var disposeTask = Task.Run(() => sut.Dispose());
+            Assert.False(disposeTask.IsCompleted);
+
+            //Act
+            gotLock.Dispose();
+            await disposeTask;
+
+            //Assert
+            Assert.True(disposeTask.IsCompleted);
+
+            //Annihilate
+            sut.Dispose();
+        }
+
+        [Fact]
+        public void Dispose_WhenCalledMultipleTimes_ShouldBeSafeToDoSo()
+        {
+            //Arrange
+            var sut = new AsyncLock();
+
+            //Act
+            sut.Dispose();
+            var exception = Record.Exception(() => sut.Dispose());
+
+            //Assert
+            Assert.Null(exception);
+        }
+    }
+}
diff --git a/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs
new file mode 100644
index 0000000..4491804
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -0,0 +1,117 @@
+using DotPulsar.Internal;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+    public class AsyncQueueTests
+    {
+        [Fact]
+        public async Task Enqueue_GivenDequeueTaskWasWaiting_ShouldCompleteDequeueTask()
+        {
+            //Arrange
+            const int value = 1;
+            var queue = new AsyncQueue<int>();
+            var dequeueTask = queue.Dequeue();
+            queue.Enqueue(value);
+
+            //Act
+            var actual = await dequeueTask;
+
+            //Assert
+            Assert.Equal(value, actual);
+
+            //Annihilate
+            queue.Dispose();
+        }
+
+        [Fact]
+        public async Task DequeueAsync_GivenQueueWasNotEmpty_ShouldCompleteDequeueTask()
+        {
+            //Arrange
+            const int value = 1;
+            var queue = new AsyncQueue<int>();
+            queue.Enqueue(value);
+
+            //Act
+            var actual = await queue.Dequeue();
+
+            //Assert
+            Assert.Equal(value, actual);
+
+            //Annihilate
+            queue.Dispose();
+        }
+
+        [Fact]
+        public async Task DequeueAsync_GivenMultipleDequeues_ShouldCompleteInOrderedSequence()
+        {
+            //Arrange
+            const int value1 = 1, value2 = 2;
+            var queue = new AsyncQueue<int>();
+            var dequeue1 = queue.Dequeue();
+            var dequeue2 = queue.Dequeue();
+            queue.Enqueue(value1);
+            queue.Enqueue(value2);
+
+            //Act
+            var actual1 = await dequeue1;
+            var actual2 = await dequeue2;
+
+            //Assert
+            Assert.Equal(value1, actual1);
+            Assert.Equal(value2, actual2);
+
+            //Annihilate
+            queue.Dispose();
+        }
+
+        [Fact]
+        public async Task DequeueAsync_GivenSequenceOfInput_ShouldReturnSameSequenceOfOutput()
+        {
+            //Arrange
+            const int value1 = 1, value2 = 2;
+            var queue = new AsyncQueue<int>();
+            queue.Enqueue(value1);
+            queue.Enqueue(value2);
+
+            //Act
+            var actual1 = await queue.Dequeue();
+            var actual2 = await queue.Dequeue();
+
+            //Assert
+            Assert.Equal(value1, actual1);
+            Assert.Equal(value2, actual2);
+
+            //Annihilate
+            queue.Dispose();
+        }
+
+        [Fact]
+        public async Task DequeueAsync_GivenTokenIsCanceled_ShouldCancelTask()
+        {
+            //Arrange
+            CancellationTokenSource source1 = new CancellationTokenSource(), source2 = new CancellationTokenSource();
+            const int excepted = 1;
+            var queue = new AsyncQueue<int>();
+            var task1 = queue.Dequeue(source1.Token);
+            var task2 = queue.Dequeue(source2.Token);
+
+            //Act
+            source1.Cancel();
+            queue.Enqueue(excepted);
+            var exception = await Record.ExceptionAsync(() => task1);
+            await task2;
+
+            //Assert
+            Assert.IsType<TaskCanceledException>(exception);
+            Assert.Equal(excepted, task2.Result);
+
+            //Annihilate
+            source1.Dispose();
+            source2.Dispose();
+            queue.Dispose();
+        }
+    }
+}
diff --git a/src/DotPulsar.Tests/Internal/Crc32CTests.cs b/src/DotPulsar.Tests/Internal/Crc32CTests.cs
new file mode 100644
index 0000000..0187c29
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/Crc32CTests.cs
@@ -0,0 +1,39 @@
+using DotPulsar.Internal;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+    public class Crc32CTests
+    {
+        [Fact]
+        public void Calculate_GivenSequenceWithSingleSegment_ShouldReturnExpectedChecksum()
+        {
+            //Arrange
+            var segment = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c };
+            var sequence = new SequenceBuilder<byte>().Append(segment).Build();
+
+            //Act
+            var actual = Crc32C.Calculate(sequence);
+
+            //Assert
+            const uint expected = 2355953212;
+            Assert.Equal(expected, actual);
+        }
+
+        [Fact]
+        public void Calculate_GivenSequenceWithMultipleSegments_ShouldReturnExpectedChecksum()
+        {
+            //Arrange
+            var s1 = new byte[] { 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18, 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01 };
+            var s2 = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c };
+            var sequence = new SequenceBuilder<byte>().Append(s1).Append(s2).Build();
+
+            //Act
+            var actual = Crc32C.Calculate(sequence);
+
+            //Assert
+            const uint expected = 1079987866;
+            Assert.Equal(expected, actual);
+        }
+    }
+}
diff --git a/src/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/src/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
new file mode 100644
index 0000000..82f92e8
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -0,0 +1,146 @@
+using DotPulsar.Internal;
+using DotPulsar.Internal.Extensions;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal.Extensions
+{
+    public class ReadOnlySequenceExtensionsTests
+    {
+        [Fact]
+        public void StartsWith_GivenToShortSequenceWithSingleSegment_ShouldReturnFalse()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00 }).Build();
+
+            //Act
+            var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
+
+            //Assert
+            Assert.False(actual);
+        }
+
+        [Fact]
+        public void StartsWith_GivenSequenceWithSingleSegment_ShouldReturnFalse()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x02, 0x01 }).Build();
+
+            //Act
+            var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
+
+            //Assert
+            Assert.False(actual);
+        }
+
+        [Fact]
+        public void StartsWith_GivenSequenceWithSingleSegment_ShouldReturnTrue()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01, 0x02 }).Build();
+
+            //Act
+            var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 });
+
+            //Assert
+            Assert.True(actual);
+        }
+
+        [Fact]
+        public void StartsWith_GivenToShortSequenceWithMultipleSegments_ShouldReturnFalse()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02 }).Build();
+
+            //Act
+            var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02, 0x03 });
+
+            //Assert
+            Assert.False(actual);
+        }
+
+        [Fact]
+        public void StartsWith_GivenSequenceWithMultipleSegments_ShouldReturnFalse()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x02 }).Append(new byte[] { 0x01, 0x03 }).Build();
+
+            //Act
+            var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 });
+
+            //Assert
+            Assert.False(actual);
+        }
+
+        [Fact]
+        public void StartsWith_GivenSequenceWithMultipleSegments_ShouldReturnTrue()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build();
+
+            //Act
+            var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 });
+
+            //Assert
+            Assert.True(actual);
+        }
+
+        [Fact]
+        public void ReadUInt32_GivenSequenceWithSingleSegment_ShouldGiveExceptedResult()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01, 0x02, 0x03 }).Build();
+
+            //Act
+            var actual = sequence.ReadUInt32(0, true);
+
+            //Assert
+            const uint expected = 66051;
+            Assert.Equal(expected, actual);
+        }
+
+        [Fact]
+        public void ReadUInt32_GivenSequenceWithSingleSegmentAndNonZeroStart_ShouldGiveExceptedResult()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x09, 0x00, 0x01, 0x02, 0x03 }).Build();
+
+            //Act
+            var actual = sequence.ReadUInt32(1, true);
+
+            //Assert
+            const uint expected = 66051;
+            Assert.Equal(expected, actual);
+        }
+
+        [Fact]
+        public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build();
+
+            //Act
+            var actual = sequence.ReadUInt32(0, true);
+
+            //Assert
+            const uint expected = 66051;
+            Assert.Equal(expected, actual);
+        }
+
+        [Fact]
+        public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult()
+        {
+            //Arrange
+            var sequence = new SequenceBuilder<byte>()
+                .Append(new byte[] { 0x09, 0x09, 0x09 })
+                .Append(new byte[] { 0x09, 0x00, 0x01 })
+                .Append(new byte[] { 0x02, 0x03 }).Build();
+
+            //Act
+            var actual = sequence.ReadUInt32(4, true);
+
+            //Assert
+            const uint expected = 66051;
+            Assert.Equal(expected, actual);
+        }
+    }
+}
diff --git a/src/DotPulsar.Tests/Internal/SequenceBuilderTests.cs b/src/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
new file mode 100644
index 0000000..51c0ab7
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
@@ -0,0 +1,45 @@
+using DotPulsar.Internal;
+using System.Buffers;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+    public class SequenceBuilderTests
+    {
+        [Fact]
+        public void Append_GivenMultipleInput_ShouldArrangeInCorrectOrder()
+        {
+            //Arrange
+            var a = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+            var b = new byte[] { 0x04, 0x05, 0x06, 0x07, 0x08 };
+            var c = new byte[] { 0x09 };
+            var builder = new SequenceBuilder<byte>().Append(a).Append(b).Append(c);
+
+            //Act
+            var sequence = builder.Build();
+
+            //Assert
+            var array = sequence.ToArray();
+            for (byte i = 0; i < array.Length; ++i)
+                Assert.Equal(i, array[i]);
+        }
+
+        [Fact]
+        public void Prepend_GivenMultipleInput_ShouldArrangeInCorrectOrder()
+        {
+            //Arrange
+            var a = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+            var b = new byte[] { 0x04, 0x05, 0x06, 0x07, 0x08 };
+            var c = new byte[] { 0x09 };
+            var builder = new SequenceBuilder<byte>().Prepend(c).Prepend(b).Prepend(a);
+
+            //Act
+            var sequence = builder.Build();
+
+            //Assert
+            var array = sequence.ToArray();
+            for (byte i = 0; i < array.Length; ++i)
+                Assert.Equal(i, array[i]);
+        }
+    }
+}
diff --git a/src/DotPulsar.Tests/Internal/SerializerTests.cs b/src/DotPulsar.Tests/Internal/SerializerTests.cs
new file mode 100644
index 0000000..f933a74
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/SerializerTests.cs
@@ -0,0 +1,22 @@
+using DotPulsar.Internal;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+    public class SerializerTests
+    {
+        [Fact]
+        public void ToBigEndianBytes_GivenUnsignedInteger_ShouldReturnExpectedBytes()
+        {
+            //Arrange
+            uint value = 66051;
+
+            //Act
+            var actual = Serializer.ToBigEndianBytes(value);
+
+            //Assert
+            var expected = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+            Assert.Equal(expected, actual);
+        }
+    }
+}
diff --git a/src/DotPulsar.Tests/Internal/StateManagerTests.cs b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
new file mode 100644
index 0000000..b7a6590
--- /dev/null
+++ b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -0,0 +1,231 @@
+using DotPulsar.Internal;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace DotPulsar.Tests.Internal
+{
+    public class StateManagerTests
+    {
+        [Theory]
+        [InlineData(ProducerState.Connected, ProducerState.Connected, ProducerState.Connected)]
+        [InlineData(ProducerState.Connected, ProducerState.Disconnected, ProducerState.Connected)]
+        [InlineData(ProducerState.Connected, ProducerState.Closed, ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Connected, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Disconnected, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Closed, ProducerState.Connected, ProducerState.Closed)]
+        [InlineData(ProducerState.Closed, ProducerState.Disconnected, ProducerState.Closed)]
+        [InlineData(ProducerState.Closed, ProducerState.Closed, ProducerState.Closed)]
+        public void SetState_GivenNewState_ShouldReturnFormerState(ProducerState initialState, ProducerState newState, ProducerState expected)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+            //Act
+            var actual = sut.SetState(newState);
+
+            //Assert
+            Assert.Equal(expected, actual);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected)]
+        [InlineData(ProducerState.Closed)]
+        public void SetState_GivenStateIsFinal_ShouldNotChangeState(ProducerState newState)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed);
+
+            //Act
+            _ = sut.SetState(newState);
+
+            //Assert
+            Assert.Equal(ProducerState.Closed, sut.CurrentState);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Connected, ProducerState.Closed)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+        public void SetState_GivenStateIsChangedToWanted_ShouldCompleteTask(ProducerState initialState, ProducerState newState)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+            var task = sut.StateChangedTo(newState, default);
+
+            //Act
+            _ = sut.SetState(newState);
+
+            //Assert
+            Assert.True(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Connected, ProducerState.Closed)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+        public void SetState_GivenStateIsChangedFromWanted_ShouldCompleteTask(ProducerState initialState, ProducerState newState)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+            var task = sut.StateChangedFrom(initialState, default);
+
+            //Act
+            _ = sut.SetState(newState);
+
+            //Assert
+            Assert.True(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected)]
+        [InlineData(ProducerState.Closed)]
+        public void StateChangedTo_GivenStateIsAlreadyWanted_ShouldCompleteTask(ProducerState state)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(state, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedTo(state, default);
+
+            //Assert
+            Assert.True(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Connected, ProducerState.Closed)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+        public void StateChangedTo_GivenStateIsNotWanted_ShouldNotCompleteTask(ProducerState initialState, ProducerState wantedState)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedTo(wantedState, default);
+
+            //Assert
+            Assert.False(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected)]
+        public void StateChangedTo_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedTo(state, default);
+
+            //Assert
+            Assert.True(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected)]
+        public void StateChangedFrom_GivenStateHasNotChanged_ShouldNotCompleteTask(ProducerState state)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(state, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedFrom(state, default);
+
+            //Assert
+            Assert.False(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Connected, ProducerState.Closed)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
+        public void StateChangedFrom_GivenStateHasChanged_ShouldCompleteTask(ProducerState initialState, ProducerState fromState)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedFrom(fromState, default);
+
+            //Assert
+            Assert.True(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected)]
+        [InlineData(ProducerState.Disconnected)]
+        [InlineData(ProducerState.Closed)]
+        public void StateChangedFrom_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedFrom(state, default);
+
+            //Assert
+            Assert.True(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected, ProducerState.Disconnected)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
+        public void SetState_GivenStateIsChangeToFinalState_ShouldCompleteTask(ProducerState initialState, ProducerState wantedState)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedTo(wantedState, default);
+            _ = sut.SetState(ProducerState.Closed);
+
+            //Assert
+            Assert.True(task.IsCompleted);
+        }
+
+        [Theory]
+        [InlineData(ProducerState.Connected, ProducerState.Disconnected, ProducerState.Closed)]
+        [InlineData(ProducerState.Disconnected, ProducerState.Connected, ProducerState.Closed)]
+        public void SetState_GivenStateIsChangedToNotWanted_ShouldNotCompleteTask(ProducerState initialState, ProducerState newState, ProducerState wantedState)
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed);
+
+            //Act
+            var task = sut.StateChangedTo(wantedState, default);
+            _ = sut.SetState(newState);
+
+            //Assert
+            Assert.False(task.IsCompleted);
+        }
+
+        [Fact]
+        public async Task CancelToken_GivenTaskWasStillWaiting_ShouldCancelTask()
+        {
+            //Arrange
+            var sut = new StateManager<ProducerState>(ProducerState.Connected, ProducerState.Closed);
+            var cts = new CancellationTokenSource();
+            var task = sut.StateChangedFrom(ProducerState.Connected, cts.Token);
+
+            //Act
+            cts.Cancel();
+            var exception = await Record.ExceptionAsync(() => task);
+
+            //Assert
+            Assert.IsType<TaskCanceledException>(exception);
+
+            //Annihilate
+            cts.Dispose();
+        }
+    }
+}
diff --git a/src/DotPulsar.sln b/src/DotPulsar.sln
new file mode 100644
index 0000000..db0a03e
--- /dev/null
+++ b/src/DotPulsar.sln
@@ -0,0 +1,31 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.29209.62
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar", "DotPulsar\DotPulsar.csproj", "{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotPulsar.Tests", "DotPulsar.Tests\DotPulsar.Tests.csproj", "{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}"
+EndProject
+Global
+	GlobalSection(SolutionConfigurationPlatforms) = preSolution
+		Debug|Any CPU = Debug|Any CPU
+		Release|Any CPU = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(ProjectConfigurationPlatforms) = postSolution
+		{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{8D300FC3-2E35-4D23-B0DD-FEE9E153330A}.Release|Any CPU.Build.0 = Release|Any CPU
+		{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{B3FCD2D5-8009-4281-ACDB-6A7BC99606B4}.Release|Any CPU.Build.0 = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
+	GlobalSection(ExtensibilityGlobals) = postSolution
+		SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
+	EndGlobalSection
+EndGlobal
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
new file mode 100644
index 0000000..9cb5830
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -0,0 +1,52 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A consumer abstraction
+    /// </summary>
+    public interface IConsumer : IStateChanged<ConsumerState>, IDisposable
+    {
+        /// <summary>
+        /// Acknowledge the consumption of a single message
+        /// </summary>
+        Task Acknowledge(Message message, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Acknowledge the consumption of a single message using the MessageId
+        /// </summary>
+        Task Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Acknowledge the consumption of all the messages in the topic up to and including the provided message
+        /// </summary>
+        Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Acknowledge the consumption of all the messages in the topic up to and including the provided MessageId
+        /// </summary>
+        Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Get the MessageId of the last message on the topic
+        /// </summary>
+        Task<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Receives a single message
+        /// </summary>
+        Task<Message> Receive(CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Reset the subscription associated with this consumer to a specific MessageId
+        /// </summary>
+        Task Seek(MessageId messageId, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Unsubscribe the consumer
+        /// </summary>
+        Task Unsubscribe(CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
new file mode 100644
index 0000000..338b5e8
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -0,0 +1,48 @@
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A consumer building abstraction
+    /// </summary>
+    public interface IConsumerBuilder
+    {
+        /// <summary>
+        /// Set the consumer name. This is optional.
+        /// </summary>
+        IConsumerBuilder ConsumerName(string name);
+
+        /// <summary>
+        /// Set initial position for the subscription. Default is 'Latest'.
+        /// </summary>
+        IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition);
+
+        /// <summary>
+        /// Set the priority level for the shared subscription consumer. Default is 0.
+        /// </summary>
+        IConsumerBuilder PriorityLevel(int priorityLevel);
+
+        /// <summary>
+        /// Number of messages that will be prefetched. Default is 1000.
+        /// </summary>
+        IConsumerBuilder MessagePrefetchCount(uint count);
+
+        /// <summary>
+        /// Set the subscription name for this consumer. This is required.
+        /// </summary>
+        IConsumerBuilder SubscriptionName(string name);
+
+        /// <summary>
+        /// Set the subscription type for this consumer. Default is 'Exclusive'.
+        /// </summary>
+        IConsumerBuilder SubscriptionType(SubscriptionType type);
+
+        /// <summary>
+        /// Set the topic for this consumer. This is required.
+        /// </summary>
+        IConsumerBuilder Topic(string topic);
+
+        /// <summary>
+        /// Create the consumer
+        /// </summary>
+        IConsumer Create();
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
new file mode 100644
index 0000000..14792ba
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A message building abstraction
+    /// </summary>
+    public interface IMessageBuilder
+    {
+        /// <summary>
+        /// Set the event time of the message
+        /// </summary>
+        IMessageBuilder EventTime(ulong eventTime);
+
+        /// <summary>
+        /// Add/Set a property key/value on the message
+        /// </summary>
+        IMessageBuilder Property(string key, string value);
+
+        /// <summary>
+        /// Set the sequence id of the message
+        /// </summary>
+        IMessageBuilder SequenceId(ulong sequenceId);
+
+        /// <summary>
+        /// Set the consumer name
+        /// </summary>
+        Task<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
new file mode 100644
index 0000000..b59b75b
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A producer abstraction
+    /// </summary>
+    public interface IProducer : IStateChanged<ProducerState>, IDisposable
+    {
+        /// <summary>
+        /// Sends a message
+        /// </summary>
+        Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Sends a message and metadata
+        /// </summary>
+        Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
new file mode 100644
index 0000000..00edff5
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -0,0 +1,28 @@
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A producer building abstraction
+    /// </summary>
+    public interface IProducerBuilder
+    {
+        /// <summary>
+        /// Set the producer name. This is optional.
+        /// </summary>
+        IProducerBuilder ProducerName(string name);
+
+        /// <summary>
+        /// Set the initial sequence id. Default is 0.
+        /// </summary>
+        IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+
+        /// <summary>
+        /// Set the topic for this producer. This is required.
+        /// </summary>
+        IProducerBuilder Topic(string topic);
+
+        /// <summary>
+        /// Create the producer
+        /// </summary>
+        IProducer Create();
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
new file mode 100644
index 0000000..9e9d9c3
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -0,0 +1,25 @@
+using System;
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A pulsar client abstraction
+    /// </summary>
+    public interface IPulsarClient : IDisposable
+    {
+        /// <summary>
+        /// Create a producer
+        /// </summary>
+        IProducer CreateProducer(ProducerOptions options);
+
+        /// <summary>
+        /// Create a consumer
+        /// </summary>
+        IConsumer CreateConsumer(ConsumerOptions options);
+
+        /// <summary>
+        /// Create a reader
+        /// </summary>
+        IReader CreateReader(ReaderOptions options);
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
new file mode 100644
index 0000000..8edafd5
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -0,0 +1,20 @@
+using System;
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A pulsar client building abstraction
+    /// </summary>
+    public interface IPulsarClientBuilder
+    {
+        /// <summary>
+        /// The service URL for the Pulsar cluster
+        /// </summary>
+        IPulsarClientBuilder ServiceUrl(Uri uri);
+
+        /// <summary>
+        /// Create the client
+        /// </summary>
+        IPulsarClient Build();
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
new file mode 100644
index 0000000..70813fa
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A reader abstraction
+    /// </summary>
+    public interface IReader : IStateChanged<ReaderState>, IDisposable
+    {
+        /// <summary>
+        /// Receives a single message
+        /// </summary>
+        Task<Message> Receive(CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IReaderBuilder.cs b/src/DotPulsar/Abstractions/IReaderBuilder.cs
new file mode 100644
index 0000000..3e57577
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IReaderBuilder.cs
@@ -0,0 +1,33 @@
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A reader building abstraction
+    /// </summary>
+    public interface IReaderBuilder
+    {
+        /// <summary>
+        /// Set the reader name. This is optional.
+        /// </summary>
+        IReaderBuilder ReaderName(string name);
+
+        /// <summary>
+        /// Number of messages that will be prefetched. Default is 1000.
+        /// </summary>
+        IReaderBuilder MessagePrefetchCount(uint count);
+
+        /// <summary>
+        /// The initial reader position is set to the specified message id. This is required.
+        /// </summary>
+        IReaderBuilder StartMessageId(MessageId messageId);
+
+        /// <summary>
+        /// Set the topic for this reader. This is required.
+        /// </summary>
+        IReaderBuilder Topic(string topic);
+
+        /// <summary>
+        /// Create the reader
+        /// </summary>
+        IReader Create();
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Abstractions/IStateChanged.cs
new file mode 100644
index 0000000..fec32f3
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IStateChanged.cs
@@ -0,0 +1,49 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Abstractions
+{
+    /// <summary>
+    /// A state change monitoring abstraction
+    /// </summary>
+    public interface IStateChanged<TState>
+    {
+        /// <summary>
+        /// Wait for the state to change to a specific state
+        /// </summary>
+        /// <returns>
+        /// The current state
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete
+        /// </remarks>
+        Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state
+        /// </summary>
+        /// <returns>
+        /// The current state
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete
+        /// </remarks>
+        Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Ask whether the current state is final, meaning that it will never change
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will never change
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not
+        /// </returns>
+        bool IsFinalState(TState state);
+    }
+}
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
new file mode 100644
index 0000000..65b7105
--- /dev/null
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -0,0 +1,20 @@
+namespace DotPulsar
+{
+    public sealed class ConsumerOptions
+    {
+        public ConsumerOptions()
+        {
+            InitialPosition = SubscriptionInitialPosition.Latest;
+            MessagePrefetchCount = 1000;
+            SubscriptionType = SubscriptionType.Exclusive;
+        }
+
+        public string ConsumerName { get; set; }
+        public SubscriptionInitialPosition InitialPosition { get; set; }
+        public int PriorityLevel { get; set; }
+        public uint MessagePrefetchCount { get; set; }
+        public string SubscriptionName { get; set; }
+        public SubscriptionType SubscriptionType { get; set; }
+        public string Topic { get; set; }
+    }
+}
diff --git a/src/DotPulsar/ConsumerState.cs b/src/DotPulsar/ConsumerState.cs
new file mode 100644
index 0000000..35dc4fd
--- /dev/null
+++ b/src/DotPulsar/ConsumerState.cs
@@ -0,0 +1,12 @@
+namespace DotPulsar
+{
+    public enum ConsumerState : byte
+    {
+        Active,
+        Closed,
+        Disconnected,
+        Faulted,
+        Inactive,
+        ReachedEndOfTopic
+    }
+}
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
new file mode 100644
index 0000000..1fbacaf
--- /dev/null
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -0,0 +1,26 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>netcoreapp2.2</TargetFramework>
+    <Version>0.1.0</Version>
+    <AssemblyVersion>$(Version)</AssemblyVersion>
+    <FileVersion>$(Version)</FileVersion>
+    <PackageReleaseNotes>Alpha release</PackageReleaseNotes>
+    <Description>Native .NET/C# client library for Apache Pulsar</Description>
+    <Authors>dblank</Authors>
+    <Company>Danske Commodities A/S</Company>
+    <Copyright>$(Company)</Copyright>
+    <PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
+    <NoWarn>1701;1702;1591</NoWarn>
+  </PropertyGroup>
+
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
+    <DocumentationFile>\DotPulsar.xml</DocumentationFile>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="protobuf-net" Version="2.4.0" />
+    <PackageReference Include="System.IO.Pipelines" Version="4.5.3" />
+  </ItemGroup>
+
+</Project>
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs b/src/DotPulsar/Exceptions/AuthenticationException.cs
new file mode 100644
index 0000000..6e9e098
--- /dev/null
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class AuthenticationException : DotPulsarException
+    {
+        public AuthenticationException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/AuthorizationException.cs b/src/DotPulsar/Exceptions/AuthorizationException.cs
new file mode 100644
index 0000000..72f0dbb
--- /dev/null
+++ b/src/DotPulsar/Exceptions/AuthorizationException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class AuthorizationException : DotPulsarException
+    {
+        public AuthorizationException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ChecksumException.cs b/src/DotPulsar/Exceptions/ChecksumException.cs
new file mode 100644
index 0000000..4068590
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ChecksumException.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ChecksumException : DotPulsarException
+    {
+        public ChecksumException(string message) : base(message) { }
+
+        public ChecksumException(uint expectedChecksum, uint actualChecksum) : base($"Checksum mismatch. Excepted {expectedChecksum} but was actually {actualChecksum}") { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ConsumerAssignException.cs b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
new file mode 100644
index 0000000..eb4c1f7
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ConsumerAssignException : DotPulsarException
+    {
+        public ConsumerAssignException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ConsumerBusyException.cs b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
new file mode 100644
index 0000000..aaa1069
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ConsumerBusyException : DotPulsarException
+    {
+        public ConsumerBusyException(string message) : base(message) { }
+    }
+}
\ No newline at end of file
diff --git a/src/DotPulsar/Exceptions/ConsumerClosedException.cs b/src/DotPulsar/Exceptions/ConsumerClosedException.cs
new file mode 100644
index 0000000..5c714f0
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ConsumerClosedException : DotPulsarException
+    {
+        public ConsumerClosedException() : base("Consumer has closed") { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/DotPulsarException.cs b/src/DotPulsar/Exceptions/DotPulsarException.cs
new file mode 100644
index 0000000..a57d1a0
--- /dev/null
+++ b/src/DotPulsar/Exceptions/DotPulsarException.cs
@@ -0,0 +1,11 @@
+using System;
+
+namespace DotPulsar.Exceptions
+{
+    public abstract class DotPulsarException : Exception
+    {
+        public DotPulsarException(string message) : base(message) { }
+
+        public DotPulsarException(string message, Exception innerException) : base(message, innerException) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
new file mode 100644
index 0000000..6deb8ce
--- /dev/null
+++ b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class IncompatibleSchemaException : DotPulsarException
+    {
+        public IncompatibleSchemaException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/InvalidTopicNameException.cs b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
new file mode 100644
index 0000000..784fb2e
--- /dev/null
+++ b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class InvalidTopicNameException : DotPulsarException
+    {
+        public InvalidTopicNameException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/MetadataException.cs b/src/DotPulsar/Exceptions/MetadataException.cs
new file mode 100644
index 0000000..96a65fd
--- /dev/null
+++ b/src/DotPulsar/Exceptions/MetadataException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class MetadataException : DotPulsarException
+    {
+        public MetadataException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/PersistenceException.cs b/src/DotPulsar/Exceptions/PersistenceException.cs
new file mode 100644
index 0000000..918667b
--- /dev/null
+++ b/src/DotPulsar/Exceptions/PersistenceException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class PersistenceException : DotPulsarException
+    {
+        public PersistenceException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
new file mode 100644
index 0000000..c72bc0b
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ProducerBlockedQuotaExceededException : DotPulsarException
+    {
+        public ProducerBlockedQuotaExceededException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ProducerBusyException.cs b/src/DotPulsar/Exceptions/ProducerBusyException.cs
new file mode 100644
index 0000000..a465ba3
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerBusyException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ProducerBusyException : DotPulsarException
+    {
+        public ProducerBusyException(string message) : base(message) { }
+    }
+}
\ No newline at end of file
diff --git a/src/DotPulsar/Exceptions/ProducerClosedException.cs b/src/DotPulsar/Exceptions/ProducerClosedException.cs
new file mode 100644
index 0000000..6fa5e4f
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProducerClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ProducerClosedException : DotPulsarException
+    {
+        public ProducerClosedException() : base("Producer has closed") { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/PulsarClientClosedException.cs b/src/DotPulsar/Exceptions/PulsarClientClosedException.cs
new file mode 100644
index 0000000..988a4b9
--- /dev/null
+++ b/src/DotPulsar/Exceptions/PulsarClientClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class PulsarClientClosedException : DotPulsarException
+    {
+        public PulsarClientClosedException() : base("Client has closed") { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/ReaderClosedException.cs b/src/DotPulsar/Exceptions/ReaderClosedException.cs
new file mode 100644
index 0000000..0d08ed8
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ReaderClosedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class ReaderClosedException : DotPulsarException
+    {
+        public ReaderClosedException() : base("Reader has closed") { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
new file mode 100644
index 0000000..da3eaba
--- /dev/null
+++ b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class SubscriptionNotFoundException : DotPulsarException
+    {
+        public SubscriptionNotFoundException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/TopicNotFoundException.cs b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
new file mode 100644
index 0000000..d10a36c
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class TopicNotFoundException : DotPulsarException
+    {
+        public TopicNotFoundException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/TopicTerminatedException.cs b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
new file mode 100644
index 0000000..2222040
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class TopicTerminatedException : DotPulsarException
+    {
+        public TopicTerminatedException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/UnknownException.cs b/src/DotPulsar/Exceptions/UnknownException.cs
new file mode 100644
index 0000000..bd2772c
--- /dev/null
+++ b/src/DotPulsar/Exceptions/UnknownException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class UnknownException : DotPulsarException
+    {
+        public UnknownException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/UnsupportedVersionException.cs b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
new file mode 100644
index 0000000..cdcb833
--- /dev/null
+++ b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Exceptions
+{
+    public sealed class UnsupportedVersionException : DotPulsarException
+    {
+        public UnsupportedVersionException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
new file mode 100644
index 0000000..c482522
--- /dev/null
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -0,0 +1,10 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Internal;
+
+namespace DotPulsar.Extensions
+{
+    public static class ProducerExtensions
+    {
+        public static IMessageBuilder NewMessage(this IProducer producer) => new MessageBuilder(producer);
+    }
+}
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
new file mode 100644
index 0000000..025f207
--- /dev/null
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -0,0 +1,12 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Internal;
+
+namespace DotPulsar.Extensions
+{
+    public static class PulsarClientExtensions
+    {
+        public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient) => new ProducerBuilder(pulsarClient);
+        public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient) => new ConsumerBuilder(pulsarClient);
+        public static IReaderBuilder NewReader(this IPulsarClient pulsarClient) => new ReaderBuilder(pulsarClient);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerProxy.cs b/src/DotPulsar/Internal/Abstractions/IConsumerProxy.cs
new file mode 100644
index 0000000..4025088
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerProxy.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IConsumerProxy : IEnqueue<MessagePackage>, IDequeue<MessagePackage>
+    {
+        void Active();
+        void Inactive();
+        void Disconnected();
+        void ReachedEndOfTopic();
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerStream.cs b/src/DotPulsar/Internal/Abstractions/IConsumerStream.cs
new file mode 100644
index 0000000..f3443ed
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerStream.cs
@@ -0,0 +1,16 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IConsumerStream : IDisposable
+    {
+        Task Send(CommandAck command);
+        Task<CommandSuccess> Send(CommandUnsubscribe command);
+        Task<CommandSuccess> Send(CommandSeek command);
+        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command);
+        Task<Message> Receive(CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerStreamFactory.cs b/src/DotPulsar/Internal/Abstractions/IConsumerStreamFactory.cs
new file mode 100644
index 0000000..b8767ff
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerStreamFactory.cs
@@ -0,0 +1,10 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IConsumerStreamFactory
+    {
+        Task<IConsumerStream> CreateStream(IConsumerProxy proxy, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IDequeue.cs b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
new file mode 100644
index 0000000..a456fed
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
@@ -0,0 +1,10 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IDequeue<T>
+    {
+        Task<T> Dequeue(CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IEnqueue.cs b/src/DotPulsar/Internal/Abstractions/IEnqueue.cs
new file mode 100644
index 0000000..100485d
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IEnqueue.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IEnqueue<T>
+    {
+        void Enqueue(T item);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IFaultStrategy.cs b/src/DotPulsar/Internal/Abstractions/IFaultStrategy.cs
new file mode 100644
index 0000000..b3e141e
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IFaultStrategy.cs
@@ -0,0 +1,10 @@
+using System;
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IFaultStrategy
+    {
+        FaultAction DetermineFaultAction(Exception exception);
+        TimeSpan TimeToWait { get; }
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerProxy.cs b/src/DotPulsar/Internal/Abstractions/IProducerProxy.cs
new file mode 100644
index 0000000..e6823f4
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IProducerProxy.cs
@@ -0,0 +1,7 @@
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IProducerProxy
+    {
+        void Disconnected();
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerStream.cs b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
new file mode 100644
index 0000000..a9e94c2
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
@@ -0,0 +1,11 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IProducerStream : IDisposable
+    {
+        Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerStreamFactory.cs b/src/DotPulsar/Internal/Abstractions/IProducerStreamFactory.cs
new file mode 100644
index 0000000..d0a5f06
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IProducerStreamFactory.cs
@@ -0,0 +1,10 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal.Abstractions
+{
+    public interface IProducerStreamFactory
+    {
+        Task<IProducerStream> CreateStream(IProducerProxy proxy, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
new file mode 100644
index 0000000..b7d0b4f
--- /dev/null
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -0,0 +1,116 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class AsyncLock : IDisposable
+    {
+        private readonly LinkedList<CancelableCompletionSource<IDisposable>> _pending;
+        private readonly SemaphoreSlim _semaphoreSlim;
+        private readonly Releaser _releaser;
+        private readonly Task<IDisposable> _completedTask;
+        private bool _isDisposed;
+
+        public AsyncLock()
+        {
+            _pending = new LinkedList<CancelableCompletionSource<IDisposable>>();
+            _semaphoreSlim = new SemaphoreSlim(1, 1);
+            _releaser = new Releaser(Release);
+            _completedTask = Task.FromResult((IDisposable)_releaser);
+            _isDisposed = false;
+        }
+
+        public Task<IDisposable> Lock(CancellationToken cancellationToken = default)
+        {
+            LinkedListNode<CancelableCompletionSource<IDisposable>> node = null;
+
+            lock (_pending)
+            {
+                if (_isDisposed)
+                    throw new ObjectDisposedException(nameof(AsyncLock));
+
+                if (_semaphoreSlim.CurrentCount == 1) //Lock is free
+                {
+                    _semaphoreSlim.Wait(); //Will never block
+                    return _completedTask;
+                }
+
+                //Lock was not free
+                var ccs = new CancelableCompletionSource<IDisposable>();
+                node = _pending.AddLast(ccs);
+            }
+
+            cancellationToken.Register(() => Cancel(node));
+
+            return node.Value.Task;
+        }
+
+        public void Dispose()
+        {
+            lock (_pending)
+            {
+                if (_isDisposed)
+                    return;
+
+                _isDisposed = true;
+
+                foreach (var pending in _pending)
+                {
+                    pending.SetException(new ObjectDisposedException(nameof(AsyncLock)));
+                    pending.Dispose();
+                }
+
+                _pending.Clear();
+            }
+
+            _semaphoreSlim.Wait(); //Wait for possible lock-holder to finish
+            _semaphoreSlim.Release();
+            _semaphoreSlim.Dispose();
+        }
+
+        private void Cancel(LinkedListNode<CancelableCompletionSource<IDisposable>> node)
+        {
+            lock (_pending)
+            {
+                try
+                {
+                    _pending.Remove(node);
+                    node.Value.Dispose();
+                }
+                catch
+                {
+                    // Ignore
+                }
+            }
+        }
+
+        private void Release()
+        {
+            lock (_pending)
+            {
+                if (_pending.Count > 0)
+                {
+                    var node = _pending.First;
+                    node.Value.SetResult(_releaser);
+                    node.Value.Dispose();
+                    _pending.RemoveFirst();
+                    return;
+                }
+
+                if (_semaphoreSlim.CurrentCount == 0)
+                    _semaphoreSlim.Release();
+            }
+        }
+
+        private class Releaser : IDisposable
+        {
+            private readonly Action _release;
+
+            public Releaser(Action release) => _release = release;
+
+            public void Dispose() => _release();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
new file mode 100644
index 0000000..4dbf084
--- /dev/null
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -0,0 +1,78 @@
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable
+    {
+        private readonly object _lock;
+        private readonly Queue<T> _queue;
+        private readonly LinkedList<CancelableCompletionSource<T>> _pendingDequeues;
+
+        public AsyncQueue()
+        {
+            _lock = new object();
+            _queue = new Queue<T>();
+            _pendingDequeues = new LinkedList<CancelableCompletionSource<T>>();
+        }
+
+        public void Enqueue(T item)
+        {
+            lock (_lock)
+            {
+                if (_pendingDequeues.Count > 0)
+                {
+                    var tcs = _pendingDequeues.First;
+                    _pendingDequeues.RemoveFirst();
+                    tcs.Value.SetResult(item);
+                }
+                else
+                    _queue.Enqueue(item);
+            }
+        }
+
+        public Task<T> Dequeue(CancellationToken cancellationToken = default)
+        {
+            LinkedListNode<CancelableCompletionSource<T>> node = null;
+
+            lock (_lock)
+            {
+                if (_queue.Count > 0)
+                    return Task.FromResult(_queue.Dequeue());
+
+                node = _pendingDequeues.AddLast(new CancelableCompletionSource<T>());
+            }
+
+            node.Value.SetupCancellation(() => Cancel(node), cancellationToken);
+            return node.Value.Task;
+        }
+
+        public void Dispose()
+        {
+            lock (_lock)
+            {
+                foreach (var pendingDequeue in _pendingDequeues)
+                    pendingDequeue.Dispose();
+
+                _pendingDequeues.Clear();
+                _queue.Clear();
+            }
+        }
+
+        private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node)
+        {
+            lock (_lock)
+            {
+                try
+                {
+                    node.Value.Dispose();
+                    _pendingDequeues.Remove(node);
+                }
+                catch { }
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Awaitor.cs b/src/DotPulsar/Internal/Awaitor.cs
new file mode 100644
index 0000000..83e1143
--- /dev/null
+++ b/src/DotPulsar/Internal/Awaitor.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class Awaitor<T, Result> : IDisposable
+    {
+        private readonly Dictionary<T, TaskCompletionSource<Result>> _items;
+
+        public Awaitor() => _items = new Dictionary<T, TaskCompletionSource<Result>>();
+
+        public Task<Result> CreateTask(T item)
+        {
+            var tcs = new TaskCompletionSource<Result>(TaskCreationOptions.RunContinuationsAsynchronously);
+            _items.Add(item, tcs);
+            return tcs.Task;
+        }
+
+        public void SetResult(T item, Result result)
+        {
+            _items.Remove(item, out var tcs);
+            tcs.SetResult(result);
+        }
+
+        public void Dispose()
+        {
+            foreach (var item in _items.Values)
+            {
+                item.SetCanceled();
+            }
+
+            _items.Clear();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/CancelableCompletionSource.cs b/src/DotPulsar/Internal/CancelableCompletionSource.cs
new file mode 100644
index 0000000..ebf87d4
--- /dev/null
+++ b/src/DotPulsar/Internal/CancelableCompletionSource.cs
@@ -0,0 +1,28 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class CancelableCompletionSource<T> : IDisposable
+    {
+        private readonly TaskCompletionSource<T> _source;
+        private CancellationTokenRegistration? _registration;
+
+        public CancelableCompletionSource() => _source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        public void SetupCancellation(Action callback, CancellationToken token) => _registration = token.Register(() => callback());
+
+        public void SetResult(T result) => _ = _source.TrySetResult(result);
+
+        public void SetException(Exception exception) => _ = _source.TrySetException(exception);
+
+        public Task<T> Task => _source.Task;
+
+        public void Dispose()
+        {
+            _ = _source.TrySetCanceled();
+            _registration?.Dispose();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
new file mode 100644
index 0000000..b5eec21
--- /dev/null
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -0,0 +1,191 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Buffers;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class Connection : IDisposable
+    {
+        private readonly AsyncLock _lock;
+        private readonly ProducerManager _producerManager;
+        private readonly ConsumerManager _consumerManager;
+        private readonly RequestResponseHandler _requestResponseHandler;
+        private readonly PingPongHandler _pingPongHandler;
+        private readonly PulsarStream _stream;
+
+        public Connection(Stream stream)
+        {
+            _lock = new AsyncLock();
+            _producerManager = new ProducerManager();
+            _consumerManager = new ConsumerManager();
+            _requestResponseHandler = new RequestResponseHandler();
+            _pingPongHandler = new PingPongHandler(this);
+            _stream = new PulsarStream(stream, HandleCommand);
+        }
+
+        public Task IsClosed => _stream.IsClosed;
+
+        public async Task<bool> IsActive()
+        {
+            using (await _lock.Lock())
+            {
+                return _producerManager.HasProducers || _consumerManager.HasConsumers;
+            }
+        }
+
+        public async Task<ProducerResponse> Send(CommandProducer command, IProducerProxy proxy)
+        {
+            Task<BaseCommand> responseTask = null;
+            using (await _lock.Lock())
+            {
+                _producerManager.Outgoing(command, proxy);
+                var baseCommand = command.AsBaseCommand();
+                responseTask = _requestResponseHandler.Outgoing(baseCommand);
+                var sequence = Serializer.Serialize(baseCommand);
+                await _stream.Send(sequence);
+            }
+
+            var response = await responseTask;
+            if (response.CommandType == BaseCommand.Type.Error)
+            {
+                _producerManager.Remove(command.ProducerId);
+                response.Error.Throw();
+            }
+
+            return new ProducerResponse(command.ProducerId, response.ProducerSuccess.ProducerName);
+        }
+
+        public async Task<SubscribeResponse> Send(CommandSubscribe command, IConsumerProxy proxy)
+        {
+            Task<BaseCommand> responseTask = null;
+            using (await _lock.Lock())
+            {
+                _consumerManager.Outgoing(command, proxy);
+                var baseCommand = command.AsBaseCommand();
+                responseTask = _requestResponseHandler.Outgoing(baseCommand);
+                var sequence = Serializer.Serialize(baseCommand);
+                await _stream.Send(sequence);
+            }
+
+            var response = await responseTask;
+            if (response.CommandType == BaseCommand.Type.Error)
+            {
+                _consumerManager.Remove(command.ConsumerId);
+                response.Error.Throw();
+            }
+
+            return new SubscribeResponse(command.ConsumerId);
+        }
+
+        public async Task Send(CommandPing command) => await Send(command.AsBaseCommand());
+        public async Task Send(CommandPong command) => await Send(command.AsBaseCommand());
+        public async Task Send(CommandAck command) => await Send(command.AsBaseCommand());
+        public async Task Send(CommandFlow command) => await Send(command.AsBaseCommand());
+
+        public async Task<BaseCommand> Send(CommandUnsubscribe command)
+        {
+            var response = await SendRequestResponse(command.AsBaseCommand());
+            if (response.CommandType == BaseCommand.Type.Success)
+                _consumerManager.Remove(command.ConsumerId);
+            return response;
+        }
+
+        public async Task<BaseCommand> Send(CommandConnect command) => await SendRequestResponse(command.AsBaseCommand());
+        public async Task<BaseCommand> Send(CommandLookupTopic command) => await SendRequestResponse(command.AsBaseCommand());
+        public async Task<BaseCommand> Send(CommandSeek command) => await SendRequestResponse(command.AsBaseCommand());
+        public async Task<BaseCommand> Send(CommandGetLastMessageId command) => await SendRequestResponse(command.AsBaseCommand());
+
+        public async Task<BaseCommand> Send(CommandCloseProducer command)
+        {
+            var response = await SendRequestResponse(command.AsBaseCommand());
+            if (response.CommandType == BaseCommand.Type.Success)
+                _producerManager.Remove(command.ProducerId);
+            return response;
+        }
+
+        public async Task<BaseCommand> Send(CommandCloseConsumer command)
+        {
+            var response = await SendRequestResponse(command.AsBaseCommand());
+            if (response.CommandType == BaseCommand.Type.Success)
+                _consumerManager.Remove(command.ConsumerId);
+            return response;
+        }
+
+        public async Task<BaseCommand> Send(SendPackage command)
+        {
+            Task<BaseCommand> response = null;
+            using (await _lock.Lock())
+            {
+                var baseCommand = command.Command.AsBaseCommand();
+                response = _requestResponseHandler.Outgoing(baseCommand);
+                var sequence = Serializer.Serialize(baseCommand, command.Metadata, command.Payload);
+                await _stream.Send(sequence);
+            }
+            return await response;
+        }
+
+        private async Task<BaseCommand> SendRequestResponse(BaseCommand command)
+        {
+            Task<BaseCommand> response = null;
+            using (await _lock.Lock())
+            {
+                response = _requestResponseHandler.Outgoing(command);
+                var sequence = Serializer.Serialize(command);
+                await _stream.Send(sequence);
+            }
+            return await response;
+        }
+
+        private async Task Send(BaseCommand command)
+        {
+            using (await _lock.Lock())
+            {
+                var sequence = Serializer.Serialize(command);
+                await _stream.Send(sequence);
+            }
+        }
+
+        private void HandleCommand(uint commandSize, ReadOnlySequence<byte> sequence)
+        {
+            var command = Serializer.Deserialize<BaseCommand>(sequence.Slice(0, commandSize));
+
+            switch (command.CommandType)
+            {
+                case BaseCommand.Type.Message:
+                    _consumerManager.Incoming(new MessagePackage(command.Message, sequence.Slice(commandSize)));
+                    return;
+                case BaseCommand.Type.CloseConsumer:
+                    _consumerManager.Incoming(command.CloseConsumer);
+                    return;
+                case BaseCommand.Type.ActiveConsumerChange:
+                    _consumerManager.Incoming(command.ActiveConsumerChange);
+                    return;
+                case BaseCommand.Type.ReachedEndOfTopic:
+                    _consumerManager.Incoming(command.ReachedEndOfTopic);
+                    return;
+                case BaseCommand.Type.CloseProducer:
+                    _producerManager.Incoming(command.CloseProducer);
+                    return;
+                case BaseCommand.Type.Ping:
+                    _pingPongHandler.Incoming(command.Ping);
+                    return;
+                default:
+                    _requestResponseHandler.Incoming(command);
+                    return;
+            }
+        }
+
+        public void Dispose()
+        {
+            _lock.Dispose();
+            _consumerManager.Dispose();
+            _producerManager.Dispose();
+            _requestResponseHandler.Dispose();
+            _stream.Dispose();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
new file mode 100644
index 0000000..de3ac0b
--- /dev/null
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -0,0 +1,143 @@
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Collections.Concurrent;
+using System.ComponentModel;
+using System.Linq;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ConnectionPool : IDisposable
+    {
+        private readonly AsyncLock _lock;
+        private readonly int _protocolVersion;
+        private readonly string _clientVersion;
+        private readonly Uri _serviceUrl;
+        private readonly ConcurrentDictionary<Uri, Connection> _connections;
+        private readonly CancellationTokenSource _cancellationTokenSource;
+        private readonly Task _closeInactiveConnections;
+
+        public ConnectionPool(int protocolVersion, string clientVersion, Uri serviceUrl)
+        {
+            _lock = new AsyncLock();
+            _protocolVersion = protocolVersion;
+            _clientVersion = clientVersion;
+            _serviceUrl = serviceUrl;
+            _connections = new ConcurrentDictionary<Uri, Connection>();
+            _cancellationTokenSource = new CancellationTokenSource();
+            _closeInactiveConnections = CloseInactiveConnections(TimeSpan.FromSeconds(60), _cancellationTokenSource.Token);
+        }
+
+        public void Dispose() //While we wait for IAsyncDisposable
+        {
+            _cancellationTokenSource.Cancel();
+            _closeInactiveConnections.Wait();
+
+            _lock.Dispose();
+
+            foreach (var serviceUrl in _connections.Keys.ToArray())
+            {
+                Deregister(serviceUrl);
+            }
+        }
+
+        public async Task<Connection> FindConnectionForTopic(string topic, CancellationToken cancellationToken)
+        {
+            var connection = await CreateConnection(_serviceUrl, cancellationToken);
+
+            var authoritative = false;
+
+            while (true)
+            {
+                var lookup = new CommandLookupTopic
+                {
+                    Topic = topic,
+                    Authoritative = authoritative
+                };
+                var response = await connection.Send(lookup);
+                response.Expect(BaseCommand.Type.LookupResponse);
+
+                switch (response.LookupTopicResponse.Response)
+                {
+                    case CommandLookupTopicResponse.LookupType.Connect:
+                        return connection;
+                    case CommandLookupTopicResponse.LookupType.Redirect:
+                        authoritative = response.LookupTopicResponse.Authoritative;
+                        connection = await CreateConnection(new Uri(response.LookupTopicResponse.BrokerServiceUrl), cancellationToken);
+                        continue;
+                    case CommandLookupTopicResponse.LookupType.Failed:
+                        response.LookupTopicResponse.Throw();
+                        break;
+                    default:
+                        throw new InvalidEnumArgumentException("LookupType", (int)response.LookupTopicResponse.Response, typeof(CommandLookupTopicResponse.LookupType));
+                }
+            }
+        }
+
+        private async Task<Connection> CreateConnection(Uri serviceUrl, CancellationToken cancellationToken)
+        {
+            using (await _lock.Lock(cancellationToken))
+            {
+                if (_connections.TryGetValue(serviceUrl, out Connection connection))
+                    return connection;
+
+                var tcpClient = new TcpClient();
+                await tcpClient.ConnectAsync(serviceUrl.Host, serviceUrl.Port);
+
+                connection = new Connection(tcpClient.GetStream());
+                Register(serviceUrl, connection);
+
+                var connect = new CommandConnect
+                {
+                    ProtocolVersion = _protocolVersion,
+                    ClientVersion = _clientVersion
+                };
+
+                var response = await connection.Send(connect);
+                response.Expect(BaseCommand.Type.Connected);
+
+                return connection;
+            }
+        }
+
+        private void Register(Uri serviceUrl, Connection connection)
+        {
+            _connections[serviceUrl] = connection;
+            connection.IsClosed.ContinueWith(t => Deregister(serviceUrl));
+        }
+
+        private void Deregister(Uri serviceUrl)
+        {
+            if (_connections.TryRemove(serviceUrl, out Connection connection))
+                connection.Dispose();
+        }
+
+        private async Task CloseInactiveConnections(TimeSpan interval, CancellationToken cancellationToken)
+        {
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                try
+                {
+                    await Task.Delay(interval, cancellationToken);
+
+                    using (await _lock.Lock(cancellationToken))
+                    {
+                        var serviceUrls = _connections.Keys;
+                        foreach (var serviceUrl in serviceUrls)
+                        {
+                            var connection = _connections[serviceUrl];
+                            if (connection == null)
+                                continue;
+                            if (!await connection.IsActive())
+                                Deregister(serviceUrl);
+                        }
+                    }
+                }
+                catch { }
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
new file mode 100644
index 0000000..3317313
--- /dev/null
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -0,0 +1,156 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class Consumer : IConsumer
+    {
+        private readonly Executor _executor;
+        private readonly CommandAck _cachedCommandAck;
+        private readonly IConsumerStreamFactory _streamFactory;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly bool _setProxyState;
+        private readonly StateManager<ConsumerState> _stateManager;
+        private readonly CancellationTokenSource _connectTokenSource;
+        private readonly Task _connectTask;
+        private Action _throwIfClosedOrFaulted;
+        private IConsumerStream Stream { get; set; }
+
+        public Consumer(IConsumerStreamFactory streamFactory, IFaultStrategy faultStrategy, bool setProxyState)
+        {
+            _executor = new Executor(ExecutorOnException);
+            _cachedCommandAck = new CommandAck();
+            _stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+            _streamFactory = streamFactory;
+            _faultStrategy = faultStrategy;
+            _setProxyState = setProxyState;
+            _connectTokenSource = new CancellationTokenSource();
+            Stream = new NotReadyStream();
+            _connectTask = Connect(_connectTokenSource.Token);
+            _throwIfClosedOrFaulted = () => { };
+        }
+
+        public async Task<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+        public async Task<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+        public bool IsFinalState() => _stateManager.IsFinalState();
+        public bool IsFinalState(ConsumerState state) => _stateManager.IsFinalState(state);
+
+        public void Dispose()
+        {
+            _executor.Dispose();
+            _connectTokenSource.Cancel();
+            _connectTask.Wait();
+        }
+
+        public async Task<Message> Receive(CancellationToken cancellationToken) => await _executor.Execute(() => Stream.Receive(cancellationToken), cancellationToken);
+
+        public async Task Acknowledge(Message message, CancellationToken cancellationToken)
+            => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken);
+
+        public async Task Acknowledge(MessageId messageId, CancellationToken cancellationToken)
+            => await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken);
+
+        public async Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
+            => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
+
+        public async Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
+            => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
+
+        public async Task Unsubscribe(CancellationToken cancellationToken)
+        {
+            _ = await _executor.Execute(() => Stream.Send(new CommandUnsubscribe()), cancellationToken);
+            HasClosed();
+        }
+
+        public async Task Seek(MessageId messageId, CancellationToken cancellationToken)
+        {
+            var seek = new CommandSeek { MessageId = messageId.Data };
+            _ = await _executor.Execute(() => Stream.Send(seek), cancellationToken);
+            return;
+        }
+
+        public async Task<MessageId> GetLastMessageId(CancellationToken cancellationToken)
+        {
+            var response = await _executor.Execute(() => Stream.Send(new CommandGetLastMessageId()), cancellationToken);
+            return new MessageId(response.LastMessageId);
+        }
+
+        private async Task Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
+        {
+            await _executor.Execute(() =>
+            {
+                _cachedCommandAck.Type = ackType;
+                _cachedCommandAck.MessageIds.Clear();
+                _cachedCommandAck.MessageIds.Add(messageIdData);
+                return Stream.Send(_cachedCommandAck);
+            }, cancellationToken);
+        }
+
+        private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
+        {
+            _throwIfClosedOrFaulted();
+
+            switch (_faultStrategy.DetermineFaultAction(exception))
+            {
+                case FaultAction.Retry:
+                    await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+                    break;
+                case FaultAction.Relookup:
+                    await _stateManager.StateChangedFrom(ConsumerState.Disconnected, cancellationToken);
+                    break;
+                case FaultAction.Fault:
+                    HasFaulted(exception);
+                    break;
+            }
+
+            _throwIfClosedOrFaulted();
+        }
+
+        private void HasFaulted(Exception exception)
+        {
+            _throwIfClosedOrFaulted = () => throw exception;
+            _stateManager.SetState(ConsumerState.Faulted);
+        }
+
+        private void HasClosed()
+        {
+            _throwIfClosedOrFaulted = () => throw new ConsumerClosedException();
+            _stateManager.SetState(ConsumerState.Closed);
+        }
+
+        private async Task Connect(CancellationToken cancellationToken)
+        {
+            try
+            {
+                while (true)
+                {
+                    using (var proxy = new ConsumerProxy(_stateManager, new AsyncQueue<MessagePackage>()))
+                    using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
+                    {
+                        if (_setProxyState)
+                            proxy.Active();
+                        else
+                            await _stateManager.StateChangedFrom(ConsumerState.Disconnected, cancellationToken);
+
+                        await _stateManager.StateChangedTo(ConsumerState.Disconnected, cancellationToken);
+                        if (_stateManager.IsFinalState())
+                            return;
+                    }
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                HasClosed();
+            }
+            catch (Exception exception)
+            {
+                HasFaulted(exception);
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
new file mode 100644
index 0000000..522240f
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -0,0 +1,60 @@
+using DotPulsar.Abstractions;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ConsumerBuilder : IConsumerBuilder
+    {
+        private readonly IPulsarClient _pulsarClient;
+        private readonly ConsumerOptions _options;
+
+        public ConsumerBuilder(IPulsarClient pulsarClient)
+        {
+            _pulsarClient = pulsarClient;
+            _options = new ConsumerOptions();
+        }
+
+        public IConsumerBuilder ConsumerName(string name)
+        {
+            _options.ConsumerName = name;
+            return this;
+        }
+
+        public IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition)
+        {
+            _options.InitialPosition = initialPosition;
+            return this;
+        }
+
+        public IConsumerBuilder PriorityLevel(int priorityLevel)
+        {
+            _options.PriorityLevel = priorityLevel;
+            return this;
+        }
+
+        public IConsumerBuilder MessagePrefetchCount(uint count)
+        {
+            _options.MessagePrefetchCount = count;
+            return this;
+        }
+
+        public IConsumerBuilder SubscriptionName(string name)
+        {
+            _options.SubscriptionName = name;
+            return this;
+        }
+
+        public IConsumerBuilder SubscriptionType(SubscriptionType type)
+        {
+            _options.SubscriptionType = type;
+            return this;
+        }
+
+        public IConsumerBuilder Topic(string topic)
+        {
+            _options.Topic = topic;
+            return this;
+        }
+
+        public IConsumer Create() => _pulsarClient.CreateConsumer(_options);
+    }
+}
diff --git a/src/DotPulsar/Internal/ConsumerManager.cs b/src/DotPulsar/Internal/ConsumerManager.cs
new file mode 100644
index 0000000..c461283
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerManager.cs
@@ -0,0 +1,61 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ConsumerManager : IDisposable
+    {
+        private readonly IdLookup<IConsumerProxy> _proxies;
+
+        public ConsumerManager() => _proxies = new IdLookup<IConsumerProxy>();
+
+        public bool HasConsumers => !_proxies.IsEmpty();
+
+        public void Outgoing(CommandSubscribe subscribe, IConsumerProxy proxy) => subscribe.ConsumerId = _proxies.Add(proxy);
+
+        public void Dispose()
+        {
+            foreach (var id in _proxies.AllIds())
+            {
+                RemoveConsumer(id);
+            }
+        }
+
+        public void Incoming(MessagePackage package)
+        {
+            var consumerId = package.Command.ConsumerId;
+            var proxy = _proxies[consumerId];
+            proxy?.Enqueue(package);
+        }
+
+        public void Incoming(CommandCloseConsumer command) => RemoveConsumer(command.ConsumerId);
+
+        public void Incoming(CommandActiveConsumerChange command)
+        {
+            var proxy = _proxies[command.ConsumerId];
+            if (proxy == null) return;
+
+            if (command.IsActive)
+                proxy.Active();
+            else
+                proxy.Inactive();
+        }
+
+        public void Incoming(CommandReachedEndOfTopic command)
+        {
+            var proxy = _proxies[command.ConsumerId];
+            proxy?.ReachedEndOfTopic();
+        }
+
+        public void Remove(ulong consumerId) => _proxies.Remove(consumerId);
+
+        private void RemoveConsumer(ulong consumerId)
+        {
+            var proxy = _proxies[consumerId];
+            if (proxy == null) return;
+            proxy.Disconnected();
+            _proxies.Remove(consumerId);
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ConsumerProxy.cs b/src/DotPulsar/Internal/ConsumerProxy.cs
new file mode 100644
index 0000000..8d87a48
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerProxy.cs
@@ -0,0 +1,54 @@
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ConsumerProxy : IConsumerProxy, IDisposable
+    {
+        private readonly object _lock;
+        private readonly StateManager<ConsumerState> _stateManager;
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private bool _hasDisconnected;
+
+        public ConsumerProxy(StateManager<ConsumerState> stateManager, AsyncQueue<MessagePackage> queue)
+        {
+            _lock = new object();
+            _stateManager = stateManager;
+            _queue = queue;
+            _hasDisconnected = false;
+        }
+
+        public void Active() => SetState(ConsumerState.Active);
+        public void Inactive() => SetState(ConsumerState.Inactive);
+        public void ReachedEndOfTopic() => SetState(ConsumerState.ReachedEndOfTopic);
+
+        public void Disconnected()
+        {
+            lock (_lock)
+            {
+                if (_hasDisconnected)
+                    return;
+
+                _stateManager.SetState(ConsumerState.Disconnected);
+                _hasDisconnected = true;
+            }
+        }
+
+
+        public void Enqueue(MessagePackage package) => _queue.Enqueue(package);
+        public async Task<MessagePackage> Dequeue(CancellationToken cancellationToken) => await _queue.Dequeue(cancellationToken);
+
+        private void SetState(ConsumerState state)
+        {
+            lock (_lock)
+            {
+                if (!_hasDisconnected)
+                    _stateManager.SetState(state);
+            }
+        }
+
+        public void Dispose() => _queue.Dispose();
+    }
+}
diff --git a/src/DotPulsar/Internal/ConsumerStream.cs b/src/DotPulsar/Internal/ConsumerStream.cs
new file mode 100644
index 0000000..0dea1fd
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerStream.cs
@@ -0,0 +1,151 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ConsumerStream : IConsumerStream
+    {
+        private readonly ulong _id;
+        private readonly IDequeue<MessagePackage> _dequeue;
+        private readonly Connection _connection;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly IConsumerProxy _proxy;
+        private readonly CommandFlow _commandFlow;
+        private uint _sendWhenZero;
+        private bool _firstBatch;
+
+        public ConsumerStream(ulong id, uint messagePrefetchCount, IDequeue<MessagePackage> dequeue, Connection connection, IFaultStrategy faultStrategy, IConsumerProxy proxy)
+        {
+            _id = id;
+            _dequeue = dequeue;
+            _connection = connection;
+            _faultStrategy = faultStrategy;
+            _proxy = proxy;
+            _commandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount };
+            _sendWhenZero = 0;
+            _firstBatch = true;
+        }
+
+        public async Task<Message> Receive(CancellationToken cancellationToken)
+        {
+            while (true)
+            {
+                if (_sendWhenZero == 0) //TODO should sending the flow command be handled on other thread and thereby not slow down the consumer?
+                {
+                    await _connection.Send(_commandFlow);
+
+                    if (_firstBatch)
+                    {
+                        _commandFlow.MessagePermits = (uint)Math.Ceiling(_commandFlow.MessagePermits * 0.5);
+                        _firstBatch = false;
+                    }
+
+                    _sendWhenZero = _commandFlow.MessagePermits;
+                }
+
+                var messagePackage = await _dequeue.Dequeue(cancellationToken);
+                _sendWhenZero--;
+
+                try
+                {
+                    return Serializer.Deserialize(messagePackage);
+                }
+                catch (ChecksumException)
+                {
+                    var ack = new CommandAck
+                    {
+                        Type = CommandAck.AckType.Individual,
+                        validation_error = CommandAck.ValidationError.ChecksumMismatch
+                    };
+                    ack.MessageIds.Add(messagePackage.Command.MessageId);
+                    await Send(ack);
+                }
+            }
+        }
+
+        public async Task Send(CommandAck command)
+        {
+            try
+            {
+                command.ConsumerId = _id;
+                await _connection.Send(command);
+            }
+            catch (Exception exception)
+            {
+                OnException(exception);
+                throw;
+            }
+        }
+
+        public async Task<CommandSuccess> Send(CommandUnsubscribe command)
+        {
+            try
+            {
+                command.ConsumerId = _id;
+                var response = await _connection.Send(command);
+                response.Expect(BaseCommand.Type.Success);
+                return response.Success;
+            }
+            catch (Exception exception)
+            {
+                OnException(exception);
+                throw;
+            }
+        }
+
+        public async Task<CommandSuccess> Send(CommandSeek command)
+        {
+            try
+            {
+                command.ConsumerId = _id;
+                var response = await _connection.Send(command);
+                response.Expect(BaseCommand.Type.Success);
+                return response.Success;
+            }
+            catch (Exception exception)
+            {
+                OnException(exception);
+                throw;
+            }
+        }
+
+        public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command)
+        {
+            try
+            {
+                command.ConsumerId = _id;
+                var response = await _connection.Send(command);
+                response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
+                return response.GetLastMessageIdResponse;
+            }
+            catch (Exception exception)
+            {
+                OnException(exception);
+                throw;
+            }
+        }
+
+        public void Dispose()
+        {
+            try
+            {
+                _connection.Send(new CommandCloseConsumer { ConsumerId = _id }).Wait();
+            }
+            catch
+            {
+                // Ignore
+            }
+        }
+
+        private void OnException(Exception exception)
+        {
+            if (_faultStrategy.DetermineFaultAction(exception) == FaultAction.Relookup)
+                _proxy.Disconnected();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ConsumerStreamFactory.cs b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
new file mode 100644
index 0000000..d8fd673
--- /dev/null
+++ b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
@@ -0,0 +1,81 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ConsumerStreamFactory : IConsumerStreamFactory
+    {
+        private readonly ConnectionPool _connectionTool;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly CommandSubscribe _subscribe;
+        private readonly uint _messagePrefetchCount;
+
+        public ConsumerStreamFactory(ConnectionPool connectionManager, ConsumerOptions options, IFaultStrategy faultStrategy)
+        {
+            _connectionTool = connectionManager;
+            _faultStrategy = faultStrategy;
+            _messagePrefetchCount = options.MessagePrefetchCount;
+
+            _subscribe = new CommandSubscribe
+            {
+                ConsumerName = options.ConsumerName,
+                initialPosition = (CommandSubscribe.InitialPosition)options.InitialPosition,
+                PriorityLevel = options.PriorityLevel,
+                Subscription = options.SubscriptionName,
+                Topic = options.Topic,
+                Type = (CommandSubscribe.SubType)options.SubscriptionType
+            };
+        }
+
+        public ConsumerStreamFactory(ConnectionPool connectionManager, ReaderOptions options, IFaultStrategy faultStrategy)
+        {
+            _connectionTool = connectionManager;
+            _faultStrategy = faultStrategy;
+            _messagePrefetchCount = options.MessagePrefetchCount;
+
+            _subscribe = new CommandSubscribe
+            {
+                ConsumerName = options.ReaderName,
+                Durable = false,
+                StartMessageId = options.StartMessageId.Data,
+                Subscription = "Reader-" + Guid.NewGuid().ToString("N"),
+                Topic = options.Topic
+            };
+        }
+
+        public async Task<IConsumerStream> CreateStream(IConsumerProxy proxy, CancellationToken cancellationToken)
+        {
+            while (true)
+            {
+                try
+                {
+                    var connection = await _connectionTool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
+                    var response = await connection.Send(_subscribe, proxy);
+                    return new ConsumerStream(response.ConsumerId, _messagePrefetchCount, proxy, connection, _faultStrategy, proxy);
+                }
+                catch (OperationCanceledException)
+                {
+                    if (cancellationToken.IsCancellationRequested)
+                        throw;
+                    else
+                        await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+                }
+                catch (Exception exception)
+                {
+                    switch (_faultStrategy.DetermineFaultAction(exception))
+                    {
+                        case FaultAction.Relookup:
+                        case FaultAction.Retry:
+                            await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+                            continue;
+                    }
+
+                    throw;
+                }
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Crc32C.cs b/src/DotPulsar/Internal/Crc32C.cs
new file mode 100644
index 0000000..eaee280
--- /dev/null
+++ b/src/DotPulsar/Internal/Crc32C.cs
@@ -0,0 +1,77 @@
+using System.Buffers;
+
+namespace DotPulsar.Internal
+{
+    public static class Crc32C
+    {
+        private const uint Generator = 0x82F63B78u;
+
+        private static readonly uint[] Lookup;
+
+        static Crc32C()
+        {
+            Lookup = new uint[16 * 256];
+
+            for (uint i = 0; i < 256; i++)
+            {
+                var entry = i;
+                for (var j = 0; j < 16; j++)
+                {
+                    for (var k = 0; k < 8; k++)
+                    {
+                        entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : (entry >> 1);
+                    }
+                    Lookup[(j * 256) + i] = entry;
+                }
+            }
+        }
+
+        public static uint Calculate(ReadOnlySequence<byte> sequence)
+        {
+            var block = new uint[16];
+            var checksum = uint.MaxValue;
+            var remaningBytes = sequence.Length;
+            var readingBlock = remaningBytes >= 16;
+            var offset = 15;
+
+            foreach (var memory in sequence)
+            {
+                var span = memory.Span;
+                for (var i = 0; i < span.Length; ++i)
+                {
+                    var currentByte = span[i];
+
+                    if (!readingBlock)
+                    {
+                        checksum = Lookup[(byte)(checksum ^ currentByte)] ^ checksum >> 8;
+                        continue;
+                    }
+
+                    var offSetBase = offset * 256;
+
+                    if (offset > 11)
+                        block[offset] = Lookup[offSetBase + ((byte)(checksum >> (8 * (15 - offset))) ^ currentByte)];
+                    else
+                        block[offset] = Lookup[offSetBase + currentByte];
+
+                    --remaningBytes;
+
+                    if (offset == 0)
+                    {
+                        offset = 15;
+                        readingBlock = remaningBytes >= 16;
+                        checksum = 0;
+                        for (var j = 0; j < block.Length; ++j)
+                            checksum ^= block[j];
+                    }
+                    else
+                    {
+                        --offset;
+                    }
+                }
+            }
+
+            return checksum ^ uint.MaxValue;
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
new file mode 100644
index 0000000..328cc70
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class ConsumerNotFoundException : DotPulsarException
+    {
+        public ConsumerNotFoundException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
new file mode 100644
index 0000000..ca43151
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class ServiceNotReadyException : DotPulsarException
+    {
+        public ServiceNotReadyException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/StreamNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/StreamNotReadyException.cs
new file mode 100644
index 0000000..5618e19
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/StreamNotReadyException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class StreamNotReadyException : DotPulsarException
+    {
+        public StreamNotReadyException() : base("The service is not ready yet") { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
new file mode 100644
index 0000000..d44620f
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
@@ -0,0 +1,9 @@
+using DotPulsar.Exceptions;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class TooManyRequestsException : DotPulsarException
+    {
+        public TooManyRequestsException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
new file mode 100644
index 0000000..d783deb
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
@@ -0,0 +1,11 @@
+using DotPulsar.Exceptions;
+using System;
+
+namespace DotPulsar.Internal.Exceptions
+{
+    public sealed class UnexpectedResponseException : DotPulsarException
+    {
+        public UnexpectedResponseException(string message) : base(message) { }
+        public UnexpectedResponseException(string message, Exception innerException) : base(message, innerException) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Executor.cs b/src/DotPulsar/Internal/Executor.cs
new file mode 100644
index 0000000..1f5b5b6
--- /dev/null
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -0,0 +1,102 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class Executor : IDisposable
+    {
+        private readonly AsyncLock _lock;
+        private readonly Func<Exception, CancellationToken, Task> _onException;
+
+        public Executor(Func<Exception, CancellationToken, Task> onException)
+        {
+            _lock = new AsyncLock();
+            _onException = onException;
+        }
+
+        public void Dispose() => _lock.Dispose();
+
+        public async Task Execute(Action action, CancellationToken cancellationToken)
+        {
+            using (await _lock.Lock(cancellationToken))
+            {
+                while (true)
+                {
+                    try
+                    {
+                        action();
+                        return;
+                    }
+                    catch (Exception exception)
+                    {
+                        await _onException(exception, cancellationToken);
+                    }
+
+                    cancellationToken.ThrowIfCancellationRequested();
+                }
+            }
+        }
+
+        public async Task Execute(Func<Task> func, CancellationToken cancellationToken)
+        {
+            using (await _lock.Lock(cancellationToken))
+            {
+                while (true)
+                {
+                    try
+                    {
+                        await func();
+                        return;
+                    }
+                    catch (Exception exception)
+                    {
+                        await _onException(exception, cancellationToken);
+                    }
+
+                    cancellationToken.ThrowIfCancellationRequested();
+                }
+            }
+        }
+
+        public async Task<TResult> Execute<TResult>(Func<TResult> func, CancellationToken cancellationToken)
+        {
+            using (await _lock.Lock(cancellationToken))
+            {
+                while (true)
+                {
+                    try
+                    {
+                        return func();
+                    }
+                    catch (Exception exception)
+                    {
+                        await _onException(exception, cancellationToken);
+                    }
+
+                    cancellationToken.ThrowIfCancellationRequested();
+                }
+            }
+        }
+
+        public async Task<TResult> Execute<TResult>(Func<Task<TResult>> func, CancellationToken cancellationToken)
+        {
+            using (await _lock.Lock(cancellationToken))
+            {
+                while (true)
+                {
+                    try
+                    {
+                        return await func();
+                    }
+                    catch (Exception exception)
+                    {
+                        await _onException(exception, cancellationToken);
+                    }
+
+                    cancellationToken.ThrowIfCancellationRequested();
+                }
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
new file mode 100644
index 0000000..29d743c
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -0,0 +1,193 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Exceptions;
+using DotPulsar.Internal.PulsarApi;
+
+namespace DotPulsar.Internal.Extensions
+{
+    public static class CommandExtensions
+    {
+        public static void Expect(this BaseCommand command, params BaseCommand.Type[] types)
+        {
+            var actual = command.CommandType;
+
+            foreach (var type in types)
+            {
+                if (type == actual)
+                    return;
+            }
+
+            switch (actual)
+            {
+                case BaseCommand.Type.Error:
+                    command.Error.Throw();
+                    return;
+                case BaseCommand.Type.SendError:
+                    command.SendError.Throw();
+                    return;
+            }
+
+            throw new UnexpectedResponseException($"Expected '{string.Join(",", types)}' but got '{actual}'");
+        }
+
+        public static void Throw(this CommandSendError command) => Throw(command.Error, command.Message);
+
+        public static void Throw(this CommandLookupTopicResponse command) => Throw(command.Error, command.Message);
+
+        public static void Throw(this CommandError error) => Throw(error.Error, error.Message);
+
+        private static void Throw(ServerError error, string message)
+        {
+            switch (error)
+            {
+                case ServerError.AuthenticationError: throw new AuthenticationException(message);
+                case ServerError.AuthorizationError: throw new AuthorizationException(message);
+                case ServerError.ChecksumError: throw new ChecksumException(message);
+                case ServerError.ConsumerAssignError: throw new ConsumerAssignException(message);
+                case ServerError.ConsumerBusy: throw new ConsumerBusyException(message);
+                case ServerError.ConsumerNotFound: throw new ConsumerNotFoundException(message);
+                case ServerError.IncompatibleSchema: throw new IncompatibleSchemaException(message);
+                case ServerError.InvalidTopicName: throw new InvalidTopicNameException(message);
+                case ServerError.MetadataError: throw new MetadataException(message);
+                case ServerError.PersistenceError: throw new PersistenceException(message);
+                case ServerError.ProducerBlockedQuotaExceededError:
+                case ServerError.ProducerBlockedQuotaExceededException:
+                    throw new ProducerBlockedQuotaExceededException(message + ". Error code: " + error);
+                case ServerError.ProducerBusy: throw new ProducerBusyException(message);
+                case ServerError.ServiceNotReady: throw new ServiceNotReadyException(message);
+                case ServerError.SubscriptionNotFound: throw new SubscriptionNotFoundException(message);
+                case ServerError.TooManyRequests: throw new TooManyRequestsException(message);
+                case ServerError.TopicNotFound: throw new TopicNotFoundException(message);
+                case ServerError.TopicTerminatedError: throw new TopicTerminatedException(message);
+                case ServerError.UnsupportedVersionError: throw new UnsupportedVersionException(message);
+                case ServerError.UnknownError:
+                default: throw new UnknownException(message + ". Error code: " + error);
+            }
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandAck command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Ack,
+                Ack = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandConnect command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Connect,
+                Connect = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandPing command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Ping,
+                Ping = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandPong command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Pong,
+                Pong = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandProducer command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Producer,
+                Producer = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandGetLastMessageId command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.GetLastMessageId,
+                GetLastMessageId = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandUnsubscribe command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Unsubscribe,
+                Unsubscribe = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandSubscribe command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Subscribe,
+                Subscribe = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandLookupTopic command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Lookup,
+                LookupTopic = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandSend command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Send,
+                Send = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandFlow command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Flow,
+                Flow = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandCloseProducer command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.CloseProducer,
+                CloseProducer = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandCloseConsumer command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.CloseConsumer,
+                CloseConsumer = command
+            };
+        }
+
+        public static BaseCommand AsBaseCommand(this CommandSeek command)
+        {
+            return new BaseCommand
+            {
+                CommandType = BaseCommand.Type.Seek,
+                Seek = command
+            };
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
new file mode 100644
index 0000000..e477974
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -0,0 +1,82 @@
+using System;
+using System.Buffers;
+
+namespace DotPulsar.Internal.Extensions
+{
+    public static class ReadOnlySequenceExtensions
+    {
+        public static bool StartsWith<T>(this ReadOnlySequence<T> sequence, ReadOnlyMemory<T> target) where T : IEquatable<T>
+        {
+            if (target.Length > sequence.Length)
+                return false;
+
+            var targetIndex = 0;
+            var targetSpan = target.Span;
+
+            foreach (var memory in sequence)
+            {
+                var span = memory.Span;
+                for (var i = 0; i < span.Length; ++i)
+                {
+                    if (!span[i].Equals(targetSpan[targetIndex]))
+                        return false;
+
+                    ++targetIndex;
+                    if (targetIndex == targetSpan.Length)
+                        return true;
+                }
+            }
+
+            return false;
+        }
+
+        public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, int start, bool isBigEndian)
+        {
+            if (sequence.Length < (4 + start))
+                throw new ArgumentOutOfRangeException(nameof(start), start, "Sequence must be at least 4 bytes long from 'start' to end");
+
+            var reverse = isBigEndian != BitConverter.IsLittleEndian;
+            var union = new UIntUnion();
+            var read = 0;
+
+            foreach (var memory in sequence)
+            {
+                if (start > memory.Length)
+                {
+                    start -= memory.Length;
+                    continue;
+                }
+
+                var span = memory.Span;
+                for (var i = start; i < span.Length; ++i, ++read)
+                {
+                    switch (read)
+                    {
+                        case 0:
+                            if (reverse) union.B0 = span[i];
+                            else union.B3 = span[i];
+                            continue;
+                        case 1:
+                            if (reverse) union.B1 = span[i];
+                            else union.B2 = span[i];
+                            continue;
+                        case 2:
+                            if (reverse) union.B2 = span[i];
+                            else union.B1 = span[i];
+                            continue;
+                        case 3:
+                            if (reverse) union.B3 = span[i];
+                            else union.B0 = span[i];
+                            break;
+                    }
+                }
+
+                if (read == 3)
+                    break;
+                start = 0;
+            }
+
+            return union.UInt;
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/FaultAction.cs b/src/DotPulsar/Internal/FaultAction.cs
new file mode 100644
index 0000000..263567a
--- /dev/null
+++ b/src/DotPulsar/Internal/FaultAction.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar.Internal
+{
+    public enum FaultAction : byte
+    {
+        Retry,
+        Relookup,
+        Fault
+    }
+}
diff --git a/src/DotPulsar/Internal/FaultStrategy.cs b/src/DotPulsar/Internal/FaultStrategy.cs
new file mode 100644
index 0000000..cfd91a0
--- /dev/null
+++ b/src/DotPulsar/Internal/FaultStrategy.cs
@@ -0,0 +1,30 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
+using System;
+
+namespace DotPulsar.Internal
+{
+    public sealed class FaultStrategy : IFaultStrategy
+    {
+        public FaultStrategy(int timeToWaitInMilliseconds)
+        {
+            TimeToWait = TimeSpan.FromMilliseconds(timeToWaitInMilliseconds);
+        }
+
+        public TimeSpan TimeToWait { get; }
+
+        public FaultAction DetermineFaultAction(Exception exception)
+        {
+            switch (exception)
+            {
+                case TooManyRequestsException _: return FaultAction.Retry;
+                case StreamNotReadyException _: return FaultAction.Relookup;
+                case ServiceNotReadyException _: return FaultAction.Relookup;
+                case DotPulsarException _: return FaultAction.Fault;
+            }
+
+            return FaultAction.Relookup;
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/IdLookup.cs b/src/DotPulsar/Internal/IdLookup.cs
new file mode 100644
index 0000000..433d9e2
--- /dev/null
+++ b/src/DotPulsar/Internal/IdLookup.cs
@@ -0,0 +1,59 @@
+using System.Collections.Generic;
+
+namespace DotPulsar.Internal
+{
+    public sealed class IdLookup<T> where T : class
+    {
+        private T[] _items;
+
+        public IdLookup() => _items = new T[0];
+
+        public bool IsEmpty()
+        {
+            for (var i = 0; i < _items.Length; ++i)
+            {
+                if (_items[i] != null)
+                    return false;
+            }
+
+            return true;
+        }
+
+        public ulong[] AllIds()
+        {
+            var activeIds = new List<ulong>();
+            for (var i = 0; i < _items.Length; ++i)
+            {
+                if (_items[i] != null)
+                    activeIds.Add((ulong)i);
+            }
+            return activeIds.ToArray();
+        }
+
+        public ulong Add(T item)
+        {
+            for (int i = 0; i < _items.Length; ++i)
+            {
+                if (_items[i] != null)
+                    continue;
+
+                _items[i] = item;
+                return (ulong)i;
+            }
+
+            var newArray = new T[_items.Length + 1];
+            _items.CopyTo(newArray, 0);
+            var id = newArray.Length - 1;
+            newArray[id] = item;
+            _items = newArray;
+            return (ulong)id;
+        }
+
+        public void Remove(ulong id) => _items[(int)id] = null;
+
+        public T this[ulong id]
+        {
+            get => _items[(int)id];
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
new file mode 100644
index 0000000..394a3f6
--- /dev/null
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -0,0 +1,39 @@
+using DotPulsar.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class MessageBuilder : IMessageBuilder
+    {
+        private readonly IProducer _producer;
+        private readonly MessageMetadata _metadata;
+
+        public MessageBuilder(IProducer producer)
+        {
+            _producer = producer;
+            _metadata = new MessageMetadata();
+        }
+
+        public IMessageBuilder EventTime(ulong eventTime)
+        {
+            _metadata.EventTime = eventTime;
+            return this;
+        }
+
+        public IMessageBuilder Property(string key, string value)
+        {
+            _metadata[key] = value;
+            return this;
+        }
+
+        public IMessageBuilder SequenceId(ulong sequenceId)
+        {
+            _metadata.SequenceId = sequenceId;
+            return this;
+        }
+
+        public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) => await _producer.Send(_metadata, data, cancellationToken);
+    }
+}
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
new file mode 100644
index 0000000..2aaf738
--- /dev/null
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -0,0 +1,17 @@
+using DotPulsar.Internal.PulsarApi;
+using System.Buffers;
+
+namespace DotPulsar.Internal
+{
+    public sealed class MessagePackage
+    {
+        public MessagePackage(CommandMessage command, ReadOnlySequence<byte> data)
+        {
+            Command = command;
+            Data = data;
+        }
+
+        public CommandMessage Command { get; }
+        public ReadOnlySequence<byte> Data { get; }
+    }
+}
diff --git a/src/DotPulsar/Internal/NotReadyStream.cs b/src/DotPulsar/Internal/NotReadyStream.cs
new file mode 100644
index 0000000..0b4f888
--- /dev/null
+++ b/src/DotPulsar/Internal/NotReadyStream.cs
@@ -0,0 +1,28 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Exceptions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class NotReadyStream : IConsumerStream, IProducerStream
+    {
+        public void Dispose() { }
+
+        public Task<Message> Receive(CancellationToken cancellationToken) => throw GetException();
+
+        public Task Send(CommandAck command) => throw GetException();
+
+        public Task<CommandSuccess> Send(CommandUnsubscribe command) => throw GetException();
+
+        public Task<CommandSuccess> Send(CommandSeek command) => throw GetException();
+
+        public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command) => throw GetException();
+
+        public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload) => throw GetException();
+
+        private Exception GetException() => new StreamNotReadyException();
+    }
+}
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
new file mode 100644
index 0000000..e2367b4
--- /dev/null
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -0,0 +1,21 @@
+using DotPulsar.Internal.PulsarApi;
+
+namespace DotPulsar.Internal
+{
+    public sealed class PingPongHandler
+    {
+        private readonly Connection _connection;
+        private readonly CommandPong _pong;
+
+        public PingPongHandler(Connection connection)
+        {
+            _connection = connection;
+            _pong = new CommandPong();
+        }
+
+        public void Incoming(CommandPing ping)
+        {
+            _ = _connection.Send(_pong);
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
new file mode 100644
index 0000000..d37d351
--- /dev/null
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -0,0 +1,116 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class Producer : IProducer
+    {
+        private readonly Executor _executor;
+        private readonly IProducerStreamFactory _streamFactory;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly StateManager<ProducerState> _stateManager;
+        private readonly CancellationTokenSource _connectTokenSource;
+        private readonly Task _connectTask;
+        private Action _throwIfClosedOrFaulted;
+        private IProducerStream Stream { get; set; }
+
+        public Producer(IProducerStreamFactory streamFactory, IFaultStrategy faultStrategy)
+        {
+            _executor = new Executor(ExecutorOnException);
+            _streamFactory = streamFactory;
+            _faultStrategy = faultStrategy;
+            _stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
+            _connectTokenSource = new CancellationTokenSource();
+            Stream = new NotReadyStream();
+            _connectTask = Connect(_connectTokenSource.Token);
+            _throwIfClosedOrFaulted = () => { };
+        }
+
+        public async Task<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+        public async Task<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+        public bool IsFinalState() => _stateManager.IsFinalState();
+        public bool IsFinalState(ProducerState state) => _stateManager.IsFinalState(state);
+
+        public void Dispose()
+        {
+            _executor.Dispose();
+            _connectTokenSource.Cancel();
+            _connectTask.Wait();
+        }
+
+        public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
+            => await Send(new MessageMetadata(), data, cancellationToken);
+
+        public async Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
+            => await Send(metadata.Metadata, data, cancellationToken);
+
+        private async Task<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload, CancellationToken cancellationToken)
+        {
+            var response = await _executor.Execute(() => Stream.Send(metadata, payload), cancellationToken);
+            return new MessageId(response.MessageId);
+        }
+
+        private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
+        {
+            _throwIfClosedOrFaulted();
+
+            switch (_faultStrategy.DetermineFaultAction(exception))
+            {
+                case FaultAction.Retry:
+                    await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+                    break;
+                case FaultAction.Relookup:
+                    await _stateManager.StateChangedTo(ProducerState.Connected, cancellationToken);
+                    break;
+                case FaultAction.Fault:
+                    HasFaulted(exception);
+                    break;
+            }
+
+            _throwIfClosedOrFaulted();
+        }
+
+        private void HasFaulted(Exception exception)
+        {
+            _throwIfClosedOrFaulted = () => throw exception;
+            _stateManager.SetState(ProducerState.Faulted);
+        }
+
+        private void HasClosed()
+        {
+            _throwIfClosedOrFaulted = () => throw new ProducerClosedException();
+            _stateManager.SetState(ProducerState.Closed);
+        }
+
+        private async Task Connect(CancellationToken cancellationToken)
+        {
+            try
+            {
+                while (true)
+                {
+                    var proxy = new ProducerProxy(_stateManager);
+
+                    using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
+                    {
+                        proxy.Connected();
+                        await _stateManager.StateChangedFrom(ProducerState.Connected, cancellationToken);
+                        if (_stateManager.IsFinalState())
+                            return;
+                    }
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                HasClosed();
+            }
+            catch (Exception exception)
+            {
+                HasFaulted(exception);
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
new file mode 100644
index 0000000..654086c
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -0,0 +1,36 @@
+using DotPulsar.Abstractions;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ProducerBuilder : IProducerBuilder
+    {
+        private readonly IPulsarClient _pulsarClient;
+        private readonly ProducerOptions _options;
+
+        public ProducerBuilder(IPulsarClient pulsarClient)
+        {
+            _pulsarClient = pulsarClient;
+            _options = new ProducerOptions();
+        }
+
+        public IProducerBuilder ProducerName(string name)
+        {
+            _options.ProducerName = name;
+            return this;
+        }
+
+        public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+        {
+            _options.InitialSequenceId = initialSequenceId;
+            return this;
+        }
+
+        public IProducerBuilder Topic(string topic)
+        {
+            _options.Topic = topic;
+            return this;
+        }
+
+        public IProducer Create() => _pulsarClient.CreateProducer(_options);
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerManager.cs b/src/DotPulsar/Internal/ProducerManager.cs
new file mode 100644
index 0000000..2bd6494
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerManager.cs
@@ -0,0 +1,37 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ProducerManager : IDisposable
+    {
+        private readonly IdLookup<IProducerProxy> _proxies;
+
+        public ProducerManager() => _proxies = new IdLookup<IProducerProxy>();
+
+        public bool HasProducers => !_proxies.IsEmpty();
+
+        public void Outgoing(CommandProducer producer, IProducerProxy proxy) => producer.ProducerId = _proxies.Add(proxy);
+
+        public void Remove(ulong producerId) => _proxies.Remove(producerId);
+
+        public void Incoming(CommandCloseProducer command) => RemoveProducer(command.ProducerId);
+
+        public void Dispose()
+        {
+            foreach (var id in _proxies.AllIds())
+            {
+                RemoveProducer(id);
+            }
+        }
+
+        private void RemoveProducer(ulong producerId)
+        {
+            var stateManager = _proxies[producerId];
+            if (stateManager == null) return;
+            stateManager.Disconnected();
+            _proxies.Remove(producerId);
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerProxy.cs b/src/DotPulsar/Internal/ProducerProxy.cs
new file mode 100644
index 0000000..f5711e5
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerProxy.cs
@@ -0,0 +1,39 @@
+using DotPulsar.Internal.Abstractions;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ProducerProxy : IProducerProxy
+    {
+        private readonly object _lock;
+        private readonly StateManager<ProducerState> _stateManager;
+        private bool _hasDisconnected;
+
+        public ProducerProxy(StateManager<ProducerState> stateManager)
+        {
+            _lock = new object();
+            _stateManager = stateManager;
+            _hasDisconnected = false;
+        }
+
+        public void Connected()
+        {
+            lock (_lock)
+            {
+                if (!_hasDisconnected)
+                    _stateManager.SetState(ProducerState.Connected);
+            }
+        }
+
+        public void Disconnected()
+        {
+            lock (_lock)
+            {
+                if (_hasDisconnected)
+                    return;
+
+                _stateManager.SetState(ProducerState.Disconnected);
+                _hasDisconnected = true;
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs b/src/DotPulsar/Internal/ProducerResponse.cs
new file mode 100644
index 0000000..458d186
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerResponse.cs
@@ -0,0 +1,14 @@
+namespace DotPulsar.Internal
+{
+    public sealed class ProducerResponse
+    {
+        public ProducerResponse(ulong producerId, string producerName)
+        {
+            ProducerId = producerId;
+            ProducerName = producerName;
+        }
+
+        public ulong ProducerId { get; }
+        public string ProducerName { get; }
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerStream.cs b/src/DotPulsar/Internal/ProducerStream.cs
new file mode 100644
index 0000000..7b296e0
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerStream.cs
@@ -0,0 +1,69 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ProducerStream : IProducerStream
+    {
+        private readonly SendPackage _cachedSendPackage;
+        private readonly ulong _id;
+        private readonly string _name;
+        private readonly SequenceId _sequenceId;
+        private readonly Connection _connection;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly IProducerProxy _proxy;
+
+        public ProducerStream(ulong id, string name, SequenceId sequenceId, Connection connection, IFaultStrategy faultStrategy, IProducerProxy proxy)
+        {
+            _cachedSendPackage = new SendPackage(new CommandSend { ProducerId = id, NumMessages = 1 });
+            _id = id;
+            _name = name;
+            _sequenceId = sequenceId;
+            _connection = connection;
+            _faultStrategy = faultStrategy;
+            _proxy = proxy;
+        }
+
+        public void Dispose()
+        {
+            try
+            {
+                _connection.Send(new CommandCloseProducer { ProducerId = _id }).Wait();
+            }
+            catch
+            {
+                // Ignore
+            }
+        }
+
+        public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload)
+        {
+            try
+            {
+                metadata.PublishTime = (ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+                metadata.ProducerName = _name;
+
+                if (metadata.SequenceId == 0)
+                    metadata.SequenceId = _sequenceId.Current;
+
+                _cachedSendPackage.Command.SequenceId = metadata.SequenceId;
+                _cachedSendPackage.Metadata = metadata;
+                _cachedSendPackage.Payload = payload;
+
+                var response = await _connection.Send(_cachedSendPackage);
+                response.Expect(BaseCommand.Type.SendReceipt); //TODO find out if we should increment on SendError
+                _sequenceId.Increment();
+                return response.SendReceipt;
+            }
+            catch (Exception exception)
+            {
+                if (_faultStrategy.DetermineFaultAction(exception) == FaultAction.Relookup)
+                    _proxy.Disconnected();
+                throw;
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerStreamFactory.cs b/src/DotPulsar/Internal/ProducerStreamFactory.cs
new file mode 100644
index 0000000..5c3f138
--- /dev/null
+++ b/src/DotPulsar/Internal/ProducerStreamFactory.cs
@@ -0,0 +1,62 @@
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ProducerStreamFactory : IProducerStreamFactory
+    {
+        private readonly ConnectionPool _connectionTool;
+        private readonly ProducerOptions _options;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly SequenceId _sequenceId;
+
+        public ProducerStreamFactory(ConnectionPool connectionManager, ProducerOptions options, IFaultStrategy faultStrategy)
+        {
+            _connectionTool = connectionManager;
+            _options = options;
+            _faultStrategy = faultStrategy;
+            _sequenceId = new SequenceId(options.InitialSequenceId);
+        }
+
+        public async Task<IProducerStream> CreateStream(IProducerProxy proxy, CancellationToken cancellationToken)
+        {
+            var commandProducer = new CommandProducer
+            {
+                ProducerName = _options.ProducerName,
+                Topic = _options.Topic
+            };
+
+            while (true)
+            {
+                try
+                {
+                    var connection = await _connectionTool.FindConnectionForTopic(_options.Topic, cancellationToken);
+                    var response = await connection.Send(commandProducer, proxy);
+                    return new ProducerStream(response.ProducerId, response.ProducerName, _sequenceId, connection, _faultStrategy, proxy);
+                }
+                catch (OperationCanceledException)
+                {
+                    if (cancellationToken.IsCancellationRequested)
+                        throw;
+                    else
+                        await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+                }
+                catch (Exception exception)
+                {
+                    switch (_faultStrategy.DetermineFaultAction(exception))
+                    {
+                        case FaultAction.Relookup:
+                        case FaultAction.Retry:
+                            await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+                            continue;
+                    }
+
+                    throw;
+                }
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
new file mode 100644
index 0000000..7560c72
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
@@ -0,0 +1,1953 @@
+namespace DotPulsar.Internal.PulsarApi
+{
+    // This file was generated by a tool; you should avoid making direct changes.
+    // Consider using 'partial classes' to extend these types
+    // Input: PulsarApi.proto (Last edit: ba24d73)
+
+#pragma warning disable CS1591, CS0612, CS3021, IDE1006
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class Schema : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"name", IsRequired = true)]
+        public string Name { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"schema_data", IsRequired = true)]
+        public byte[] SchemaData { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, IsRequired = true)]
+        public SchemaType Type { get; set; }
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"properties")]
+        public global::System.Collections.Generic.List<KeyValue> Properties { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum SchemaType
+        {
+            None = 0,
+            String = 1,
+            Json = 2,
+            Protobuf = 3,
+            Avro = 4,
+            Bool = 5,
+            Int8 = 6,
+            Int16 = 7,
+            Int32 = 8,
+            Int64 = 9,
+            Float = 10,
+            Double = 11,
+            Date = 12,
+            Time = 13,
+            Timestamp = 14,
+            KeyValue = 15,
+        }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class MessageIdData : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, IsRequired = true)]
+        public ulong LedgerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, IsRequired = true)]
+        public ulong EntryId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"partition")]
+        [global::System.ComponentModel.DefaultValue(-1)]
+        public int Partition
+        {
+            get { return __pbn__Partition ?? -1; }
+            set { __pbn__Partition = value; }
+        }
+        public bool ShouldSerializePartition() => __pbn__Partition != null;
+        public void ResetPartition() => __pbn__Partition = null;
+        private int? __pbn__Partition;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"batch_index")]
+        [global::System.ComponentModel.DefaultValue(-1)]
+        public int BatchIndex
+        {
+            get { return __pbn__BatchIndex ?? -1; }
+            set { __pbn__BatchIndex = value; }
+        }
+        public bool ShouldSerializeBatchIndex() => __pbn__BatchIndex != null;
+        public void ResetBatchIndex() => __pbn__BatchIndex = null;
+        private int? __pbn__BatchIndex;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class KeyValue : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"key", IsRequired = true)]
+        public string Key { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"value", IsRequired = true)]
+        public string Value { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class KeyLongValue : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"key", IsRequired = true)]
+        public string Key { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"value", IsRequired = true)]
+        public ulong Value { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class EncryptionKeys : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"key", IsRequired = true)]
+        public string Key { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"value", IsRequired = true)]
+        public byte[] Value { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"metadata")]
+        public global::System.Collections.Generic.List<KeyValue> Metadatas { get; } = new global::System.Collections.Generic.List<KeyValue>();
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class MessageMetadata : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"producer_name", IsRequired = true)]
+        public string ProducerName { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+        public ulong SequenceId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"publish_time", IsRequired = true)]
+        public ulong PublishTime { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"properties")]
+        public global::System.Collections.Generic.List<KeyValue> Properties { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"replicated_from")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ReplicatedFrom
+        {
+            get { return __pbn__ReplicatedFrom ?? ""; }
+            set { __pbn__ReplicatedFrom = value; }
+        }
+        public bool ShouldSerializeReplicatedFrom() => __pbn__ReplicatedFrom != null;
+        public void ResetReplicatedFrom() => __pbn__ReplicatedFrom = null;
+        private string __pbn__ReplicatedFrom;
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"partition_key")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string PartitionKey
+        {
+            get { return __pbn__PartitionKey ?? ""; }
+            set { __pbn__PartitionKey = value; }
+        }
+        public bool ShouldSerializePartitionKey() => __pbn__PartitionKey != null;
+        public void ResetPartitionKey() => __pbn__PartitionKey = null;
+        private string __pbn__PartitionKey;
+
+        [global::ProtoBuf.ProtoMember(7, Name = @"replicate_to")]
+        public global::System.Collections.Generic.List<string> ReplicateToes { get; } = new global::System.Collections.Generic.List<string>();
+
+        [global::ProtoBuf.ProtoMember(8, Name = @"compression")]
+        [global::System.ComponentModel.DefaultValue(CompressionType.None)]
+        public CompressionType Compression
+        {
+            get { return __pbn__Compression ?? CompressionType.None; }
+            set { __pbn__Compression = value; }
+        }
+        public bool ShouldSerializeCompression() => __pbn__Compression != null;
+        public void ResetCompression() => __pbn__Compression = null;
+        private CompressionType? __pbn__Compression;
+
+        [global::ProtoBuf.ProtoMember(9, Name = @"uncompressed_size")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public uint UncompressedSize
+        {
+            get { return __pbn__UncompressedSize ?? 0; }
+            set { __pbn__UncompressedSize = value; }
+        }
+        public bool ShouldSerializeUncompressedSize() => __pbn__UncompressedSize != null;
+        public void ResetUncompressedSize() => __pbn__UncompressedSize = null;
+        private uint? __pbn__UncompressedSize;
+
+        [global::ProtoBuf.ProtoMember(11, Name = @"num_messages_in_batch")]
+        [global::System.ComponentModel.DefaultValue(1)]
+        public int NumMessagesInBatch
+        {
+            get { return __pbn__NumMessagesInBatch ?? 1; }
+            set { __pbn__NumMessagesInBatch = value; }
+        }
+        public bool ShouldSerializeNumMessagesInBatch() => __pbn__NumMessagesInBatch != null;
+        public void ResetNumMessagesInBatch() => __pbn__NumMessagesInBatch = null;
+        private int? __pbn__NumMessagesInBatch;
+
+        [global::ProtoBuf.ProtoMember(12, Name = @"event_time")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public ulong EventTime
+        {
+            get { return __pbn__EventTime ?? 0; }
+            set { __pbn__EventTime = value; }
+        }
+        public bool ShouldSerializeEventTime() => __pbn__EventTime != null;
+        public void ResetEventTime() => __pbn__EventTime = null;
+        private ulong? __pbn__EventTime;
+
+        [global::ProtoBuf.ProtoMember(13, Name = @"encryption_keys")]
+        public global::System.Collections.Generic.List<EncryptionKeys> EncryptionKeys { get; } = new global::System.Collections.Generic.List<EncryptionKeys>();
+
+        [global::ProtoBuf.ProtoMember(14, Name = @"encryption_algo")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string EncryptionAlgo
+        {
+            get { return __pbn__EncryptionAlgo ?? ""; }
+            set { __pbn__EncryptionAlgo = value; }
+        }
+        public bool ShouldSerializeEncryptionAlgo() => __pbn__EncryptionAlgo != null;
+        public void ResetEncryptionAlgo() => __pbn__EncryptionAlgo = null;
+        private string __pbn__EncryptionAlgo;
+
+        [global::ProtoBuf.ProtoMember(15, Name = @"encryption_param")]
+        public byte[] EncryptionParam
+        {
+            get { return __pbn__EncryptionParam; }
+            set { __pbn__EncryptionParam = value; }
+        }
+        public bool ShouldSerializeEncryptionParam() => __pbn__EncryptionParam != null;
+        public void ResetEncryptionParam() => __pbn__EncryptionParam = null;
+        private byte[] __pbn__EncryptionParam;
+
+        [global::ProtoBuf.ProtoMember(16, Name = @"schema_version")]
+        public byte[] SchemaVersion
+        {
+            get { return __pbn__SchemaVersion; }
+            set { __pbn__SchemaVersion = value; }
+        }
+        public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+        public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+        private byte[] __pbn__SchemaVersion;
+
+        [global::ProtoBuf.ProtoMember(17, Name = @"partition_key_b64_encoded")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool PartitionKeyB64Encoded
+        {
+            get { return __pbn__PartitionKeyB64Encoded ?? false; }
+            set { __pbn__PartitionKeyB64Encoded = value; }
+        }
+        public bool ShouldSerializePartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded != null;
+        public void ResetPartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded = null;
+        private bool? __pbn__PartitionKeyB64Encoded;
+
+        [global::ProtoBuf.ProtoMember(18, Name = @"ordering_key")]
+        public byte[] OrderingKey
+        {
+            get { return __pbn__OrderingKey; }
+            set { __pbn__OrderingKey = value; }
+        }
+        public bool ShouldSerializeOrderingKey() => __pbn__OrderingKey != null;
+        public void ResetOrderingKey() => __pbn__OrderingKey = null;
+        private byte[] __pbn__OrderingKey;
+
+        [global::ProtoBuf.ProtoMember(19, Name = @"deliver_at_time")]
+        public long DeliverAtTime
+        {
+            get { return __pbn__DeliverAtTime.GetValueOrDefault(); }
+            set { __pbn__DeliverAtTime = value; }
+        }
+        public bool ShouldSerializeDeliverAtTime() => __pbn__DeliverAtTime != null;
+        public void ResetDeliverAtTime() => __pbn__DeliverAtTime = null;
+        private long? __pbn__DeliverAtTime;
+
+        [global::ProtoBuf.ProtoMember(20, Name = @"marker_type")]
+        public int MarkerType
+        {
+            get { return __pbn__MarkerType.GetValueOrDefault(); }
+            set { __pbn__MarkerType = value; }
+        }
+        public bool ShouldSerializeMarkerType() => __pbn__MarkerType != null;
+        public void ResetMarkerType() => __pbn__MarkerType = null;
+        private int? __pbn__MarkerType;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class SingleMessageMetadata : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"properties")]
+        public global::System.Collections.Generic.List<KeyValue> Properties { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"partition_key")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string PartitionKey
+        {
+            get { return __pbn__PartitionKey ?? ""; }
+            set { __pbn__PartitionKey = value; }
+        }
+        public bool ShouldSerializePartitionKey() => __pbn__PartitionKey != null;
+        public void ResetPartitionKey() => __pbn__PartitionKey = null;
+        private string __pbn__PartitionKey;
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"payload_size", IsRequired = true)]
+        public int PayloadSize { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"compacted_out")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool CompactedOut
+        {
+            get { return __pbn__CompactedOut ?? false; }
+            set { __pbn__CompactedOut = value; }
+        }
+        public bool ShouldSerializeCompactedOut() => __pbn__CompactedOut != null;
+        public void ResetCompactedOut() => __pbn__CompactedOut = null;
+        private bool? __pbn__CompactedOut;
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"event_time")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public ulong EventTime
+        {
+            get { return __pbn__EventTime ?? 0; }
+            set { __pbn__EventTime = value; }
+        }
+        public bool ShouldSerializeEventTime() => __pbn__EventTime != null;
+        public void ResetEventTime() => __pbn__EventTime = null;
+        private ulong? __pbn__EventTime;
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"partition_key_b64_encoded")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool PartitionKeyB64Encoded
+        {
+            get { return __pbn__PartitionKeyB64Encoded ?? false; }
+            set { __pbn__PartitionKeyB64Encoded = value; }
+        }
+        public bool ShouldSerializePartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded != null;
+        public void ResetPartitionKeyB64Encoded() => __pbn__PartitionKeyB64Encoded = null;
+        private bool? __pbn__PartitionKeyB64Encoded;
+
+        [global::ProtoBuf.ProtoMember(7, Name = @"ordering_key")]
+        public byte[] OrderingKey
+        {
+            get { return __pbn__OrderingKey; }
+            set { __pbn__OrderingKey = value; }
+        }
+        public bool ShouldSerializeOrderingKey() => __pbn__OrderingKey != null;
+        public void ResetOrderingKey() => __pbn__OrderingKey = null;
+        private byte[] __pbn__OrderingKey;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandConnect : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"client_version", IsRequired = true)]
+        public string ClientVersion { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"auth_method")]
+        [global::System.ComponentModel.DefaultValue(AuthMethod.AuthMethodNone)]
+        public AuthMethod AuthMethod
+        {
+            get { return __pbn__AuthMethod ?? AuthMethod.AuthMethodNone; }
+            set { __pbn__AuthMethod = value; }
+        }
+        public bool ShouldSerializeAuthMethod() => __pbn__AuthMethod != null;
+        public void ResetAuthMethod() => __pbn__AuthMethod = null;
+        private AuthMethod? __pbn__AuthMethod;
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"auth_method_name")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string AuthMethodName
+        {
+            get { return __pbn__AuthMethodName ?? ""; }
+            set { __pbn__AuthMethodName = value; }
+        }
+        public bool ShouldSerializeAuthMethodName() => __pbn__AuthMethodName != null;
+        public void ResetAuthMethodName() => __pbn__AuthMethodName = null;
+        private string __pbn__AuthMethodName;
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"auth_data")]
+        public byte[] AuthData
+        {
+            get { return __pbn__AuthData; }
+            set { __pbn__AuthData = value; }
+        }
+        public bool ShouldSerializeAuthData() => __pbn__AuthData != null;
+        public void ResetAuthData() => __pbn__AuthData = null;
+        private byte[] __pbn__AuthData;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"protocol_version")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public int ProtocolVersion
+        {
+            get { return __pbn__ProtocolVersion ?? 0; }
+            set { __pbn__ProtocolVersion = value; }
+        }
+        public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+        public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+        private int? __pbn__ProtocolVersion;
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"proxy_to_broker_url")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ProxyToBrokerUrl
+        {
+            get { return __pbn__ProxyToBrokerUrl ?? ""; }
+            set { __pbn__ProxyToBrokerUrl = value; }
+        }
+        public bool ShouldSerializeProxyToBrokerUrl() => __pbn__ProxyToBrokerUrl != null;
+        public void ResetProxyToBrokerUrl() => __pbn__ProxyToBrokerUrl = null;
+        private string __pbn__ProxyToBrokerUrl;
+
+        [global::ProtoBuf.ProtoMember(7, Name = @"original_principal")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalPrincipal
+        {
+            get { return __pbn__OriginalPrincipal ?? ""; }
+            set { __pbn__OriginalPrincipal = value; }
+        }
+        public bool ShouldSerializeOriginalPrincipal() => __pbn__OriginalPrincipal != null;
+        public void ResetOriginalPrincipal() => __pbn__OriginalPrincipal = null;
+        private string __pbn__OriginalPrincipal;
+
+        [global::ProtoBuf.ProtoMember(8, Name = @"original_auth_data")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalAuthData
+        {
+            get { return __pbn__OriginalAuthData ?? ""; }
+            set { __pbn__OriginalAuthData = value; }
+        }
+        public bool ShouldSerializeOriginalAuthData() => __pbn__OriginalAuthData != null;
+        public void ResetOriginalAuthData() => __pbn__OriginalAuthData = null;
+        private string __pbn__OriginalAuthData;
+
+        [global::ProtoBuf.ProtoMember(9, Name = @"original_auth_method")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalAuthMethod
+        {
+            get { return __pbn__OriginalAuthMethod ?? ""; }
+            set { __pbn__OriginalAuthMethod = value; }
+        }
+        public bool ShouldSerializeOriginalAuthMethod() => __pbn__OriginalAuthMethod != null;
+        public void ResetOriginalAuthMethod() => __pbn__OriginalAuthMethod = null;
+        private string __pbn__OriginalAuthMethod;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandConnected : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"server_version", IsRequired = true)]
+        public string ServerVersion { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"protocol_version")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public int ProtocolVersion
+        {
+            get { return __pbn__ProtocolVersion ?? 0; }
+            set { __pbn__ProtocolVersion = value; }
+        }
+        public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+        public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+        private int? __pbn__ProtocolVersion;
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"max_message_size")]
+        public int MaxMessageSize
+        {
+            get { return __pbn__MaxMessageSize.GetValueOrDefault(); }
+            set { __pbn__MaxMessageSize = value; }
+        }
+        public bool ShouldSerializeMaxMessageSize() => __pbn__MaxMessageSize != null;
+        public void ResetMaxMessageSize() => __pbn__MaxMessageSize = null;
+        private int? __pbn__MaxMessageSize;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandAuthResponse : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"client_version")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ClientVersion
+        {
+            get { return __pbn__ClientVersion ?? ""; }
+            set { __pbn__ClientVersion = value; }
+        }
+        public bool ShouldSerializeClientVersion() => __pbn__ClientVersion != null;
+        public void ResetClientVersion() => __pbn__ClientVersion = null;
+        private string __pbn__ClientVersion;
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"response")]
+        public AuthData Response { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"protocol_version")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public int ProtocolVersion
+        {
+            get { return __pbn__ProtocolVersion ?? 0; }
+            set { __pbn__ProtocolVersion = value; }
+        }
+        public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+        public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+        private int? __pbn__ProtocolVersion;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandAuthChallenge : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"server_version")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ServerVersion
+        {
+            get { return __pbn__ServerVersion ?? ""; }
+            set { __pbn__ServerVersion = value; }
+        }
+        public bool ShouldSerializeServerVersion() => __pbn__ServerVersion != null;
+        public void ResetServerVersion() => __pbn__ServerVersion = null;
+        private string __pbn__ServerVersion;
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"challenge")]
+        public AuthData Challenge { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"protocol_version")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public int ProtocolVersion
+        {
+            get { return __pbn__ProtocolVersion ?? 0; }
+            set { __pbn__ProtocolVersion = value; }
+        }
+        public bool ShouldSerializeProtocolVersion() => __pbn__ProtocolVersion != null;
+        public void ResetProtocolVersion() => __pbn__ProtocolVersion = null;
+        private int? __pbn__ProtocolVersion;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class AuthData : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"auth_method_name")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string AuthMethodName
+        {
+            get { return __pbn__AuthMethodName ?? ""; }
+            set { __pbn__AuthMethodName = value; }
+        }
+        public bool ShouldSerializeAuthMethodName() => __pbn__AuthMethodName != null;
+        public void ResetAuthMethodName() => __pbn__AuthMethodName = null;
+        private string __pbn__AuthMethodName;
+
+        [global::ProtoBuf.ProtoMember(2)]
+        public byte[] auth_data
+        {
+            get { return __pbn__auth_data; }
+            set { __pbn__auth_data = value; }
+        }
+        public bool ShouldSerializeauth_data() => __pbn__auth_data != null;
+        public void Resetauth_data() => __pbn__auth_data = null;
+        private byte[] __pbn__auth_data;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandSubscribe : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+        public string Topic { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"subscription", IsRequired = true)]
+        public string Subscription { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, IsRequired = true)]
+        public SubType Type { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"consumer_name")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ConsumerName
+        {
+            get { return __pbn__ConsumerName ?? ""; }
+            set { __pbn__ConsumerName = value; }
+        }
+        public bool ShouldSerializeConsumerName() => __pbn__ConsumerName != null;
+        public void ResetConsumerName() => __pbn__ConsumerName = null;
+        private string __pbn__ConsumerName;
+
+        [global::ProtoBuf.ProtoMember(7, Name = @"priority_level")]
+        public int PriorityLevel
+        {
+            get { return __pbn__PriorityLevel.GetValueOrDefault(); }
+            set { __pbn__PriorityLevel = value; }
+        }
+        public bool ShouldSerializePriorityLevel() => __pbn__PriorityLevel != null;
+        public void ResetPriorityLevel() => __pbn__PriorityLevel = null;
+        private int? __pbn__PriorityLevel;
+
+        [global::ProtoBuf.ProtoMember(8, Name = @"durable")]
+        [global::System.ComponentModel.DefaultValue(true)]
+        public bool Durable
+        {
+            get { return __pbn__Durable ?? true; }
+            set { __pbn__Durable = value; }
+        }
+        public bool ShouldSerializeDurable() => __pbn__Durable != null;
+        public void ResetDurable() => __pbn__Durable = null;
+        private bool? __pbn__Durable;
+
+        [global::ProtoBuf.ProtoMember(9, Name = @"start_message_id")]
+        public MessageIdData StartMessageId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(10, Name = @"metadata")]
+        public global::System.Collections.Generic.List<KeyValue> Metadatas { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+        [global::ProtoBuf.ProtoMember(11, Name = @"read_compacted")]
+        public bool ReadCompacted
+        {
+            get { return __pbn__ReadCompacted.GetValueOrDefault(); }
+            set { __pbn__ReadCompacted = value; }
+        }
+        public bool ShouldSerializeReadCompacted() => __pbn__ReadCompacted != null;
+        public void ResetReadCompacted() => __pbn__ReadCompacted = null;
+        private bool? __pbn__ReadCompacted;
+
+        [global::ProtoBuf.ProtoMember(12, Name = @"schema")]
+        public Schema Schema { get; set; }
+
+        [global::ProtoBuf.ProtoMember(13)]
+        [global::System.ComponentModel.DefaultValue(InitialPosition.Latest)]
+        public InitialPosition initialPosition
+        {
+            get { return __pbn__initialPosition ?? InitialPosition.Latest; }
+            set { __pbn__initialPosition = value; }
+        }
+        public bool ShouldSerializeinitialPosition() => __pbn__initialPosition != null;
+        public void ResetinitialPosition() => __pbn__initialPosition = null;
+        private InitialPosition? __pbn__initialPosition;
+
+        [global::ProtoBuf.ProtoMember(14, Name = @"replicate_subscription_state")]
+        public bool ReplicateSubscriptionState
+        {
+            get { return __pbn__ReplicateSubscriptionState.GetValueOrDefault(); }
+            set { __pbn__ReplicateSubscriptionState = value; }
+        }
+        public bool ShouldSerializeReplicateSubscriptionState() => __pbn__ReplicateSubscriptionState != null;
+        public void ResetReplicateSubscriptionState() => __pbn__ReplicateSubscriptionState = null;
+        private bool? __pbn__ReplicateSubscriptionState;
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum SubType
+        {
+            Exclusive = 0,
+            Shared = 1,
+            Failover = 2,
+            [global::ProtoBuf.ProtoEnum(Name = @"Key_Shared")]
+            KeyShared = 3,
+        }
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum InitialPosition
+        {
+            Latest = 0,
+            Earliest = 1,
+        }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandPartitionedTopicMetadata : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+        public string Topic { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"original_principal")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalPrincipal
+        {
+            get { return __pbn__OriginalPrincipal ?? ""; }
+            set { __pbn__OriginalPrincipal = value; }
+        }
+        public bool ShouldSerializeOriginalPrincipal() => __pbn__OriginalPrincipal != null;
+        public void ResetOriginalPrincipal() => __pbn__OriginalPrincipal = null;
+        private string __pbn__OriginalPrincipal;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"original_auth_data")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalAuthData
+        {
+            get { return __pbn__OriginalAuthData ?? ""; }
+            set { __pbn__OriginalAuthData = value; }
+        }
+        public bool ShouldSerializeOriginalAuthData() => __pbn__OriginalAuthData != null;
+        public void ResetOriginalAuthData() => __pbn__OriginalAuthData = null;
+        private string __pbn__OriginalAuthData;
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"original_auth_method")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalAuthMethod
+        {
+            get { return __pbn__OriginalAuthMethod ?? ""; }
+            set { __pbn__OriginalAuthMethod = value; }
+        }
+        public bool ShouldSerializeOriginalAuthMethod() => __pbn__OriginalAuthMethod != null;
+        public void ResetOriginalAuthMethod() => __pbn__OriginalAuthMethod = null;
+        private string __pbn__OriginalAuthMethod;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandPartitionedTopicMetadataResponse : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"partitions")]
+        public uint Partitions
+        {
+            get { return __pbn__Partitions.GetValueOrDefault(); }
+            set { __pbn__Partitions = value; }
+        }
+        public bool ShouldSerializePartitions() => __pbn__Partitions != null;
+        public void ResetPartitions() => __pbn__Partitions = null;
+        private uint? __pbn__Partitions;
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"response")]
+        [global::System.ComponentModel.DefaultValue(LookupType.Success)]
+        public LookupType Response
+        {
+            get { return __pbn__Response ?? LookupType.Success; }
+            set { __pbn__Response = value; }
+        }
+        public bool ShouldSerializeResponse() => __pbn__Response != null;
+        public void ResetResponse() => __pbn__Response = null;
+        private LookupType? __pbn__Response;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"error")]
+        [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+        public ServerError Error
+        {
+            get { return __pbn__Error ?? ServerError.UnknownError; }
+            set { __pbn__Error = value; }
+        }
+        public bool ShouldSerializeError() => __pbn__Error != null;
+        public void ResetError() => __pbn__Error = null;
+        private ServerError? __pbn__Error;
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"message")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string Message
+        {
+            get { return __pbn__Message ?? ""; }
+            set { __pbn__Message = value; }
+        }
+        public bool ShouldSerializeMessage() => __pbn__Message != null;
+        public void ResetMessage() => __pbn__Message = null;
+        private string __pbn__Message;
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum LookupType
+        {
+            Success = 0,
+            Failed = 1,
+        }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandLookupTopic : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+        public string Topic { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"authoritative")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool Authoritative
+        {
+            get { return __pbn__Authoritative ?? false; }
+            set { __pbn__Authoritative = value; }
+        }
+        public bool ShouldSerializeAuthoritative() => __pbn__Authoritative != null;
+        public void ResetAuthoritative() => __pbn__Authoritative = null;
+        private bool? __pbn__Authoritative;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"original_principal")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalPrincipal
+        {
+            get { return __pbn__OriginalPrincipal ?? ""; }
+            set { __pbn__OriginalPrincipal = value; }
+        }
+        public bool ShouldSerializeOriginalPrincipal() => __pbn__OriginalPrincipal != null;
+        public void ResetOriginalPrincipal() => __pbn__OriginalPrincipal = null;
+        private string __pbn__OriginalPrincipal;
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"original_auth_data")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalAuthData
+        {
+            get { return __pbn__OriginalAuthData ?? ""; }
+            set { __pbn__OriginalAuthData = value; }
+        }
+        public bool ShouldSerializeOriginalAuthData() => __pbn__OriginalAuthData != null;
+        public void ResetOriginalAuthData() => __pbn__OriginalAuthData = null;
+        private string __pbn__OriginalAuthData;
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"original_auth_method")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string OriginalAuthMethod
+        {
+            get { return __pbn__OriginalAuthMethod ?? ""; }
+            set { __pbn__OriginalAuthMethod = value; }
+        }
+        public bool ShouldSerializeOriginalAuthMethod() => __pbn__OriginalAuthMethod != null;
+        public void ResetOriginalAuthMethod() => __pbn__OriginalAuthMethod = null;
+        private string __pbn__OriginalAuthMethod;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandLookupTopicResponse : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1)]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string BrokerServiceUrl
+        {
+            get { return __pbn__brokerServiceUrl ?? ""; }
+            set { __pbn__brokerServiceUrl = value; }
+        }
+        public bool ShouldSerializebrokerServiceUrl() => __pbn__brokerServiceUrl != null;
+        public void ResetbrokerServiceUrl() => __pbn__brokerServiceUrl = null;
+        private string __pbn__brokerServiceUrl;
+
+        [global::ProtoBuf.ProtoMember(2)]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string BrokerServiceUrlTls
+        {
+            get { return __pbn__brokerServiceUrlTls ?? ""; }
+            set { __pbn__brokerServiceUrlTls = value; }
+        }
+        public bool ShouldSerializebrokerServiceUrlTls() => __pbn__brokerServiceUrlTls != null;
+        public void ResetbrokerServiceUrlTls() => __pbn__brokerServiceUrlTls = null;
+        private string __pbn__brokerServiceUrlTls;
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"response")]
+        [global::System.ComponentModel.DefaultValue(LookupType.Redirect)]
+        public LookupType Response
+        {
+            get { return __pbn__Response ?? LookupType.Redirect; }
+            set { __pbn__Response = value; }
+        }
+        public bool ShouldSerializeResponse() => __pbn__Response != null;
+        public void ResetResponse() => __pbn__Response = null;
+        private LookupType? __pbn__Response;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"authoritative")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool Authoritative
+        {
+            get { return __pbn__Authoritative ?? false; }
+            set { __pbn__Authoritative = value; }
+        }
+        public bool ShouldSerializeAuthoritative() => __pbn__Authoritative != null;
+        public void ResetAuthoritative() => __pbn__Authoritative = null;
+        private bool? __pbn__Authoritative;
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"error")]
+        [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+        public ServerError Error
+        {
+            get { return __pbn__Error ?? ServerError.UnknownError; }
+            set { __pbn__Error = value; }
+        }
+        public bool ShouldSerializeError() => __pbn__Error != null;
+        public void ResetError() => __pbn__Error = null;
+        private ServerError? __pbn__Error;
+
+        [global::ProtoBuf.ProtoMember(7, Name = @"message")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string Message
+        {
+            get { return __pbn__Message ?? ""; }
+            set { __pbn__Message = value; }
+        }
+        public bool ShouldSerializeMessage() => __pbn__Message != null;
+        public void ResetMessage() => __pbn__Message = null;
+        private string __pbn__Message;
+
+        [global::ProtoBuf.ProtoMember(8, Name = @"proxy_through_service_url")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool ProxyThroughServiceUrl
+        {
+            get { return __pbn__ProxyThroughServiceUrl ?? false; }
+            set { __pbn__ProxyThroughServiceUrl = value; }
+        }
+        public bool ShouldSerializeProxyThroughServiceUrl() => __pbn__ProxyThroughServiceUrl != null;
+        public void ResetProxyThroughServiceUrl() => __pbn__ProxyThroughServiceUrl = null;
+        private bool? __pbn__ProxyThroughServiceUrl;
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum LookupType
+        {
+            Redirect = 0,
+            Connect = 1,
+            Failed = 2,
+        }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandProducer : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"topic", IsRequired = true)]
+        public string Topic { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"producer_id", IsRequired = true)]
+        public ulong ProducerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"producer_name")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ProducerName
+        {
+            get { return __pbn__ProducerName ?? ""; }
+            set { __pbn__ProducerName = value; }
+        }
+        public bool ShouldSerializeProducerName() => __pbn__ProducerName != null;
+        public void ResetProducerName() => __pbn__ProducerName = null;
+        private string __pbn__ProducerName;
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"encrypted")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool Encrypted
+        {
+            get { return __pbn__Encrypted ?? false; }
+            set { __pbn__Encrypted = value; }
+        }
+        public bool ShouldSerializeEncrypted() => __pbn__Encrypted != null;
+        public void ResetEncrypted() => __pbn__Encrypted = null;
+        private bool? __pbn__Encrypted;
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"metadata")]
+        public global::System.Collections.Generic.List<KeyValue> Metadatas { get; } = new global::System.Collections.Generic.List<KeyValue>();
+
+        [global::ProtoBuf.ProtoMember(7, Name = @"schema")]
+        public Schema Schema { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandSend : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+        public ulong ProducerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+        public ulong SequenceId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"num_messages")]
+        [global::System.ComponentModel.DefaultValue(1)]
+        public int NumMessages
+        {
+            get { return __pbn__NumMessages ?? 1; }
+            set { __pbn__NumMessages = value; }
+        }
+        public bool ShouldSerializeNumMessages() => __pbn__NumMessages != null;
+        public void ResetNumMessages() => __pbn__NumMessages = null;
+        private int? __pbn__NumMessages;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandSendReceipt : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+        public ulong ProducerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+        public ulong SequenceId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"message_id")]
+        public MessageIdData MessageId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandSendError : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+        public ulong ProducerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"sequence_id", IsRequired = true)]
+        public ulong SequenceId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"error", IsRequired = true)]
+        public ServerError Error { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"message", IsRequired = true)]
+        public string Message { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandMessage : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"message_id", IsRequired = true)]
+        public MessageIdData MessageId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"redelivery_count")]
+        [global::System.ComponentModel.DefaultValue(0)]
+        public uint RedeliveryCount
+        {
+            get { return __pbn__RedeliveryCount ?? 0; }
+            set { __pbn__RedeliveryCount = value; }
+        }
+        public bool ShouldSerializeRedeliveryCount() => __pbn__RedeliveryCount != null;
+        public void ResetRedeliveryCount() => __pbn__RedeliveryCount = null;
+        private uint? __pbn__RedeliveryCount;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandAck : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, IsRequired = true)]
+        public AckType Type { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"message_id")]
+        public global::System.Collections.Generic.List<MessageIdData> MessageIds { get; } = new global::System.Collections.Generic.List<MessageIdData>(1);
+
+        [global::ProtoBuf.ProtoMember(4)]
+        [global::System.ComponentModel.DefaultValue(ValidationError.UncompressedSizeCorruption)]
+        public ValidationError validation_error
+        {
+            get { return __pbn__validation_error ?? ValidationError.UncompressedSizeCorruption; }
+            set { __pbn__validation_error = value; }
+        }
+        public bool ShouldSerializevalidation_error() => __pbn__validation_error != null;
+        public void Resetvalidation_error() => __pbn__validation_error = null;
+        private ValidationError? __pbn__validation_error;
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"properties")]
+        public global::System.Collections.Generic.List<KeyLongValue> Properties { get; } = new global::System.Collections.Generic.List<KeyLongValue>();
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum AckType
+        {
+            Individual = 0,
+            Cumulative = 1,
+        }
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum ValidationError
+        {
+            UncompressedSizeCorruption = 0,
+            DecompressionError = 1,
+            ChecksumMismatch = 2,
+            BatchDeSerializeError = 3,
+            DecryptionError = 4,
+        }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandActiveConsumerChange : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"is_active")]
+        [global::System.ComponentModel.DefaultValue(false)]
+        public bool IsActive
+        {
+            get { return __pbn__IsActive ?? false; }
+            set { __pbn__IsActive = value; }
+        }
+        public bool ShouldSerializeIsActive() => __pbn__IsActive != null;
+        public void ResetIsActive() => __pbn__IsActive = null;
+        private bool? __pbn__IsActive;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandFlow : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, IsRequired = true)]
+        public uint MessagePermits { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandUnsubscribe : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandSeek : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"message_id")]
+        public MessageIdData MessageId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"message_publish_time")]
+        public ulong MessagePublishTime
+        {
+            get { return __pbn__MessagePublishTime.GetValueOrDefault(); }
+            set { __pbn__MessagePublishTime = value; }
+        }
+        public bool ShouldSerializeMessagePublishTime() => __pbn__MessagePublishTime != null;
+        public void ResetMessagePublishTime() => __pbn__MessagePublishTime = null;
+        private ulong? __pbn__MessagePublishTime;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandReachedEndOfTopic : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandCloseProducer : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"producer_id", IsRequired = true)]
+        public ulong ProducerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandCloseConsumer : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandRedeliverUnacknowledgedMessages : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"message_ids")]
+        public global::System.Collections.Generic.List<MessageIdData> MessageIds { get; } = new global::System.Collections.Generic.List<MessageIdData>();
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandSuccess : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"schema")]
+        public Schema Schema { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandProducerSuccess : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"producer_name", IsRequired = true)]
+        public string ProducerName { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"last_sequence_id")]
+        [global::System.ComponentModel.DefaultValue(-1)]
+        public long LastSequenceId
+        {
+            get { return __pbn__LastSequenceId ?? -1; }
+            set { __pbn__LastSequenceId = value; }
+        }
+        public bool ShouldSerializeLastSequenceId() => __pbn__LastSequenceId != null;
+        public void ResetLastSequenceId() => __pbn__LastSequenceId = null;
+        private long? __pbn__LastSequenceId;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"schema_version")]
+        public byte[] SchemaVersion
+        {
+            get { return __pbn__SchemaVersion; }
+            set { __pbn__SchemaVersion = value; }
+        }
+        public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+        public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+        private byte[] __pbn__SchemaVersion;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandError : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"error", IsRequired = true)]
+        public ServerError Error { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"message", IsRequired = true)]
+        public string Message { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandPing : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandPong : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandConsumerStats : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandConsumerStatsResponse : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"error_code")]
+        [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+        public ServerError ErrorCode
+        {
+            get { return __pbn__ErrorCode ?? ServerError.UnknownError; }
+            set { __pbn__ErrorCode = value; }
+        }
+        public bool ShouldSerializeErrorCode() => __pbn__ErrorCode != null;
+        public void ResetErrorCode() => __pbn__ErrorCode = null;
+        private ServerError? __pbn__ErrorCode;
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"error_message")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ErrorMessage
+        {
+            get { return __pbn__ErrorMessage ?? ""; }
+            set { __pbn__ErrorMessage = value; }
+        }
+        public bool ShouldSerializeErrorMessage() => __pbn__ErrorMessage != null;
+        public void ResetErrorMessage() => __pbn__ErrorMessage = null;
+        private string __pbn__ErrorMessage;
+
+        [global::ProtoBuf.ProtoMember(4)]
+        public double MsgRateOut
+        {
+            get { return __pbn__msgRateOut.GetValueOrDefault(); }
+            set { __pbn__msgRateOut = value; }
+        }
+        public bool ShouldSerializemsgRateOut() => __pbn__msgRateOut != null;
+        public void ResetmsgRateOut() => __pbn__msgRateOut = null;
+        private double? __pbn__msgRateOut;
+
+        [global::ProtoBuf.ProtoMember(5)]
+        public double MsgThroughputOut
+        {
+            get { return __pbn__msgThroughputOut.GetValueOrDefault(); }
+            set { __pbn__msgThroughputOut = value; }
+        }
+        public bool ShouldSerializemsgThroughputOut() => __pbn__msgThroughputOut != null;
+        public void ResetmsgThroughputOut() => __pbn__msgThroughputOut = null;
+        private double? __pbn__msgThroughputOut;
+
+        [global::ProtoBuf.ProtoMember(6)]
+        public double MsgRateRedeliver
+        {
+            get { return __pbn__msgRateRedeliver.GetValueOrDefault(); }
+            set { __pbn__msgRateRedeliver = value; }
+        }
+        public bool ShouldSerializemsgRateRedeliver() => __pbn__msgRateRedeliver != null;
+        public void ResetmsgRateRedeliver() => __pbn__msgRateRedeliver = null;
+        private double? __pbn__msgRateRedeliver;
+
+        [global::ProtoBuf.ProtoMember(7)]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ConsumerName
+        {
+            get { return __pbn__consumerName ?? ""; }
+            set { __pbn__consumerName = value; }
+        }
+        public bool ShouldSerializeconsumerName() => __pbn__consumerName != null;
+        public void ResetconsumerName() => __pbn__consumerName = null;
+        private string __pbn__consumerName;
+
+        [global::ProtoBuf.ProtoMember(8)]
+        public ulong AvailablePermits
+        {
+            get { return __pbn__availablePermits.GetValueOrDefault(); }
+            set { __pbn__availablePermits = value; }
+        }
+        public bool ShouldSerializeavailablePermits() => __pbn__availablePermits != null;
+        public void ResetavailablePermits() => __pbn__availablePermits = null;
+        private ulong? __pbn__availablePermits;
+
+        [global::ProtoBuf.ProtoMember(9)]
+        public ulong UnackedMessages
+        {
+            get { return __pbn__unackedMessages.GetValueOrDefault(); }
+            set { __pbn__unackedMessages = value; }
+        }
+        public bool ShouldSerializeunackedMessages() => __pbn__unackedMessages != null;
+        public void ResetunackedMessages() => __pbn__unackedMessages = null;
+        private ulong? __pbn__unackedMessages;
+
+        [global::ProtoBuf.ProtoMember(10)]
+        public bool BlockedConsumerOnUnackedMsgs
+        {
+            get { return __pbn__blockedConsumerOnUnackedMsgs.GetValueOrDefault(); }
+            set { __pbn__blockedConsumerOnUnackedMsgs = value; }
+        }
+        public bool ShouldSerializeblockedConsumerOnUnackedMsgs() => __pbn__blockedConsumerOnUnackedMsgs != null;
+        public void ResetblockedConsumerOnUnackedMsgs() => __pbn__blockedConsumerOnUnackedMsgs = null;
+        private bool? __pbn__blockedConsumerOnUnackedMsgs;
+
+        [global::ProtoBuf.ProtoMember(11, Name = @"address")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string Address
+        {
+            get { return __pbn__Address ?? ""; }
+            set { __pbn__Address = value; }
+        }
+        public bool ShouldSerializeAddress() => __pbn__Address != null;
+        public void ResetAddress() => __pbn__Address = null;
+        private string __pbn__Address;
+
+        [global::ProtoBuf.ProtoMember(12)]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ConnectedSince
+        {
+            get { return __pbn__connectedSince ?? ""; }
+            set { __pbn__connectedSince = value; }
+        }
+        public bool ShouldSerializeconnectedSince() => __pbn__connectedSince != null;
+        public void ResetconnectedSince() => __pbn__connectedSince = null;
+        private string __pbn__connectedSince;
+
+        [global::ProtoBuf.ProtoMember(13, Name = @"type")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string Type
+        {
+            get { return __pbn__Type ?? ""; }
+            set { __pbn__Type = value; }
+        }
+        public bool ShouldSerializeType() => __pbn__Type != null;
+        public void ResetType() => __pbn__Type = null;
+        private string __pbn__Type;
+
+        [global::ProtoBuf.ProtoMember(14)]
+        public double MsgRateExpired
+        {
+            get { return __pbn__msgRateExpired.GetValueOrDefault(); }
+            set { __pbn__msgRateExpired = value; }
+        }
+        public bool ShouldSerializemsgRateExpired() => __pbn__msgRateExpired != null;
+        public void ResetmsgRateExpired() => __pbn__msgRateExpired = null;
+        private double? __pbn__msgRateExpired;
+
+        [global::ProtoBuf.ProtoMember(15)]
+        public ulong MsgBacklog
+        {
+            get { return __pbn__msgBacklog.GetValueOrDefault(); }
+            set { __pbn__msgBacklog = value; }
+        }
+        public bool ShouldSerializemsgBacklog() => __pbn__msgBacklog != null;
+        public void ResetmsgBacklog() => __pbn__msgBacklog = null;
+        private ulong? __pbn__msgBacklog;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandGetLastMessageId : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"consumer_id", IsRequired = true)]
+        public ulong ConsumerId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandGetLastMessageIdResponse : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"last_message_id", IsRequired = true)]
+        public MessageIdData LastMessageId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandGetTopicsOfNamespace : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"namespace", IsRequired = true)]
+        public string Namespace { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3)]
+        [global::System.ComponentModel.DefaultValue(Mode.Persistent)]
+        public Mode mode
+        {
+            get { return __pbn__mode ?? Mode.Persistent; }
+            set { __pbn__mode = value; }
+        }
+        public bool ShouldSerializemode() => __pbn__mode != null;
+        public void Resetmode() => __pbn__mode = null;
+        private Mode? __pbn__mode;
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum Mode
+        {
+            [global::ProtoBuf.ProtoEnum(Name = @"PERSISTENT")]
+            Persistent = 0,
+            [global::ProtoBuf.ProtoEnum(Name = @"NON_PERSISTENT")]
+            NonPersistent = 1,
+            [global::ProtoBuf.ProtoEnum(Name = @"ALL")]
+            All = 2,
+        }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandGetTopicsOfNamespaceResponse : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"topics")]
+        public global::System.Collections.Generic.List<string> Topics { get; } = new global::System.Collections.Generic.List<string>();
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandGetSchema : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"topic", IsRequired = true)]
+        public string Topic { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"schema_version")]
+        public byte[] SchemaVersion
+        {
+            get { return __pbn__SchemaVersion; }
+            set { __pbn__SchemaVersion = value; }
+        }
+        public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+        public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+        private byte[] __pbn__SchemaVersion;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class CommandGetSchemaResponse : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, Name = @"request_id", IsRequired = true)]
+        public ulong RequestId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"error_code")]
+        [global::System.ComponentModel.DefaultValue(ServerError.UnknownError)]
+        public ServerError ErrorCode
+        {
+            get { return __pbn__ErrorCode ?? ServerError.UnknownError; }
+            set { __pbn__ErrorCode = value; }
+        }
+        public bool ShouldSerializeErrorCode() => __pbn__ErrorCode != null;
+        public void ResetErrorCode() => __pbn__ErrorCode = null;
+        private ServerError? __pbn__ErrorCode;
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"error_message")]
+        [global::System.ComponentModel.DefaultValue("")]
+        public string ErrorMessage
+        {
+            get { return __pbn__ErrorMessage ?? ""; }
+            set { __pbn__ErrorMessage = value; }
+        }
+        public bool ShouldSerializeErrorMessage() => __pbn__ErrorMessage != null;
+        public void ResetErrorMessage() => __pbn__ErrorMessage = null;
+        private string __pbn__ErrorMessage;
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"schema")]
+        public Schema Schema { get; set; }
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"schema_version")]
+        public byte[] SchemaVersion
+        {
+            get { return __pbn__SchemaVersion; }
+            set { __pbn__SchemaVersion = value; }
+        }
+        public bool ShouldSerializeSchemaVersion() => __pbn__SchemaVersion != null;
+        public void ResetSchemaVersion() => __pbn__SchemaVersion = null;
+        private byte[] __pbn__SchemaVersion;
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public partial class BaseCommand : global::ProtoBuf.IExtensible
+    {
+        private global::ProtoBuf.IExtension __pbn__extensionData;
+        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);
+
+        [global::ProtoBuf.ProtoMember(1, IsRequired = true)]
+        public Type CommandType { get; set; } = Type.Connect;
+
+        [global::ProtoBuf.ProtoMember(2, Name = @"connect")]
+        public CommandConnect Connect { get; set; }
+
+        [global::ProtoBuf.ProtoMember(3, Name = @"connected")]
+        public CommandConnected Connected { get; set; }
+
+        [global::ProtoBuf.ProtoMember(4, Name = @"subscribe")]
+        public CommandSubscribe Subscribe { get; set; }
+
+        [global::ProtoBuf.ProtoMember(5, Name = @"producer")]
+        public CommandProducer Producer { get; set; }
+
+        [global::ProtoBuf.ProtoMember(6, Name = @"send")]
+        public CommandSend Send { get; set; }
+
+        [global::ProtoBuf.ProtoMember(7, Name = @"send_receipt")]
+        public CommandSendReceipt SendReceipt { get; set; }
+
+        [global::ProtoBuf.ProtoMember(8, Name = @"send_error")]
+        public CommandSendError SendError { get; set; }
+
+        [global::ProtoBuf.ProtoMember(9, Name = @"message")]
+        public CommandMessage Message { get; set; }
+
+        [global::ProtoBuf.ProtoMember(10, Name = @"ack")]
+        public CommandAck Ack { get; set; }
+
+        [global::ProtoBuf.ProtoMember(11, Name = @"flow")]
+        public CommandFlow Flow { get; set; }
+
+        [global::ProtoBuf.ProtoMember(12, Name = @"unsubscribe")]
+        public CommandUnsubscribe Unsubscribe { get; set; }
+
+        [global::ProtoBuf.ProtoMember(13, Name = @"success")]
+        public CommandSuccess Success { get; set; }
+
+        [global::ProtoBuf.ProtoMember(14, Name = @"error")]
+        public CommandError Error { get; set; }
+
+        [global::ProtoBuf.ProtoMember(15, Name = @"close_producer")]
+        public CommandCloseProducer CloseProducer { get; set; }
+
+        [global::ProtoBuf.ProtoMember(16, Name = @"close_consumer")]
+        public CommandCloseConsumer CloseConsumer { get; set; }
+
+        [global::ProtoBuf.ProtoMember(17, Name = @"producer_success")]
+        public CommandProducerSuccess ProducerSuccess { get; set; }
+
+        [global::ProtoBuf.ProtoMember(18, Name = @"ping")]
+        public CommandPing Ping { get; set; }
+
+        [global::ProtoBuf.ProtoMember(19, Name = @"pong")]
+        public CommandPong Pong { get; set; }
+
+        [global::ProtoBuf.ProtoMember(20)]
+        public CommandRedeliverUnacknowledgedMessages RedeliverUnacknowledgedMessages { get; set; }
+
+        [global::ProtoBuf.ProtoMember(21)]
+        public CommandPartitionedTopicMetadata PartitionMetadata { get; set; }
+
+        [global::ProtoBuf.ProtoMember(22)]
+        public CommandPartitionedTopicMetadataResponse PartitionMetadataResponse { get; set; }
+
+        [global::ProtoBuf.ProtoMember(23)]
+        public CommandLookupTopic LookupTopic { get; set; }
+
+        [global::ProtoBuf.ProtoMember(24)]
+        public CommandLookupTopicResponse LookupTopicResponse { get; set; }
+
+        [global::ProtoBuf.ProtoMember(25)]
+        public CommandConsumerStats ConsumerStats { get; set; }
+
+        [global::ProtoBuf.ProtoMember(26)]
+        public CommandConsumerStatsResponse ConsumerStatsResponse { get; set; }
+
+        [global::ProtoBuf.ProtoMember(27)]
+        public CommandReachedEndOfTopic ReachedEndOfTopic { get; set; }
+
+        [global::ProtoBuf.ProtoMember(28, Name = @"seek")]
+        public CommandSeek Seek { get; set; }
+
+        [global::ProtoBuf.ProtoMember(29)]
+        public CommandGetLastMessageId GetLastMessageId { get; set; }
+
+        [global::ProtoBuf.ProtoMember(30)]
+        public CommandGetLastMessageIdResponse GetLastMessageIdResponse { get; set; }
+
+        [global::ProtoBuf.ProtoMember(31, Name = @"active_consumer_change")]
+        public CommandActiveConsumerChange ActiveConsumerChange { get; set; }
+
+        [global::ProtoBuf.ProtoMember(32)]
+        public CommandGetTopicsOfNamespace GetTopicsOfNamespace { get; set; }
+
+        [global::ProtoBuf.ProtoMember(33)]
+        public CommandGetTopicsOfNamespaceResponse GetTopicsOfNamespaceResponse { get; set; }
+
+        [global::ProtoBuf.ProtoMember(34)]
+        public CommandGetSchema GetSchema { get; set; }
+
+        [global::ProtoBuf.ProtoMember(35)]
+        public CommandGetSchemaResponse GetSchemaResponse { get; set; }
+
+        [global::ProtoBuf.ProtoMember(36)]
+        public CommandAuthChallenge AuthChallenge { get; set; }
+
+        [global::ProtoBuf.ProtoMember(37)]
+        public CommandAuthResponse AuthResponse { get; set; }
+
+        [global::ProtoBuf.ProtoContract()]
+        public enum Type
+        {
+            [global::ProtoBuf.ProtoEnum(Name = @"CONNECT")]
+            Connect = 2,
+            [global::ProtoBuf.ProtoEnum(Name = @"CONNECTED")]
+            Connected = 3,
+            [global::ProtoBuf.ProtoEnum(Name = @"SUBSCRIBE")]
+            Subscribe = 4,
+            [global::ProtoBuf.ProtoEnum(Name = @"PRODUCER")]
+            Producer = 5,
+            [global::ProtoBuf.ProtoEnum(Name = @"SEND")]
+            Send = 6,
+            [global::ProtoBuf.ProtoEnum(Name = @"SEND_RECEIPT")]
+            SendReceipt = 7,
+            [global::ProtoBuf.ProtoEnum(Name = @"SEND_ERROR")]
+            SendError = 8,
+            [global::ProtoBuf.ProtoEnum(Name = @"MESSAGE")]
+            Message = 9,
+            [global::ProtoBuf.ProtoEnum(Name = @"ACK")]
+            Ack = 10,
+            [global::ProtoBuf.ProtoEnum(Name = @"FLOW")]
+            Flow = 11,
+            [global::ProtoBuf.ProtoEnum(Name = @"UNSUBSCRIBE")]
+            Unsubscribe = 12,
+            [global::ProtoBuf.ProtoEnum(Name = @"SUCCESS")]
+            Success = 13,
+            [global::ProtoBuf.ProtoEnum(Name = @"ERROR")]
+            Error = 14,
+            [global::ProtoBuf.ProtoEnum(Name = @"CLOSE_PRODUCER")]
+            CloseProducer = 15,
+            [global::ProtoBuf.ProtoEnum(Name = @"CLOSE_CONSUMER")]
+            CloseConsumer = 16,
+            [global::ProtoBuf.ProtoEnum(Name = @"PRODUCER_SUCCESS")]
+            ProducerSuccess = 17,
+            [global::ProtoBuf.ProtoEnum(Name = @"PING")]
+            Ping = 18,
+            [global::ProtoBuf.ProtoEnum(Name = @"PONG")]
+            Pong = 19,
+            [global::ProtoBuf.ProtoEnum(Name = @"REDELIVER_UNACKNOWLEDGED_MESSAGES")]
+            RedeliverUnacknowledgedMessages = 20,
+            [global::ProtoBuf.ProtoEnum(Name = @"PARTITIONED_METADATA")]
+            PartitionedMetadata = 21,
+            [global::ProtoBuf.ProtoEnum(Name = @"PARTITIONED_METADATA_RESPONSE")]
+            PartitionedMetadataResponse = 22,
+            [global::ProtoBuf.ProtoEnum(Name = @"LOOKUP")]
+            Lookup = 23,
+            [global::ProtoBuf.ProtoEnum(Name = @"LOOKUP_RESPONSE")]
+            LookupResponse = 24,
+            [global::ProtoBuf.ProtoEnum(Name = @"CONSUMER_STATS")]
+            ConsumerStats = 25,
+            [global::ProtoBuf.ProtoEnum(Name = @"CONSUMER_STATS_RESPONSE")]
+            ConsumerStatsResponse = 26,
+            [global::ProtoBuf.ProtoEnum(Name = @"REACHED_END_OF_TOPIC")]
+            ReachedEndOfTopic = 27,
+            [global::ProtoBuf.ProtoEnum(Name = @"SEEK")]
+            Seek = 28,
+            [global::ProtoBuf.ProtoEnum(Name = @"GET_LAST_MESSAGE_ID")]
+            GetLastMessageId = 29,
+            [global::ProtoBuf.ProtoEnum(Name = @"GET_LAST_MESSAGE_ID_RESPONSE")]
+            GetLastMessageIdResponse = 30,
+            [global::ProtoBuf.ProtoEnum(Name = @"ACTIVE_CONSUMER_CHANGE")]
+            ActiveConsumerChange = 31,
+            [global::ProtoBuf.ProtoEnum(Name = @"GET_TOPICS_OF_NAMESPACE")]
+            GetTopicsOfNamespace = 32,
+            [global::ProtoBuf.ProtoEnum(Name = @"GET_TOPICS_OF_NAMESPACE_RESPONSE")]
+            GetTopicsOfNamespaceResponse = 33,
+            [global::ProtoBuf.ProtoEnum(Name = @"GET_SCHEMA")]
+            GetSchema = 34,
+            [global::ProtoBuf.ProtoEnum(Name = @"GET_SCHEMA_RESPONSE")]
+            GetSchemaResponse = 35,
+            [global::ProtoBuf.ProtoEnum(Name = @"AUTH_CHALLENGE")]
+            AuthChallenge = 36,
+            [global::ProtoBuf.ProtoEnum(Name = @"AUTH_RESPONSE")]
+            AuthResponse = 37,
+        }
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public enum CompressionType
+    {
+        [global::ProtoBuf.ProtoEnum(Name = @"NONE")]
+        None = 0,
+        [global::ProtoBuf.ProtoEnum(Name = @"LZ4")]
+        Lz4 = 1,
+        [global::ProtoBuf.ProtoEnum(Name = @"ZLIB")]
+        Zlib = 2,
+        [global::ProtoBuf.ProtoEnum(Name = @"ZSTD")]
+        Zstd = 3,
+        [global::ProtoBuf.ProtoEnum(Name = @"SNAPPY")]
+        Snappy = 4,
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public enum ServerError
+    {
+        UnknownError = 0,
+        MetadataError = 1,
+        PersistenceError = 2,
+        AuthenticationError = 3,
+        AuthorizationError = 4,
+        ConsumerBusy = 5,
+        ServiceNotReady = 6,
+        ProducerBlockedQuotaExceededError = 7,
+        ProducerBlockedQuotaExceededException = 8,
+        ChecksumError = 9,
+        UnsupportedVersionError = 10,
+        TopicNotFound = 11,
+        SubscriptionNotFound = 12,
+        ConsumerNotFound = 13,
+        TooManyRequests = 14,
+        TopicTerminatedError = 15,
+        ProducerBusy = 16,
+        InvalidTopicName = 17,
+        IncompatibleSchema = 18,
+        ConsumerAssignError = 19,
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public enum AuthMethod
+    {
+        AuthMethodNone = 0,
+        AuthMethodYcaV1 = 1,
+        AuthMethodAthens = 2,
+    }
+
+    [global::ProtoBuf.ProtoContract()]
+    public enum ProtocolVersion
+    {
+        [global::ProtoBuf.ProtoEnum(Name = @"v0")]
+        V0 = 0,
+        [global::ProtoBuf.ProtoEnum(Name = @"v1")]
+        V1 = 1,
+        [global::ProtoBuf.ProtoEnum(Name = @"v2")]
+        V2 = 2,
+        [global::ProtoBuf.ProtoEnum(Name = @"v3")]
+        V3 = 3,
+        [global::ProtoBuf.ProtoEnum(Name = @"v4")]
+        V4 = 4,
+        [global::ProtoBuf.ProtoEnum(Name = @"v5")]
+        V5 = 5,
+        [global::ProtoBuf.ProtoEnum(Name = @"v6")]
+        V6 = 6,
+        [global::ProtoBuf.ProtoEnum(Name = @"v7")]
+        V7 = 7,
+        [global::ProtoBuf.ProtoEnum(Name = @"v8")]
+        V8 = 8,
+        [global::ProtoBuf.ProtoEnum(Name = @"v9")]
+        V9 = 9,
+        [global::ProtoBuf.ProtoEnum(Name = @"v10")]
+        V10 = 10,
+        [global::ProtoBuf.ProtoEnum(Name = @"v11")]
+        V11 = 11,
+        [global::ProtoBuf.ProtoEnum(Name = @"v12")]
+        V12 = 12,
+        [global::ProtoBuf.ProtoEnum(Name = @"v13")]
+        V13 = 13,
+        [global::ProtoBuf.ProtoEnum(Name = @"v14")]
+        V14 = 14,
+    }
+}
+
+#pragma warning restore CS1591, CS0612, CS3021, IDE1006
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
new file mode 100644
index 0000000..31241e8
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -0,0 +1,35 @@
+using DotPulsar.Abstractions;
+using System;
+using System.Reflection;
+
+namespace DotPulsar.Internal
+{
+    public sealed class PulsarClientBuilder : IPulsarClientBuilder
+    {
+        private static readonly int ProtocolVersion;
+        private static readonly string ClientVersion;
+
+        static PulsarClientBuilder()
+        {
+            ProtocolVersion = 12;
+            var assemblyName = Assembly.GetCallingAssembly().GetName();
+            ClientVersion = assemblyName.Name + " " + assemblyName.Version.ToString(3);
+        }
+
+        private Uri _serviceUrl;
+
+        public PulsarClientBuilder() => _serviceUrl = new Uri("pulsar://localhost:6650");
+
+        public IPulsarClientBuilder ServiceUrl(Uri uri)
+        {
+            _serviceUrl = uri;
+            return this;
+        }
+
+        public IPulsarClient Build()
+        {
+            var connectionPool = new ConnectionPool(ProtocolVersion, ClientVersion, _serviceUrl);
+            return new PulsarClient(connectionPool);
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
new file mode 100644
index 0000000..ed18b48
--- /dev/null
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -0,0 +1,118 @@
+using DotPulsar.Internal.Extensions;
+using System;
+using System.Buffers;
+using System.IO;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class PulsarStream : IDisposable
+    {
+        private readonly Stream _stream;
+        private readonly Action<uint, ReadOnlySequence<byte>> _handler;
+        private readonly CancellationTokenSource _tokenSource;
+
+        public PulsarStream(Stream stream, Action<uint, ReadOnlySequence<byte>> handler)
+        {
+            _stream = stream;
+            _handler = handler;
+            _tokenSource = new CancellationTokenSource();
+            var pipe = new Pipe();
+            var fill = FillPipe(_stream, pipe.Writer, _tokenSource.Token);
+            var read = ReadPipe(pipe.Reader, _tokenSource.Token);
+            IsClosed = Task.WhenAny(fill, read);
+        }
+
+        public Task IsClosed { get; }
+
+        public async Task Send(ReadOnlySequence<byte> sequence)
+        {
+            try
+            {
+                foreach (var segment in sequence)
+                {
+                    await _stream.WriteAsync(segment);
+                }
+            }
+            catch
+            {
+                _tokenSource.Cancel();
+                throw;
+            }
+        }
+
+        public void Dispose()
+        {
+            _tokenSource.Cancel();
+            _stream.Dispose();
+        }
+
+        private async Task FillPipe(Stream stream, PipeWriter writer, CancellationToken cancellationToken)
+        {
+            try
+            {
+                while (!cancellationToken.IsCancellationRequested)
+                {
+                    var memory = writer.GetMemory(84999); // LOH - 1 byte
+                    var bytesRead = await stream.ReadAsync(memory, cancellationToken);
+                    if (bytesRead == 0)
+                        break;
+
+                    writer.Advance(bytesRead);
+
+                    var result = await writer.FlushAsync(cancellationToken);
+                    if (result.IsCompleted)
+                        break;
+                }
+            }
+            catch
+            {
+                _tokenSource.Cancel();
+            }
+
+            writer.Complete();
+        }
+
+        private async Task ReadPipe(PipeReader reader, CancellationToken cancellationToken)
+        {
+            try
+            {
+                while (!cancellationToken.IsCancellationRequested)
+                {
+                    var result = await reader.ReadAsync(cancellationToken);
+                    var buffer = result.Buffer;
+
+                    while (!cancellationToken.IsCancellationRequested)
+                    {
+                        if (buffer.Length < 4)
+                            break;
+
+                        var frameSize = buffer.ReadUInt32(0, true);
+                        var totalSize = frameSize + 4;
+                        if (buffer.Length < totalSize)
+                            break;
+
+                        var commandSize = buffer.ReadUInt32(4, true);
+
+                        _handler(commandSize, buffer.Slice(8, totalSize - 8));
+
+                        buffer = buffer.Slice(totalSize);
+                    }
+
+                    if (result.IsCompleted)
+                        break;
+
+                    reader.AdvanceTo(buffer.Start);
+                }
+            }
+            catch
+            {
+                _tokenSource.Cancel();
+            }
+
+            reader.Complete();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
new file mode 100644
index 0000000..a4427e0
--- /dev/null
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -0,0 +1,105 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class Reader : IReader
+    {
+        private readonly Executor _executor;
+        private readonly IConsumerStreamFactory _streamFactory;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly StateManager<ReaderState> _stateManager;
+        private readonly CancellationTokenSource _connectTokenSource;
+        private readonly Task _connectTask;
+        private Action _throwIfClosedOrFaulted;
+        private IConsumerStream Stream { get; set; }
+
+        public Reader(IConsumerStreamFactory streamFactory, IFaultStrategy faultStrategy)
+        {
+            _executor = new Executor(ExecutorOnException);
+            _stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
+            _streamFactory = streamFactory;
+            _faultStrategy = faultStrategy;
+            _connectTokenSource = new CancellationTokenSource();
+            Stream = new NotReadyStream();
+            _connectTask = Connect(_connectTokenSource.Token);
+            _throwIfClosedOrFaulted = () => { };
+        }
+
+        public async Task<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+        public async Task<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+        public bool IsFinalState() => _stateManager.IsFinalState();
+        public bool IsFinalState(ReaderState state) => _stateManager.IsFinalState(state);
+
+        public void Dispose()
+        {
+            _executor.Dispose();
+            _connectTokenSource.Cancel();
+            _connectTask.Wait();
+        }
+
+        public async Task<Message> Receive(CancellationToken cancellationToken) => await _executor.Execute(() => Stream.Receive(cancellationToken), cancellationToken);
+
+        private async Task ExecutorOnException(Exception exception, CancellationToken cancellationToken)
+        {
+            _throwIfClosedOrFaulted();
+
+            switch (_faultStrategy.DetermineFaultAction(exception))
+            {
+                case FaultAction.Retry:
+                    await Task.Delay(_faultStrategy.TimeToWait, cancellationToken);
+                    break;
+                case FaultAction.Relookup:
+                    await _stateManager.StateChangedTo(ReaderState.Connected, cancellationToken);
+                    break;
+                case FaultAction.Fault:
+                    HasFaulted(exception);
+                    break;
+            }
+
+            _throwIfClosedOrFaulted();
+        }
+
+        private void HasFaulted(Exception exception)
+        {
+            _throwIfClosedOrFaulted = () => throw exception;
+            _stateManager.SetState(ReaderState.Faulted);
+        }
+
+        private void HasClosed()
+        {
+            _throwIfClosedOrFaulted = () => throw new ReaderClosedException();
+            _stateManager.SetState(ReaderState.Closed);
+        }
+
+        private async Task Connect(CancellationToken cancellationToken)
+        {
+            try
+            {
+                while (true)
+                {
+                    using (var proxy = new ReaderProxy(_stateManager, new AsyncQueue<MessagePackage>()))
+                    using (Stream = await _streamFactory.CreateStream(proxy, cancellationToken))
+                    {
+                        proxy.Active();
+                        await _stateManager.StateChangedFrom(ReaderState.Connected, cancellationToken);
+                        if (_stateManager.IsFinalState())
+                            return;
+                    }
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                HasClosed();
+            }
+            catch (Exception exception)
+            {
+                HasFaulted(exception);
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
new file mode 100644
index 0000000..85de19d
--- /dev/null
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -0,0 +1,42 @@
+using DotPulsar.Abstractions;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ReaderBuilder : IReaderBuilder
+    {
+        private readonly IPulsarClient _pulsarClient;
+        private readonly ReaderOptions _options;
+
+        public ReaderBuilder(IPulsarClient pulsarClient)
+        {
+            _pulsarClient = pulsarClient;
+            _options = new ReaderOptions();
+        }
+
+        public IReaderBuilder ReaderName(string name)
+        {
+            _options.ReaderName = name;
+            return this;
+        }
+
+        public IReaderBuilder MessagePrefetchCount(uint count)
+        {
+            _options.MessagePrefetchCount = count;
+            return this;
+        }
+
+        public IReaderBuilder StartMessageId(MessageId messageId)
+        {
+            _options.StartMessageId = messageId;
+            return this;
+        }
+
+        public IReaderBuilder Topic(string topic)
+        {
+            _options.Topic = topic;
+            return this;
+        }
+
+        public IReader Create() => _pulsarClient.CreateReader(_options);
+    }
+}
diff --git a/src/DotPulsar/Internal/ReaderProxy.cs b/src/DotPulsar/Internal/ReaderProxy.cs
new file mode 100644
index 0000000..eb93582
--- /dev/null
+++ b/src/DotPulsar/Internal/ReaderProxy.cs
@@ -0,0 +1,53 @@
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class ReaderProxy : IConsumerProxy, IDisposable
+    {
+        private readonly object _lock;
+        private readonly StateManager<ReaderState> _stateManager;
+        private readonly AsyncQueue<MessagePackage> _queue;
+        private bool _hasDisconnected;
+
+        public ReaderProxy(StateManager<ReaderState> stateManager, AsyncQueue<MessagePackage> queue)
+        {
+            _lock = new object();
+            _stateManager = stateManager;
+            _queue = queue;
+            _hasDisconnected = false;
+        }
+
+        public void Active() => SetState(ReaderState.Connected);
+        public void Inactive() => SetState(ReaderState.Connected);
+        public void ReachedEndOfTopic() => SetState(ReaderState.ReachedEndOfTopic);
+
+        public void Disconnected()
+        {
+            lock (_lock)
+            {
+                if (_hasDisconnected)
+                    return;
+
+                _stateManager.SetState(ReaderState.Disconnected);
+                _hasDisconnected = true;
+            }
+        }
+
+        public void Enqueue(MessagePackage package) => _queue.Enqueue(package);
+        public async Task<MessagePackage> Dequeue(CancellationToken cancellationToken) => await _queue.Dequeue(cancellationToken);
+
+        private void SetState(ReaderState state)
+        {
+            lock (_lock)
+            {
+                if (!_hasDisconnected)
+                    _stateManager.SetState(state);
+            }
+        }
+
+        public void Dispose() => _queue.Dispose();
+    }
+}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
new file mode 100644
index 0000000..8d11e95
--- /dev/null
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -0,0 +1,93 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class RequestResponseHandler : IDisposable
+    {
+        private readonly Awaitor<string, BaseCommand> _responses;
+        private ulong _requestId;
+
+        public RequestResponseHandler()
+        {
+            _responses = new Awaitor<string, BaseCommand>();
+            _requestId = 1;
+        }
+
+        public void Dispose() => _responses.Dispose();
+
+        public Task<BaseCommand> Outgoing(BaseCommand command)
+        {
+            SetRequestId(command);
+            return _responses.CreateTask(GetResponseIdentifier(command));
+        }
+
+        public void Incoming(BaseCommand command)
+        {
+            var identifier = GetResponseIdentifier(command);
+            if (identifier != null)
+                _responses.SetResult(identifier, command);
+        }
+
+        private void SetRequestId(BaseCommand cmd)
+        {
+            switch (cmd.CommandType)
+            {
+                case BaseCommand.Type.Seek:
+                    cmd.Seek.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.Lookup:
+                    cmd.LookupTopic.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.Error:
+                    cmd.Error.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.Producer:
+                    cmd.Producer.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.CloseProducer:
+                    cmd.CloseProducer.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.Subscribe:
+                    cmd.Subscribe.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.Unsubscribe:
+                    cmd.Unsubscribe.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.CloseConsumer:
+                    cmd.CloseConsumer.RequestId = _requestId++;
+                    return;
+                case BaseCommand.Type.GetLastMessageId:
+                    cmd.GetLastMessageId.RequestId = _requestId++;
+                    return;
+            }
+        }
+
+        private static string GetResponseIdentifier(BaseCommand cmd)
+        {
+            switch (cmd.CommandType)
+            {
+                case BaseCommand.Type.Connect:
+                case BaseCommand.Type.Connected: return "Connected";
+                case BaseCommand.Type.Send: return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId.ToString();
+                case BaseCommand.Type.SendError: return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId.ToString();
+                case BaseCommand.Type.SendReceipt: return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId.ToString();
+                case BaseCommand.Type.Error: return cmd.Error.RequestId.ToString();
+                case BaseCommand.Type.Producer: return cmd.Producer.RequestId.ToString();
+                case BaseCommand.Type.ProducerSuccess: return cmd.ProducerSuccess.RequestId.ToString();
+                case BaseCommand.Type.CloseProducer: return cmd.CloseProducer.RequestId.ToString();
+                case BaseCommand.Type.Lookup: return cmd.LookupTopic.RequestId.ToString();
+                case BaseCommand.Type.LookupResponse: return cmd.LookupTopicResponse.RequestId.ToString();
+                case BaseCommand.Type.Unsubscribe: return cmd.Unsubscribe.RequestId.ToString();
+                case BaseCommand.Type.Subscribe: return cmd.Subscribe.RequestId.ToString();
+                case BaseCommand.Type.Success: return cmd.Success.RequestId.ToString();
+                case BaseCommand.Type.Seek: return cmd.Seek.RequestId.ToString();
+                case BaseCommand.Type.CloseConsumer: return cmd.CloseConsumer.RequestId.ToString();
+                case BaseCommand.Type.GetLastMessageId: return cmd.GetLastMessageId.RequestId.ToString();
+                case BaseCommand.Type.GetLastMessageIdResponse: return cmd.GetLastMessageIdResponse.RequestId.ToString();
+                default: throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type");
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
new file mode 100644
index 0000000..7d9fbfd
--- /dev/null
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -0,0 +1,14 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar.Internal
+{
+    public sealed class SendPackage
+    {
+        public SendPackage(CommandSend command) => Command = command;
+
+        public CommandSend Command { get; }
+        public PulsarApi.MessageMetadata Metadata { get; set; }
+        public ReadOnlyMemory<byte> Payload { get; set; }
+    }
+}
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
new file mode 100644
index 0000000..4b38e90
--- /dev/null
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace DotPulsar.Internal
+{
+    public sealed class SequenceBuilder<T>
+    {
+        private readonly LinkedList<ReadOnlyMemory<T>> _elements;
+
+        public SequenceBuilder() => _elements = new LinkedList<ReadOnlyMemory<T>>();
+
+        public SequenceBuilder<T> Prepend(ReadOnlyMemory<T> memory)
+        {
+            _elements.AddFirst(memory);
+            return this;
+        }
+
+        public SequenceBuilder<T> Append(ReadOnlyMemory<T> memory)
+        {
+            _elements.AddLast(memory);
+            return this;
+        }
+
+        public long Length => _elements.Sum(e => e.Length);
+
+        public ReadOnlySequence<T> Build()
+        {
+            Segment start = null;
+            Segment current = null;
+
+            foreach (var element in _elements)
+            {
+                if (current is null)
+                {
+                    current = new Segment(element);
+                    start = current;
+                }
+                else
+                    current = current.CreateNext(element);
+            }
+
+            return new ReadOnlySequence<T>(start, 0, current, current.Memory.Length);
+        }
+
+        private sealed class Segment : ReadOnlySequenceSegment<T>
+        {
+            public Segment(ReadOnlyMemory<T> memory, long runningIndex = 0)
+            {
+                Memory = memory;
+                RunningIndex = runningIndex;
+            }
+
+            public Segment CreateNext(ReadOnlyMemory<T> memory)
+            {
+                var segment = new Segment(memory, RunningIndex + Memory.Length);
+                Next = segment;
+                return segment;
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
new file mode 100644
index 0000000..624f37a
--- /dev/null
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -0,0 +1,16 @@
+namespace DotPulsar.Internal
+{
+    public sealed class SequenceId
+    {
+        public SequenceId(ulong initialSequenceId)
+        {
+            Current = initialSequenceId;
+            if (initialSequenceId > 0)
+                Increment();
+        }
+
+        public ulong Current { get; private set; }
+
+        public void Increment() => ++Current;
+    }
+}
diff --git a/src/DotPulsar/Internal/Serializer.cs b/src/DotPulsar/Internal/Serializer.cs
new file mode 100644
index 0000000..3c2d03c
--- /dev/null
+++ b/src/DotPulsar/Internal/Serializer.cs
@@ -0,0 +1,84 @@
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Buffers;
+using System.IO;
+
+namespace DotPulsar.Internal
+{
+    public static class Serializer
+    {
+        private static readonly byte[] MagicNumber = new byte[] { 0x0e, 0x01 };
+
+        public static T Deserialize<T>(ReadOnlySequence<byte> sequence)
+        {
+            using var ms = new MemoryStream(sequence.ToArray()); //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
+            return ProtoBuf.Serializer.Deserialize<T>(ms);
+        }
+
+        public static Message Deserialize(MessagePackage package)
+        {
+            var sequence = package.Data;
+            var magicNumberMatches = sequence.StartsWith(MagicNumber);
+            if (!magicNumberMatches)
+                throw new ChecksumException("Magic number don't match");
+            var expectedChecksum = sequence.ReadUInt32(2, true);
+            var actualChecksum = Crc32C.Calculate(sequence.Slice(6));
+            if (expectedChecksum != actualChecksum)
+                throw new ChecksumException(expectedChecksum, actualChecksum);
+            var metaSize = sequence.ReadUInt32(6, true);
+            var meta = Deserialize<PulsarApi.MessageMetadata>(sequence.Slice(10, metaSize));
+            var data = sequence.Slice(10 + metaSize);
+            return new Message(new MessageId(package.Command.MessageId), meta, data);
+        }
+
+        public static ReadOnlySequence<byte> Serialize(BaseCommand command)
+        {
+            var commandBytes = Serialize<BaseCommand>(command);
+            var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
+            var totalSizeBytes = ToBigEndianBytes((uint)commandBytes.Length + 4);
+
+            return new SequenceBuilder<byte>()
+                .Append(totalSizeBytes)
+                .Append(commandSizeBytes)
+                .Append(commandBytes)
+                .Build();
+        }
+
+        public static ReadOnlySequence<byte> Serialize(BaseCommand command, PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload)
+        {
+            var commandBytes = Serialize<BaseCommand>(command);
+            var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
+
+            var metadataBytes = Serialize(metadata);
+            var metadataSizeBytes = ToBigEndianBytes((uint)metadataBytes.Length);
+
+            var sb = new SequenceBuilder<byte>().Append(metadataSizeBytes).Append(metadataBytes).Append(payload);
+            var checksum = Crc32C.Calculate(sb.Build());
+
+            return sb.Prepend(ToBigEndianBytes(checksum))
+                .Prepend(MagicNumber)
+                .Prepend(commandBytes)
+                .Prepend(commandSizeBytes)
+                .Prepend(ToBigEndianBytes((uint)sb.Length))
+                .Build();
+        }
+
+        public static byte[] ToBigEndianBytes(uint integer)
+        {
+            var union = new UIntUnion(integer);
+            if (BitConverter.IsLittleEndian)
+                return new[] { union.B3, union.B2, union.B1, union.B0 };
+            else
+                return new[] { union.B0, union.B1, union.B2, union.B3 };
+        }
+
+        private static byte[] Serialize<T>(T item)
+        {
+            using var ms = new MemoryStream();
+            ProtoBuf.Serializer.Serialize(ms, item);
+            return ms.ToArray();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/StateChanged.cs b/src/DotPulsar/Internal/StateChanged.cs
new file mode 100644
index 0000000..3690f44
--- /dev/null
+++ b/src/DotPulsar/Internal/StateChanged.cs
@@ -0,0 +1,8 @@
+namespace DotPulsar.Internal
+{
+    public enum StateChanged : byte
+    {
+        To,
+        From
+    }
+}
diff --git a/src/DotPulsar/Internal/StateManager.cs b/src/DotPulsar/Internal/StateManager.cs
new file mode 100644
index 0000000..84557df
--- /dev/null
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -0,0 +1,75 @@
+using DotPulsar.Abstractions;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class StateManager<TState> : IStateChanged<TState>
+    {
+        private readonly object _lock;
+        private readonly StateTaskCollection<TState> _stateTasks;
+        private readonly TState[] _finalStates;
+
+        public StateManager(TState initialState, params TState[] finalStates)
+        {
+            _lock = new object();
+            _stateTasks = new StateTaskCollection<TState>();
+            _finalStates = finalStates;
+            CurrentState = initialState;
+        }
+
+        public TState CurrentState { get; private set; }
+
+        public TState SetState(TState state)
+        {
+            lock (_lock)
+            {
+                if (IsFinalState(CurrentState) || CurrentState.Equals(state))
+                    return CurrentState;
+
+                var formerState = CurrentState;
+                CurrentState = state;
+
+                if (IsFinalState(CurrentState))
+                    _stateTasks.CompleteAllTasks(CurrentState);
+                else
+                    _stateTasks.CompleteTasksAwaiting(CurrentState);
+
+                return formerState;
+            }
+        }
+
+        public Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken)
+        {
+            lock (_lock)
+            {
+                if (IsFinalState(CurrentState) || CurrentState.Equals(state))
+                    return Task.FromResult(CurrentState);
+                return _stateTasks.CreateTaskFor(state, StateChanged.To, cancellationToken);
+            }
+        }
+
+        public Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken)
+        {
+            lock (_lock)
+            {
+                if (IsFinalState(CurrentState) || !CurrentState.Equals(state))
+                    return Task.FromResult(CurrentState);
+                return _stateTasks.CreateTaskFor(state, StateChanged.From, cancellationToken);
+            }
+        }
+
+        public bool IsFinalState(TState state)
+        {
+            for (var i = 0; i < _finalStates.Length; ++i)
+            {
+                if (_finalStates[i].Equals(state))
+                    return true;
+            }
+
+            return false;
+        }
+
+        public bool IsFinalState() => IsFinalState(CurrentState);
+    }
+}
diff --git a/src/DotPulsar/Internal/StateTask.cs b/src/DotPulsar/Internal/StateTask.cs
new file mode 100644
index 0000000..aaa17be
--- /dev/null
+++ b/src/DotPulsar/Internal/StateTask.cs
@@ -0,0 +1,29 @@
+using System;
+
+namespace DotPulsar.Internal
+{
+    public sealed class StateTask<TState> : IDisposable
+    {
+        private readonly TState _state;
+        private readonly StateChanged _change;
+
+        public StateTask(TState state, StateChanged change)
+        {
+            _state = state;
+            _change = change;
+            CancelableCompletionSource = new CancelableCompletionSource<TState>();
+        }
+
+        public CancelableCompletionSource<TState> CancelableCompletionSource { get; }
+
+        public void Dispose() => CancelableCompletionSource.Dispose();
+
+        public bool IsAwaiting(TState state)
+        {
+            if (_change == StateChanged.To)
+                return _state.Equals(state);
+
+            return !_state.Equals(state);
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs
new file mode 100644
index 0000000..eaea664
--- /dev/null
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -0,0 +1,79 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DotPulsar.Internal
+{
+    public sealed class StateTaskCollection<TState>
+    {
+        private readonly object _lock;
+        private readonly LinkedList<StateTask<TState>> _awaitors;
+
+        public StateTaskCollection()
+        {
+            _lock = new object();
+            _awaitors = new LinkedList<StateTask<TState>>();
+        }
+
+        public Task<TState> CreateTaskFor(TState state, StateChanged changed, CancellationToken cancellationToken)
+        {
+            LinkedListNode<StateTask<TState>> node;
+
+            lock (_lock)
+            {
+                node = _awaitors.AddFirst(new StateTask<TState>(state, changed));
+            }
+
+            node.Value.CancelableCompletionSource.SetupCancellation(() => TaskWasCanceled(node), cancellationToken);
+            return node.Value.CancelableCompletionSource.Task;
+        }
+
+        public void CompleteTasksAwaiting(TState state)
+        {
+            lock (_lock)
+            {
+                var awaitor = _awaitors.First;
+                while (awaitor != null)
+                {
+                    var next = awaitor.Next;
+                    if (awaitor.Value.IsAwaiting(state))
+                    {
+                        _awaitors.Remove(awaitor);
+                        awaitor.Value.CancelableCompletionSource.SetResult(state);
+                        awaitor.Value.CancelableCompletionSource.Dispose();
+                    }
+                    awaitor = next;
+                }
+            }
+        }
+
+        public void CompleteAllTasks(TState state)
+        {
+            lock (_lock)
+            {
+                foreach (var awaitor in _awaitors)
+                {
+                    awaitor.CancelableCompletionSource.SetResult(state);
+                    awaitor.CancelableCompletionSource.Dispose();
+                }
+                _awaitors.Clear();
+            }
+        }
+
+        private void TaskWasCanceled(LinkedListNode<StateTask<TState>> node)
+        {
+            lock (_lock)
+            {
+                try
+                {
+                    _awaitors.Remove(node);
+                    node.Value.Dispose();
+                }
+                catch
+                {
+                    // Ignore
+                }
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/SubscribeResponse.cs b/src/DotPulsar/Internal/SubscribeResponse.cs
new file mode 100644
index 0000000..a401525
--- /dev/null
+++ b/src/DotPulsar/Internal/SubscribeResponse.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar.Internal
+{
+    public sealed class SubscribeResponse
+    {
+        public SubscribeResponse(ulong consumerId) => ConsumerId = consumerId;
+
+        public ulong ConsumerId { get; }
+    }
+}
diff --git a/src/DotPulsar/Internal/UIntUnion.cs b/src/DotPulsar/Internal/UIntUnion.cs
new file mode 100644
index 0000000..160bddd
--- /dev/null
+++ b/src/DotPulsar/Internal/UIntUnion.cs
@@ -0,0 +1,38 @@
+using System.Runtime.InteropServices;
+
+namespace DotPulsar.Internal
+{
+    [StructLayout(LayoutKind.Explicit)]
+    public struct UIntUnion
+    {
+        public UIntUnion(byte b0, byte b1, byte b2, byte b3)
+        {
+            UInt = 0;
+            B0 = b0;
+            B1 = b1;
+            B2 = b2;
+            B3 = b3;
+        }
+
+        public UIntUnion(uint value)
+        {
+            B0 = 0;
+            B1 = 0;
+            B2 = 0;
+            B3 = 0;
+            UInt = value;
+        }
+
+        [FieldOffset(0)]
+        public byte B0;
+        [FieldOffset(1)]
+        public byte B1;
+        [FieldOffset(2)]
+        public byte B2;
+        [FieldOffset(3)]
+        public byte B3;
+
+        [FieldOffset(0)]
+        public uint UInt;
+    }
+}
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
new file mode 100644
index 0000000..eb553b5
--- /dev/null
+++ b/src/DotPulsar/Message.cs
@@ -0,0 +1,35 @@
+using System.Buffers;
+using System.Collections.Immutable;
+
+namespace DotPulsar
+{
+    public sealed class Message
+    {
+        private readonly Internal.PulsarApi.MessageMetadata _messageMetadata;
+        private ImmutableDictionary<string, string> _properties;
+
+        internal Message(MessageId messageId, Internal.PulsarApi.MessageMetadata messageMetadata, ReadOnlySequence<byte> data)
+        {
+            MessageId = messageId;
+            _messageMetadata = messageMetadata;
+            Data = data;
+        }
+
+        public MessageId MessageId { get; }
+        public ReadOnlySequence<byte> Data { get; }
+        public ulong EventTime => _messageMetadata.EventTime;
+        public string ProducerName => _messageMetadata.ProducerName;
+        public ulong PublishTime => _messageMetadata.PublishTime;
+        public ulong SequenceId => _messageMetadata.SequenceId;
+
+        public ImmutableDictionary<string, string> Properties
+        {
+            get
+            {
+                if (_properties == null)
+                    _properties = _messageMetadata.Properties.ToImmutableDictionary(p => p.Key, p => p.Value);
+                return _properties;
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
new file mode 100644
index 0000000..1c163af
--- /dev/null
+++ b/src/DotPulsar/MessageId.cs
@@ -0,0 +1,46 @@
+using DotPulsar.Internal.PulsarApi;
+using System;
+
+namespace DotPulsar
+{
+    public sealed class MessageId : IEquatable<MessageId>
+    {
+        static MessageId()
+        {
+            Earliest = new MessageId(ulong.MaxValue, ulong.MaxValue, -1, -1);
+            Latest = new MessageId(long.MaxValue, long.MaxValue, -1, -1);
+        }
+
+        public static MessageId Earliest { get; }
+        public static MessageId Latest { get; }
+
+        internal MessageId(MessageIdData messageIdData) => Data = messageIdData;
+
+        public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex)
+        {
+            Data = new MessageIdData
+            {
+                LedgerId = ledgerId,
+                EntryId = entryId,
+                Partition = partition,
+                BatchIndex = batchIndex
+            };
+        }
+
+        internal MessageIdData Data { get; }
+
+        public ulong LedgerId => Data.LedgerId;
+        public ulong EntryId => Data.EntryId;
+        public int Partition => Data.Partition;
+        public int BatchIndex => Data.BatchIndex;
+
+        public override bool Equals(object o) => o is MessageId ? Equals((MessageId)o) : false;
+        public bool Equals(MessageId other) => LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
+
+        public static bool operator ==(MessageId x, MessageId y) => x.Equals(y);
+        public static bool operator !=(MessageId x, MessageId y) => !x.Equals(y);
+
+        public override int GetHashCode() => HashCode.Combine(LedgerId, EntryId, Partition, BatchIndex);
+        public override string ToString() => $"LedgerId: {LedgerId}, EntryId: {EntryId}, Partition: {Partition}, BatchIndex: {BatchIndex}";
+    }
+}
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
new file mode 100644
index 0000000..006abd2
--- /dev/null
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -0,0 +1,49 @@
+namespace DotPulsar
+{
+    public sealed class MessageMetadata
+    {
+        public MessageMetadata() => Metadata = new Internal.PulsarApi.MessageMetadata();
+
+        internal Internal.PulsarApi.MessageMetadata Metadata;
+
+        public ulong EventTime
+        {
+            get => Metadata.EventTime;
+            set => Metadata.EventTime = value;
+        }
+
+        public string this[string key]
+        {
+            get
+            {
+                for (var i = 0; i < Metadata.Properties.Count; ++i)
+                {
+                    var keyValye = Metadata.Properties[i];
+                    if (keyValye.Key == key)
+                        return keyValye.Value;
+                }
+
+                return null;
+            }
+            set
+            {
+                for (var i = 0; i < Metadata.Properties.Count; ++i)
+                {
+                    var keyValye = Metadata.Properties[i];
+                    if (keyValye.Key != key)
+                        continue;
+                    keyValye.Value = value;
+                    return;
+                }
+
+                Metadata.Properties.Add(new Internal.PulsarApi.KeyValue { Key = key, Value = value });
+            }
+        }
+
+        public ulong SequenceId
+        {
+            get => Metadata.SequenceId;
+            set => Metadata.SequenceId = value;
+        }
+    }
+}
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
new file mode 100644
index 0000000..c161b11
--- /dev/null
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar
+{
+    public sealed class ProducerOptions
+    {
+        public string ProducerName { get; set; }
+        public ulong InitialSequenceId { get; set; }
+        public string Topic { get; set; }
+    }
+}
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/ProducerState.cs
new file mode 100644
index 0000000..0326e88
--- /dev/null
+++ b/src/DotPulsar/ProducerState.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar
+{
+    public enum ProducerState : byte
+    {
+        Closed,
+        Connected,
+        Disconnected,
+        Faulted
+    }
+}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
new file mode 100644
index 0000000..7fa2219
--- /dev/null
+++ b/src/DotPulsar/PulsarClient.cs
@@ -0,0 +1,94 @@
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal;
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Collections.Generic;
+
+namespace DotPulsar
+{
+    public sealed class PulsarClient : IPulsarClient
+    {
+        private readonly object _lock;
+        private readonly IFaultStrategy _faultStrategy;
+        private readonly LinkedList<IDisposable> _disposabels;
+        private readonly ConnectionPool _connectionPool;
+        private bool _isClosed;
+
+        internal PulsarClient(ConnectionPool connectionPool)
+        {
+            _lock = new object();
+            _faultStrategy = new FaultStrategy(3000);
+            _disposabels = new LinkedList<IDisposable>();
+            _connectionPool = connectionPool;
+            _isClosed = false;
+        }
+
+        public static IPulsarClientBuilder Builder() => new PulsarClientBuilder();
+
+        public IProducer CreateProducer(ProducerOptions options)
+        {
+            lock (_lock)
+            {
+                ThrowIfClosed();
+                var producer = new Producer(new ProducerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
+                _disposabels.AddFirst(producer);
+                producer.StateChangedTo(ProducerState.Closed, default).ContinueWith(t => Remove(producer));
+                return producer;
+            }
+        }
+
+        public IConsumer CreateConsumer(ConsumerOptions options)
+        {
+            lock (_lock)
+            {
+                ThrowIfClosed();
+                var consumer = new Consumer(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy, options.SubscriptionType != SubscriptionType.Failover);
+                _disposabels.AddFirst(consumer);
+                consumer.StateChangedTo(ConsumerState.Closed, default).ContinueWith(t => Remove(consumer));
+                return consumer;
+            }
+        }
+
+        public IReader CreateReader(ReaderOptions options)
+        {
+            lock (_lock)
+            {
+                ThrowIfClosed();
+                var reader = new Reader(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
+                _disposabels.AddFirst(reader);
+                reader.StateChangedTo(ReaderState.Closed, default).ContinueWith(t => Remove(reader));
+                return reader;
+            }
+        }
+
+        public void Dispose() //While we wait for IAsyncDisposable
+        {
+            lock (_lock)
+            {
+                ThrowIfClosed();
+                _isClosed = true;
+                foreach (var disposable in _disposabels)
+                {
+                    disposable.Dispose();
+                }
+            }
+
+            _connectionPool.Dispose();
+        }
+
+        private void ThrowIfClosed()
+        {
+            if (_isClosed)
+                throw new PulsarClientClosedException();
+        }
+
+        private void Remove(IDisposable disposable)
+        {
+            lock (_lock)
+            {
+                _disposabels.Remove(disposable);
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/ReaderOptions.cs b/src/DotPulsar/ReaderOptions.cs
new file mode 100644
index 0000000..a5087d1
--- /dev/null
+++ b/src/DotPulsar/ReaderOptions.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar
+{
+    public sealed class ReaderOptions
+    {
+        public string ReaderName { get; set; }
+        public uint MessagePrefetchCount { get; set; }
+        public MessageId StartMessageId { get; set; }
+        public string Topic { get; set; }
+    }
+}
diff --git a/src/DotPulsar/ReaderState.cs b/src/DotPulsar/ReaderState.cs
new file mode 100644
index 0000000..32654e6
--- /dev/null
+++ b/src/DotPulsar/ReaderState.cs
@@ -0,0 +1,11 @@
+namespace DotPulsar
+{
+    public enum ReaderState : byte
+    {
+        Closed,
+        Connected,
+        Disconnected,
+        Faulted,
+        ReachedEndOfTopic
+    }
+}
diff --git a/src/DotPulsar/SubscriptionInitialPosition.cs b/src/DotPulsar/SubscriptionInitialPosition.cs
new file mode 100644
index 0000000..8c352d0
--- /dev/null
+++ b/src/DotPulsar/SubscriptionInitialPosition.cs
@@ -0,0 +1,8 @@
+namespace DotPulsar
+{
+    public enum SubscriptionInitialPosition : byte
+    {
+        Latest = 0,
+        Earliest = 1
+    }
+}
diff --git a/src/DotPulsar/SubscriptionType.cs b/src/DotPulsar/SubscriptionType.cs
new file mode 100644
index 0000000..aa815ce
--- /dev/null
+++ b/src/DotPulsar/SubscriptionType.cs
@@ -0,0 +1,9 @@
+namespace DotPulsar
+{
+    public enum SubscriptionType : byte
+    {
+        Exclusive = 0,
+        Shared = 1,
+        Failover = 2
+    }
+}