first full cleanup
diff --git a/.editorconfig b/.editorconfig
index 5021ae4..44b4c97 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -9,6 +9,21 @@
 # Microsoft .NET properties
 csharp_using_directive_placement = inside_namespace:silent
 
+# ReSharper properties
+resharper_constructor_or_destructor_body=expression_body
+resharper_local_function_body=expression_body
+resharper_method_or_operator_body=expression_body
+resharper_wrap_before_arrow_with_expressions=true
+resharper_csharp_insert_final_newline=true
+resharper_csharp_keep_blank_lines_in_code=1
+resharper_csharp_keep_blank_lines_in_declarations=1
+resharper_empty_block_style=together_same_line
+resharper_keep_existing_expr_member_arrangement=false
+resharper_place_expr_accessor_on_single_line=true
+resharper_place_expr_property_on_single_line=true
+resharper_space_within_single_line_array_initializer_braces=true
+resharper_use_indent_from_vs=false
+
 [{*.yaml, *.yml}]
 indent_size = 2
 
diff --git a/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml b/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml
index 0521549..25b0629 100644
--- a/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml
+++ b/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml
@@ -1,8 +1,8 @@
 <component name="ProjectCodeStyleConfiguration">
   <code_scheme name="Project" version="173">
     <option name="AUTODETECT_INDENTS" value="false" />
-    <option name="RIGHT_MARGIN" value="160" />
+    <option name="RIGHT_MARGIN" value="180" />
     <option name="WRAP_WHEN_TYPING_REACHES_RIGHT_MARGIN" value="true" />
-    <option name="SOFT_MARGINS" value="140,160" />
+    <option name="SOFT_MARGINS" value="180" />
   </code_scheme>
 </component>
\ No newline at end of file
diff --git a/DotPulsar.sln.DotSettings b/DotPulsar.sln.DotSettings
index cbd4041..c011518 100644
--- a/DotPulsar.sln.DotSettings
+++ b/DotPulsar.sln.DotSettings
@@ -1,4 +1,8 @@
 <wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
+	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeAccessorOwnerBody/@EntryIndexedValue">DO_NOT_SHOW</s:String>
+	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeConstructorOrDestructorBody/@EntryIndexedValue"></s:String>
+	<s:Boolean x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeConstructorOrDestructorBody/@EntryIndexRemoved">True</s:Boolean>
+	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=SuggestDiscardDeclarationVarStyle/@EntryIndexedValue">DO_NOT_SHOW</s:String>
 	<s:String x:Key="/Default/CodeStyle/CodeCleanup/Profiles/=DotPulsar_003A_0020Full_0020Cleanup/@EntryIndexedValue">&lt;?xml version="1.0" encoding="utf-16"?&gt;&lt;Profile name="DotPulsar: Full Cleanup"&gt;&lt;XMLReformatCode&gt;True&lt;/XMLReformatCode&gt;&lt;CSCodeStyleAttributes ArrangeTypeAccessModifier="True" ArrangeTypeMemberAccessModifier="True" SortModifiers="True" RemoveRedundantParentheses="True" AddMissingParentheses="True" ArrangeBraces="True" ArrangeAttributes="True" ArrangeArgumentsStyle="True" ArrangeCodeBodyStyle="True" ArrangeVarStyle="True" ArrangeTrailingCommas="True" /&gt;&lt;CSOptimizeUsings&gt;&lt;OptimizeUsings&gt;True&lt;/OptimizeUsings&gt;&lt;EmbraceInRegion&gt;False&lt;/EmbraceInRegion&gt;&lt;RegionName&gt;&lt;/RegionName&gt;&lt;/CSOptimizeUsings&gt;&lt;CSReformatCode&gt;True&lt;/CSReformatCode&gt;&lt;RemoveCodeRedundanciesVB&gt;True&lt;/RemoveCodeRedundanciesVB&gt;&lt;VBOptimizeImports&gt;True&lt;/VBOptimizeImports&gt;&lt;VBShortenReferences&gt;True&lt;/VBShortenReferences&gt;&lt;VBFormatDocComments&gt;True&lt;/VBFormatDocComments&gt;&lt;FormatAttributeQuoteDescriptor&gt;True&lt;/FormatAttributeQuoteDescriptor&gt;&lt;XAMLCollapseEmptyTags&gt;False&lt;/XAMLCollapseEmptyTags&gt;&lt;RemoveCodeRedundancies&gt;True&lt;/RemoveCodeRedundancies&gt;&lt;CSUseAutoProperty&gt;True&lt;/CSUseAutoProperty&gt;&lt;CSArrangeQualifiers&gt;True&lt;/CSArrangeQualifiers&gt;&lt;CSShortenReferences&gt;True&lt;/CSShortenReferences&gt;&lt;CSMakeFieldReadonly&gt;True&lt;/CSMakeFieldReadonly&gt;&lt;CSMakeAutoPropertyGetOnly&gt;True&lt;/CSMakeAutoPropertyGetOnly&gt;&lt;IDEA_SETTINGS&gt;&amp;lt;profile version="1.0"&amp;gt;
   &amp;lt;option name="myName" value="DotPulsar: Full Cleanup" /&amp;gt;
   &amp;lt;inspection_tool class="ES6ShorthandObjectProperty" enabled="false" level="INFORMATION" enabled_by_default="false" /&amp;gt;
@@ -14,5 +18,36 @@
   &amp;lt;inspection_tool class="UnnecessaryReturnJS" enabled="false" level="WARNING" enabled_by_default="false" /&amp;gt;
 &amp;lt;/profile&amp;gt;&lt;/IDEA_SETTINGS&gt;&lt;/Profile&gt;</s:String>
 	<s:String x:Key="/Default/CodeStyle/CodeCleanup/SilentCleanupProfile/@EntryValue">DotPulsar: Full Cleanup</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/BRACES_FOR_FOR/@EntryValue">RequiredForMultiline</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/BRACES_FOR_FOREACH/@EntryValue">RequiredForMultiline</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/BRACES_FOR_WHILE/@EntryValue">RequiredForMultiline</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/CONSTRUCTOR_OR_DESTRUCTOR_BODY/@EntryValue">ExpressionBody</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/LOCAL_FUNCTION_BODY/@EntryValue">ExpressionBody</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/METHOD_OR_OPERATOR_BODY/@EntryValue">ExpressionBody</s:String>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/USE_HEURISTICS_FOR_BODY_STYLE/@EntryValue">False</s:Boolean>
+	<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/BLANK_LINES_BEFORE_MULTILINE_STATEMENTS/@EntryValue">1</s:Int64>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/EMPTY_BLOCK_STYLE/@EntryValue">TOGETHER_SAME_LINE</s:String>
+	<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_BLANK_LINES_IN_CODE/@EntryValue">1</s:Int64>
+	<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_BLANK_LINES_IN_DECLARATIONS/@EntryValue">1</s:Int64>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_EMBEDDED_ARRANGEMENT/@EntryValue">False</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_EXPR_MEMBER_ARRANGEMENT/@EntryValue">False</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_INITIALIZER_ARRANGEMENT/@EntryValue">False</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_SWITCH_EXPRESSION_ARRANGEMENT/@EntryValue">False</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/LINE_FEED_AT_FILE_END/@EntryValue">True</s:Boolean>
+	<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/MAX_ARRAY_INITIALIZER_ELEMENTS_ON_LINE/@EntryValue">4</s:Int64>
+	<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/MAX_INITIALIZER_ELEMENTS_ON_LINE/@EntryValue">1</s:Int64>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_ACCESSORHOLDER_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">NEVER</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_EXPR_ACCESSOR_ON_SINGLE_LINE/@EntryValue">ALWAYS</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_EXPR_METHOD_ON_SINGLE_LINE/@EntryValue">NEVER</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_EXPR_PROPERTY_ON_SINGLE_LINE/@EntryValue">ALWAYS</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_FIELD_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">NEVER</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_CASE_STATEMENT_ON_SAME_LINE/@EntryValue">ALWAYS</s:String>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_EMBEDDED_STATEMENT_ON_SAME_LINE/@EntryValue">NEVER</s:String>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_INITIALIZER_ON_SINGLE_LINE/@EntryValue">False</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_WITHIN_SINGLE_LINE_ARRAY_INITIALIZER_BRACES/@EntryValue">True</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_BEFORE_ARROW_WITH_EXPRESSIONS/@EntryValue">True</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_BEFORE_EXTENDS_COLON/@EntryValue">True</s:Boolean>
+	<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_BEFORE_FIRST_TYPE_PARAMETER_CONSTRAINT/@EntryValue">True</s:Boolean>
+	<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_OBJECT_AND_COLLECTION_INITIALIZER_STYLE/@EntryValue">CHOP_ALWAYS</s:String>
     <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>
 </wpf:ResourceDictionary>
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 5b58a14..b792652 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -12,20 +12,20 @@
  * limitations under the License.
  */
 
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-using System;
-using System.Buffers;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace Consuming
 {
-    class Program
+    using System;
+    using System.Buffers;
+    using System.Text;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Extensions;
+
+    internal class Program
     {
-        static async Task Main(string[] args)
+        private static async Task Main(string[] args)
         {
             const string myTopic = "persistent://public/default/mytopic";
 
@@ -68,10 +68,7 @@
                     await consumer.Acknowledge(message, cancellationToken).ConfigureAwait(false);
                 }
             }
-            catch(OperationCanceledException)
-            {
-                return;
-            }
+            catch (OperationCanceledException) { }
         }
 
         private static async Task Monitor(IConsumer consumer)
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index b742d02..b268c6a 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -12,19 +12,19 @@
  * limitations under the License.
  */
 
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-using System;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace Producing
 {
-    class Program
+    using System;
+    using System.Text;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Extensions;
+
+    internal class Program
     {
-        static async Task Main(string[] args)
+        private static async Task Main(string[] args)
         {
             const string myTopic = "persistent://public/default/mytopic";
 
@@ -61,15 +61,13 @@
             {
                 while (!cancellationToken.IsCancellationRequested)
                 {
-                    var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow.ToString());
+                    var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow);
                     _ = await producer.Send(data, cancellationToken).ConfigureAwait(false);
                     await Task.Delay(delay).ConfigureAwait(false);
                 }
             }
             catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
-            {
-                return;
-            }
+            { }
         }
 
         private static async Task Monitor(IProducer producer)
@@ -84,10 +82,10 @@
 
                 var stateMessage = state switch
                 {
-                    ProducerState.Connected => $"The producer is connected",
-                    ProducerState.Disconnected => $"The producer is disconnected",
-                    ProducerState.Closed => $"The producer has closed",
-                    ProducerState.Faulted => $"The producer has faulted",
+                    ProducerState.Connected => "The producer is connected",
+                    ProducerState.Disconnected => "The producer is disconnected",
+                    ProducerState.Closed => "The producer has closed",
+                    ProducerState.Faulted => "The producer has faulted",
                     _ => $"The producer has an unknown state '{state}'"
                 };
 
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 515cf60..3dbe68f 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -12,20 +12,20 @@
  * limitations under the License.
  */
 
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-using System;
-using System.Buffers;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace Reading
 {
-    class Program
+    using System;
+    using System.Buffers;
+    using System.Text;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Extensions;
+
+    internal class Program
     {
-        static async Task Main(string[] args)
+        private static async Task Main(string[] args)
         {
             const string myTopic = "persistent://public/default/mytopic";
 
@@ -36,7 +36,6 @@
                 .Topic(myTopic)
                 .Create();
 
-
             var monitoring = Monitor(reader);
 
             var cts = new CancellationTokenSource();
@@ -44,7 +43,7 @@
             var reading = ReadMessages(reader, cts.Token);
 
             Console.WriteLine("Press a key to exit");
-            
+
             _ = Console.ReadKey();
 
             cts.Cancel();
@@ -68,10 +67,7 @@
                     Console.WriteLine("Received: " + data);
                 }
             }
-            catch(OperationCanceledException)
-            {
-                return;
-            }
+            catch (OperationCanceledException) { }
         }
 
         private static async Task Monitor(IReader reader)
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index a43112f..4d1c54c 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -12,13 +12,13 @@
  * limitations under the License.
  */
 
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A consumer abstraction.
     /// </summary>
diff --git a/src/DotPulsar/Abstractions/IHandleException.cs b/src/DotPulsar/Abstractions/IHandleException.cs
index 17e73ce..f4acbcf 100644
--- a/src/DotPulsar/Abstractions/IHandleException.cs
+++ b/src/DotPulsar/Abstractions/IHandleException.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System.Threading.Tasks;
-
 namespace DotPulsar.Abstractions
 {
+    using System.Threading.Tasks;
+
     /// <summary>
     /// An exception handling abstraction.
     /// </summary>
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
index 0cea4c4..c5812ed 100644
--- a/src/DotPulsar/Abstractions/IMessageBuilder.cs
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A message building abstraction.
     /// </summary>
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index d75ca77..dec0362 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -12,13 +12,13 @@
  * limitations under the License.
  */
 
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A producer abstraction.
     /// </summary>
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
index 339ec02..ddce2e8 100644
--- a/src/DotPulsar/Abstractions/IPulsarClient.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System;
-
 namespace DotPulsar.Abstractions
 {
+    using System;
+
     /// <summary>
     /// A pulsar client abstraction.
     /// </summary>
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 37e0e9f..55cd63b 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System;
-using System.Security.Cryptography.X509Certificates;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Security.Cryptography.X509Certificates;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A pulsar client building abstraction.
     /// </summary>
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 7510cbe..d3a04c1 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System;
-using System.Collections.Generic;
-using System.Threading;
-
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Threading;
+
     /// <summary>
     /// A reader abstraction.
     /// </summary>
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Abstractions/IStateChanged.cs
index 46bb5cc..079b9be 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Abstractions/IStateChanged.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Abstractions
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A state change monitoring abstraction.
     /// </summary>
diff --git a/src/DotPulsar/ExceptionContext.cs b/src/DotPulsar/ExceptionContext.cs
index f2d88fa..c911791 100644
--- a/src/DotPulsar/ExceptionContext.cs
+++ b/src/DotPulsar/ExceptionContext.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System;
-using System.Threading;
-
 namespace DotPulsar
 {
+    using System;
+    using System.Threading;
+
     public sealed class ExceptionContext
     {
         internal ExceptionContext(Exception exception, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs b/src/DotPulsar/Exceptions/AuthenticationException.cs
index e26ebd6..b9634b2 100644
--- a/src/DotPulsar/Exceptions/AuthenticationException.cs
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System;
-
 namespace DotPulsar.Exceptions
 {
+    using System;
+
     public sealed class AuthenticationException : DotPulsarException
     {
         public AuthenticationException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerBusyException.cs b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
index 3d5f87c..3f6214e 100644
--- a/src/DotPulsar/Exceptions/ConsumerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
@@ -18,4 +18,4 @@
     {
         public ConsumerBusyException(string message) : base(message) { }
     }
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
index f6079bd..c11206b 100644
--- a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
@@ -1,8 +1,8 @@
-using DotPulsar.Internal;
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
 {
+    using System;
+    using Internal;
+
     public sealed class ConsumerDisposedException : ObjectDisposedException
     {
         public ConsumerDisposedException() : base(typeof(Consumer).FullName) { }
diff --git a/src/DotPulsar/Exceptions/DotPulsarException.cs b/src/DotPulsar/Exceptions/DotPulsarException.cs
index 07e1f4f..158f906 100644
--- a/src/DotPulsar/Exceptions/DotPulsarException.cs
+++ b/src/DotPulsar/Exceptions/DotPulsarException.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System;
-
 namespace DotPulsar.Exceptions
 {
+    using System;
+
     public abstract class DotPulsarException : Exception
     {
         public DotPulsarException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ProducerBusyException.cs b/src/DotPulsar/Exceptions/ProducerBusyException.cs
index 4e1c0f8..094e615 100644
--- a/src/DotPulsar/Exceptions/ProducerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ProducerBusyException.cs
@@ -18,4 +18,4 @@
     {
         public ProducerBusyException(string message) : base(message) { }
     }
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Exceptions/ProducerDisposedException.cs b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
index c881b45..aee0b97 100644
--- a/src/DotPulsar/Exceptions/ProducerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
@@ -1,8 +1,8 @@
-using DotPulsar.Internal;
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
 {
+    using System;
+    using Internal;
+
     public sealed class ProducerDisposedException : ObjectDisposedException
     {
         public ProducerDisposedException() : base(typeof(Producer).FullName) { }
diff --git a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
index f4a4f00..1066730 100644
--- a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
+++ b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
@@ -1,7 +1,7 @@
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
 {
+    using System;
+
     public sealed class PulsarClientDisposedException : ObjectDisposedException
     {
         public PulsarClientDisposedException() : base(typeof(PulsarClient).FullName) { }
diff --git a/src/DotPulsar/Exceptions/ReaderDisposedException.cs b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
index ee02457..d95f5c8 100644
--- a/src/DotPulsar/Exceptions/ReaderDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
@@ -1,8 +1,8 @@
-using DotPulsar.Internal;
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
 {
+    using System;
+    using Internal;
+
     public sealed class ReaderDisposedException : ObjectDisposedException
     {
         public ReaderDisposedException() : base(typeof(Reader).FullName) { }
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
index b45c97f..bf37687 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -12,13 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Internal;
-
 namespace DotPulsar.Extensions
 {
+    using Abstractions;
+    using Internal;
+
     public static class ProducerExtensions
     {
-        public static IMessageBuilder NewMessage(this IProducer producer) => new MessageBuilder(producer);
+        public static IMessageBuilder NewMessage(this IProducer producer)
+            => new MessageBuilder(producer);
     }
 }
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index abdf6a9..8a8698a 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -12,15 +12,20 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Internal;
-
 namespace DotPulsar.Extensions
 {
+    using Abstractions;
+    using Internal;
+
     public static class PulsarClientExtensions
     {
-        public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient) => new ProducerBuilder(pulsarClient);
-        public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient) => new ConsumerBuilder(pulsarClient);
-        public static IReaderBuilder NewReader(this IPulsarClient pulsarClient) => new ReaderBuilder(pulsarClient);
+        public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient)
+            => new ProducerBuilder(pulsarClient);
+
+        public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient)
+            => new ConsumerBuilder(pulsarClient);
+
+        public static IReaderBuilder NewReader(this IPulsarClient pulsarClient)
+            => new ReaderBuilder(pulsarClient);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index cce261d..1e6857e 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -12,13 +12,13 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using PulsarApi;
+
     public interface IConnection : IAsyncDisposable
     {
         ValueTask<bool> HasChannels(CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
index 656d78c..ad313b1 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IConnectionPool : IAsyncDisposable
     {
         ValueTask<IConnection> FindConnectionForTopic(string topic, CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index a2cbe34..bbca812 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -12,13 +12,13 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using PulsarApi;
+
     public interface IConsumerChannel : IAsyncDisposable
     {
         Task Send(CommandAck command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
index b8bbca2..3065119 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IConsumerChannelFactory
     {
         Task<IConsumerChannel> Create(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IDequeue.cs b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
index fbda477..f99da40 100644
--- a/src/DotPulsar/Internal/Abstractions/IDequeue.cs
+++ b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IDequeue<T>
     {
         ValueTask<T> Dequeue(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IExecute.cs b/src/DotPulsar/Internal/Abstractions/IExecute.cs
index 8a3336a..1b4ec62 100644
--- a/src/DotPulsar/Internal/Abstractions/IExecute.cs
+++ b/src/DotPulsar/Internal/Abstractions/IExecute.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IExecute
     {
         ValueTask Execute(Action action, CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IProcess.cs b/src/DotPulsar/Internal/Abstractions/IProcess.cs
index 9b3e0e3..4c48c39 100644
--- a/src/DotPulsar/Internal/Abstractions/IProcess.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProcess.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+
     public interface IProcess : IAsyncDisposable
     {
         Guid CorrelationId { get; }
@@ -23,4 +23,4 @@
         void Start();
         void Handle(IEvent @event);
     }
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index f15fca2..43a97a9 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -12,17 +12,17 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using PulsarApi;
+
     public interface IProducerChannel : IAsyncDisposable
     {
         Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
-        Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+        Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
     }
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
index 4f6fefb..2f84785 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IProducerChannelFactory
     {
         Task<IProducerChannel> Create(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs b/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs
index d032675..0ce67ef 100644
--- a/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs
+++ b/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IPulsarStream : IAsyncDisposable
     {
         Task Send(ReadOnlySequence<byte> sequence);
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index b2ef7b0..64c2289 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IReaderChannel : IAsyncDisposable
     {
         ValueTask<Message> Receive(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
index 46e83b4..c651dbf 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public interface IReaderChannelFactory
     {
         Task<IReaderChannel> Create(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IStateManager.cs b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
index 0226f7a..83f5e19 100644
--- a/src/DotPulsar/Internal/Abstractions/IStateManager.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-
 namespace DotPulsar.Internal.Abstractions
 {
+    using DotPulsar.Abstractions;
+
     public interface IStateManager<TState> : IStateChanged<TState> where TState : notnull
     {
         TState CurrentState { get; }
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index bd2d8c4..cad8312 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Exceptions;
+
     public sealed class AsyncLock : IAsyncDisposable
     {
         private readonly LinkedList<CancelableCompletionSource<IDisposable>> _pending;
@@ -33,7 +33,7 @@
             _pending = new LinkedList<CancelableCompletionSource<IDisposable>>();
             _semaphoreSlim = new SemaphoreSlim(1, 1);
             _releaser = new Releaser(Release);
-            _completedTask = Task.FromResult((IDisposable)_releaser);
+            _completedTask = Task.FromResult((IDisposable) _releaser);
         }
 
         public Task<IDisposable> Lock(CancellationToken cancellationToken)
@@ -68,9 +68,7 @@
                     return;
 
                 foreach (var pending in _pending)
-                {
                     pending.Dispose();
-                }
 
                 _pending.Clear();
             }
@@ -124,9 +122,11 @@
         {
             private readonly Action _release;
 
-            public Releaser(Action release) => _release = release;
+            public Releaser(Action release)
+                => _release = release;
 
-            public void Dispose() => _release();
+            public void Dispose()
+                => _release();
         }
     }
 }
diff --git a/src/DotPulsar/Internal/AsyncLockExecutor.cs b/src/DotPulsar/Internal/AsyncLockExecutor.cs
index 98d2663..9258784 100644
--- a/src/DotPulsar/Internal/AsyncLockExecutor.cs
+++ b/src/DotPulsar/Internal/AsyncLockExecutor.cs
@@ -12,13 +12,13 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+
     public sealed class AsyncLockExecutor : IExecute, IAsyncDisposable
     {
         private readonly AsyncLock _lock;
@@ -30,7 +30,8 @@
             _executor = executor;
         }
 
-        public async ValueTask DisposeAsync() => await _lock.DisposeAsync().ConfigureAwait(false);
+        public async ValueTask DisposeAsync()
+            => await _lock.DisposeAsync().ConfigureAwait(false);
 
         public async ValueTask Execute(Action action, CancellationToken cancellationToken)
         {
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
index 70519de..5d9286e 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Exceptions;
+
     public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable
     {
         private readonly object _lock;
@@ -48,7 +48,9 @@
                     tcs.Value.SetResult(item);
                 }
                 else
+                {
                     _queue.Enqueue(item);
+                }
             }
         }
 
diff --git a/src/DotPulsar/Internal/Awaitor.cs b/src/DotPulsar/Internal/Awaitor.cs
index 62cd3e8..130e467 100644
--- a/src/DotPulsar/Internal/Awaitor.cs
+++ b/src/DotPulsar/Internal/Awaitor.cs
@@ -12,26 +12,27 @@
  * limitations under the License.
  */
 
-using System;
-using System.Collections.Concurrent;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
-    public sealed class Awaitor<T, Result> : IDisposable
+    using System;
+    using System.Collections.Concurrent;
+    using System.Threading.Tasks;
+
+    public sealed class Awaitor<T, TResult> : IDisposable
     {
-        private readonly ConcurrentDictionary<T, TaskCompletionSource<Result>> _items;
+        private readonly ConcurrentDictionary<T, TaskCompletionSource<TResult>> _items;
 
-        public Awaitor() => _items = new ConcurrentDictionary<T, TaskCompletionSource<Result>>();
+        public Awaitor()
+            => _items = new ConcurrentDictionary<T, TaskCompletionSource<TResult>>();
 
-        public Task<Result> CreateTask(T item)
+        public Task<TResult> CreateTask(T item)
         {
-            var tcs = new TaskCompletionSource<Result>(TaskCreationOptions.RunContinuationsAsynchronously);
+            var tcs = new TaskCompletionSource<TResult>(TaskCreationOptions.RunContinuationsAsynchronously);
             _ = _items.TryAdd(item, tcs);
             return tcs.Task;
         }
 
-        public void SetResult(T item, Result result)
+        public void SetResult(T item, TResult result)
         {
             if (_items.TryRemove(item, out var tcs))
                 tcs.SetResult(result);
@@ -40,9 +41,7 @@
         public void Dispose()
         {
             foreach (var item in _items.Values)
-            {
                 item.SetCanceled();
-            }
 
             _items.Clear();
         }
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index c8e8c58..9aaff16 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System.Buffers;
-using System.Collections;
-using System.Collections.Generic;
-
 namespace DotPulsar.Internal
 {
+    using System.Buffers;
+    using System.Collections;
+    using System.Collections.Generic;
+    using Extensions;
+    using PulsarApi;
+
     public sealed class BatchHandler
     {
         private readonly bool _trackBatches;
@@ -33,12 +33,13 @@
             _batches = new LinkedList<Batch>();
         }
 
-        public Message Add(MessageIdData messageId, PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data)
+        public Message Add(MessageIdData messageId, MessageMetadata metadata, ReadOnlySequence<byte> data)
         {
             if (_trackBatches)
                 _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
 
             long index = 0;
+
             for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
             {
                 var singleMetadataSize = data.ReadUInt32(index, true);
@@ -48,13 +49,14 @@
                 var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
                 var message = new Message(singleMessageId, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
                 _messages.Enqueue(message);
-                index += (uint)singleMetadata.PayloadSize;
+                index += (uint) singleMetadata.PayloadSize;
             }
 
             return _messages.Dequeue();
         }
 
-        public Message? GetNext() => _messages.Count == 0 ? null : _messages.Dequeue();
+        public Message? GetNext()
+            => _messages.Count == 0 ? null : _messages.Dequeue();
 
         public void Clear()
         {
@@ -72,11 +74,13 @@
                     continue;
 
                 batch.Acknowledge(messageId.BatchIndex);
+
                 if (batch.IsAcknowledged())
                 {
                     _batches.Remove(batch);
                     return batch.MessageId;
                 }
+
                 break;
             }
 
@@ -95,7 +99,8 @@
 
             public MessageIdData MessageId { get; }
 
-            public void Acknowledge(int batchIndex) => _acknowledgementIndex.Set(batchIndex, true);
+            public void Acknowledge(int batchIndex)
+                => _acknowledgementIndex.Set(batchIndex, true);
 
             public bool IsAcknowledged()
             {
diff --git a/src/DotPulsar/Internal/CancelableCompletionSource.cs b/src/DotPulsar/Internal/CancelableCompletionSource.cs
index ddc3428..c099aa3 100644
--- a/src/DotPulsar/Internal/CancelableCompletionSource.cs
+++ b/src/DotPulsar/Internal/CancelableCompletionSource.cs
@@ -12,24 +12,28 @@
  * limitations under the License.
  */
 
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public sealed class CancelableCompletionSource<T> : IDisposable
     {
         private readonly TaskCompletionSource<T> _source;
         private CancellationTokenRegistration? _registration;
 
-        public CancelableCompletionSource() => _source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
+        public CancelableCompletionSource()
+            => _source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
 
-        public void SetupCancellation(Action callback, CancellationToken token) => _registration = token.Register(() => callback());
+        public void SetupCancellation(Action callback, CancellationToken token)
+            => _registration = token.Register(() => callback());
 
-        public void SetResult(T result) => _ = _source.TrySetResult(result);
+        public void SetResult(T result)
+            => _ = _source.TrySetResult(result);
 
-        public void SetException(Exception exception) => _ = _source.TrySetException(exception);
+        public void SetException(Exception exception)
+            => _ = _source.TrySetException(exception);
 
         public Task<T> Task => _source.Task;
 
diff --git a/src/DotPulsar/Internal/Channel.cs b/src/DotPulsar/Internal/Channel.cs
index 2d5d73e..9fe0819 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using Abstractions;
+    using Events;
+
     public sealed class Channel : IChannel
     {
         private readonly Guid _correlationId;
@@ -31,13 +31,28 @@
             _enqueue = enqueue;
         }
 
-        public void Received(MessagePackage message) => _enqueue.Enqueue(message);
-        public void Activated() => _eventRegister.Register(new ChannelActivated(_correlationId));
-        public void ClosedByServer() => _eventRegister.Register(new ChannelClosedByServer(_correlationId));
-        public void Connected() => _eventRegister.Register(new ChannelConnected(_correlationId));
-        public void Deactivated() => _eventRegister.Register(new ChannelDeactivated(_correlationId));
-        public void Disconnected() => _eventRegister.Register(new ChannelDisconnected(_correlationId));
-        public void ReachedEndOfTopic() => _eventRegister.Register(new ChannelReachedEndOfTopic(_correlationId));
-        public void Unsubscribed() => _eventRegister.Register(new ChannelUnsubscribed(_correlationId));
+        public void Received(MessagePackage message)
+            => _enqueue.Enqueue(message);
+
+        public void Activated()
+            => _eventRegister.Register(new ChannelActivated(_correlationId));
+
+        public void ClosedByServer()
+            => _eventRegister.Register(new ChannelClosedByServer(_correlationId));
+
+        public void Connected()
+            => _eventRegister.Register(new ChannelConnected(_correlationId));
+
+        public void Deactivated()
+            => _eventRegister.Register(new ChannelDeactivated(_correlationId));
+
+        public void Disconnected()
+            => _eventRegister.Register(new ChannelDisconnected(_correlationId));
+
+        public void ReachedEndOfTopic()
+            => _eventRegister.Register(new ChannelReachedEndOfTopic(_correlationId));
+
+        public void Unsubscribed()
+            => _eventRegister.Register(new ChannelUnsubscribed(_correlationId));
     }
 }
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index d03e99d..b0d6b27 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Buffers;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Extensions;
+    using PulsarApi;
+
     public sealed class ChannelManager : IDisposable
     {
         private readonly IdLookup<IChannel> _consumerChannels;
@@ -32,12 +32,14 @@
             _producerChannels = new IdLookup<IChannel>();
         }
 
-        public bool HasChannels() => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
+        public bool HasChannels()
+            => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
 
         public Task<ProducerResponse> Outgoing(CommandProducer command, Task<BaseCommand> response, IChannel channel)
         {
             var producerId = _producerChannels.Add(channel);
             command.ProducerId = producerId;
+
             return response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Error)
@@ -45,6 +47,7 @@
                     _producerChannels.Remove(producerId);
                     result.Result.Error.Throw();
                 }
+
                 channel.Connected();
                 return new ProducerResponse(producerId, result.Result.ProducerSuccess.ProducerName);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
@@ -54,6 +57,7 @@
         {
             var consumerId = _consumerChannels.Add(channel);
             command.ConsumerId = consumerId;
+
             return response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Error)
@@ -61,6 +65,7 @@
                     _consumerChannels.Remove(consumerId);
                     result.Result.Error.Throw();
                 }
+
                 channel.Connected();
                 return new SubscribeResponse(consumerId);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
@@ -97,6 +102,7 @@
                 if (result.Result.CommandType == BaseCommand.Type.Success)
                 {
                     var channel = _consumerChannels.Remove(consumerId);
+
                     if (channel != null)
                         channel.Unsubscribed();
                 }
@@ -106,6 +112,7 @@
         public void Incoming(CommandCloseConsumer command)
         {
             var channel = _consumerChannels.Remove(command.ConsumerId);
+
             if (channel != null)
                 channel.ClosedByServer();
         }
@@ -113,6 +120,7 @@
         public void Incoming(CommandCloseProducer command)
         {
             var inbox = _producerChannels.Remove(command.ProducerId);
+
             if (inbox != null)
                 inbox.ClosedByServer();
         }
@@ -120,6 +128,7 @@
         public void Incoming(CommandActiveConsumerChange command)
         {
             var channel = _consumerChannels[command.ConsumerId];
+
             if (channel is null)
                 return;
 
@@ -132,6 +141,7 @@
         public void Incoming(CommandReachedEndOfTopic command)
         {
             var channel = _consumerChannels[command.ConsumerId];
+
             if (channel != null)
                 channel.ReachedEndOfTopic();
         }
@@ -139,6 +149,7 @@
         public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
         {
             var consumer = _consumerChannels[command.ConsumerId];
+
             if (consumer != null)
                 consumer.Received(new MessagePackage(command.MessageId, data));
         }
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index b5a2372..6446817 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Exceptions;
+    using Extensions;
+    using PulsarApi;
+
     public sealed class Connection : IConnection
     {
         private readonly AsyncLock _lock;
@@ -85,17 +85,17 @@
             return await responseTask.ConfigureAwait(false);
         }
 
-        public async Task Send(CommandPing command, CancellationToken cancellationToken) =>
-            await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task Send(CommandPing command, CancellationToken cancellationToken)
+            => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
-        public async Task Send(CommandPong command, CancellationToken cancellationToken) =>
-            await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task Send(CommandPong command, CancellationToken cancellationToken)
+            => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
-        public async Task Send(CommandAck command, CancellationToken cancellationToken) =>
-            await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task Send(CommandAck command, CancellationToken cancellationToken)
+            => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
-        public async Task Send(CommandFlow command, CancellationToken cancellationToken) =>
-            await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task Send(CommandFlow command, CancellationToken cancellationToken)
+            => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
         public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
@@ -115,17 +115,17 @@
             return await responseTask.ConfigureAwait(false);
         }
 
-        public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken) =>
-            await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken)
+            => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
-        public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken) =>
-            await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken)
+            => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
-        public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken) =>
-            await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken)
+            => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
-        public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) =>
-            await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+        public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+            => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
 
         public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
         {
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index 106d2cc..e5dce0a 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -12,17 +12,17 @@
  * limitations under the License.
  */
 
-using System;
-using System.IO;
-using System.Net;
-using System.Net.Security;
-using System.Net.Sockets;
-using System.Security.Authentication;
-using System.Security.Cryptography.X509Certificates;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.IO;
+    using System.Net;
+    using System.Net.Security;
+    using System.Net.Sockets;
+    using System.Security.Authentication;
+    using System.Security.Cryptography.X509Certificates;
+    using System.Threading.Tasks;
+
     public sealed class Connector
     {
         private readonly X509Certificate2Collection _clientCertificates;
@@ -30,7 +30,8 @@
         private readonly bool _verifyCertificateAuthority;
         private readonly bool _verifyCertificateName;
 
-        public Connector(X509Certificate2Collection clientCertificates, X509Certificate2? trustedCertificateAuthority, bool verifyCertificateAuthority, bool verifyCertificateName)
+        public Connector(X509Certificate2Collection clientCertificates, X509Certificate2? trustedCertificateAuthority, bool verifyCertificateAuthority,
+            bool verifyCertificateName)
         {
             _clientCertificates = clientCertificates;
             _trustedCertificateAuthority = trustedCertificateAuthority;
@@ -84,7 +85,7 @@
 
             try
             {
-                sslStream = new SslStream(stream, false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
+                sslStream = new SslStream(stream, false, ValidateServerCertificate, null);
                 await sslStream.AuthenticateAsClientAsync(host, _clientCertificates, SslProtocols.None, true).ConfigureAwait(false);
                 return sslStream;
             }
@@ -116,7 +117,8 @@
                     return false;
 
                 chain.ChainPolicy.ExtraStore.Add(_trustedCertificateAuthority);
-                _ = chain.Build((X509Certificate2)certificate);
+                _ = chain.Build((X509Certificate2) certificate);
+
                 for (var i = 0; i < chain.ChainElements.Count; i++)
                 {
                     if (chain.ChainElements[i].Certificate.Thumbprint == _trustedCertificateAuthority.Thumbprint)
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index c1d5f1c..6e6a51c 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System.Reflection;
-
 namespace DotPulsar.Internal
 {
+    using System.Reflection;
+
     public static class Constants
     {
         static Constants()
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index d239d83..5f0e763 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -12,19 +12,19 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Collections.Generic;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+    using PulsarApi;
+
     public sealed class Consumer : IConsumer
     {
         private readonly Guid _correlationId;
@@ -59,9 +59,11 @@
         public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
             => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
 
-        public bool IsFinalState() => _state.IsFinalState();
+        public bool IsFinalState()
+            => _state.IsFinalState();
 
-        public bool IsFinalState(ConsumerState state) => _state.IsFinalState(state);
+        public bool IsFinalState(ConsumerState state)
+            => _state.IsFinalState(state);
 
         public async ValueTask DisposeAsync()
         {
@@ -77,9 +79,7 @@
             ThrowIfDisposed();
 
             while (!cancellationToken.IsCancellationRequested)
-            {
                 yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
         }
 
         public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
@@ -105,19 +105,21 @@
             ThrowIfDisposed();
             var seek = new CommandSeek { MessageId = messageId.Data };
             _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
-            return;
         }
 
         public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken).ConfigureAwait(false);
+
+            var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken)
+                .ConfigureAwait(false);
             return new MessageId(response.LastMessageId);
         }
 
         private async ValueTask Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
+
             await _executor.Execute(() =>
             {
                 _cachedCommandAck.Type = ackType;
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 64b6c7a..15b2875 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-
 namespace DotPulsar.Internal
 {
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+
     public sealed class ConsumerBuilder : IConsumerBuilder
     {
         private readonly IPulsarClient _pulsarClient;
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 9a05dce..44cfd94 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Extensions;
+    using PulsarApi;
+
     public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
     {
         private readonly ulong _id;
@@ -57,6 +57,7 @@
                 _sendWhenZero--;
 
                 var message = _batchHandler.GetNext();
+
                 if (message != null)
                     return message;
 
@@ -82,9 +83,11 @@
         public async Task Send(CommandAck command, CancellationToken cancellationToken)
         {
             var messageId = command.MessageIds[0];
+
             if (messageId.BatchIndex != -1)
             {
                 var batchMessageId = _batchHandler.Acknowledge(messageId);
+
                 if (batchMessageId is null)
                     return;
 
@@ -140,7 +143,7 @@
 
             if (_firstFlow)
             {
-                _cachedCommandFlow.MessagePermits = (uint)Math.Ceiling(_cachedCommandFlow.MessagePermits * 0.5);
+                _cachedCommandFlow.MessagePermits = (uint) Math.Ceiling(_cachedCommandFlow.MessagePermits * 0.5);
                 _firstFlow = false;
             }
 
@@ -149,11 +152,7 @@
 
         private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken)
         {
-            var ack = new CommandAck
-            {
-                Type = CommandAck.AckType.Individual,
-                validation_error = CommandAck.ValidationError.ChecksumMismatch
-            };
+            var ack = new CommandAck { Type = CommandAck.AckType.Individual, validation_error = CommandAck.ValidationError.ChecksumMismatch };
 
             ack.MessageIds.Add(messagePackage.MessageId);
 
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index aa07a40..183c0e3 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using PulsarApi;
+
     public sealed class ConsumerChannelFactory : IConsumerChannelFactory
     {
         private readonly Guid _correlationId;
@@ -46,12 +46,12 @@
             _subscribe = new CommandSubscribe
             {
                 ConsumerName = options.ConsumerName,
-                initialPosition = (CommandSubscribe.InitialPosition)options.InitialPosition,
+                initialPosition = (CommandSubscribe.InitialPosition) options.InitialPosition,
                 PriorityLevel = options.PriorityLevel,
                 ReadCompacted = options.ReadCompacted,
                 Subscription = options.SubscriptionName,
                 Topic = options.Topic,
-                Type = (CommandSubscribe.SubType)options.SubscriptionType
+                Type = (CommandSubscribe.SubType) options.SubscriptionType
             };
 
             _batchHandler = new BatchHandler(true);
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index af75d0f..7db9994 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading.Tasks;
+    using Abstractions;
+
     public sealed class ConsumerProcess : Process
     {
         private readonly IStateManager<ConsumerState> _stateManager;
@@ -38,7 +38,7 @@
             _isFailoverSubscription = isFailoverSubscription;
         }
 
-        public async override ValueTask DisposeAsync()
+        public override async ValueTask DisposeAsync()
         {
             _stateManager.SetState(ConsumerState.Closed);
             CancellationTokenSource.Cancel();
diff --git a/src/DotPulsar/Internal/Crc32C.cs b/src/DotPulsar/Internal/Crc32C.cs
index b348189..2dfac40 100644
--- a/src/DotPulsar/Internal/Crc32C.cs
+++ b/src/DotPulsar/Internal/Crc32C.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System.Buffers;
-
 namespace DotPulsar.Internal
 {
+    using System.Buffers;
+
     public static class Crc32C
     {
         private const uint Generator = 0x82F63B78u;
@@ -29,13 +29,13 @@
             for (uint i = 0; i < 256; i++)
             {
                 var entry = i;
+
                 for (var j = 0; j < 16; j++)
                 {
                     for (var k = 0; k < 8; k++)
-                    {
-                        entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : (entry >> 1);
-                    }
-                    Lookup[(j * 256) + i] = entry;
+                        entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : entry >> 1;
+
+                    Lookup[j * 256 + i] = entry;
                 }
             }
         }
@@ -51,20 +51,21 @@
             foreach (var memory in sequence)
             {
                 var span = memory.Span;
+
                 for (var i = 0; i < span.Length; ++i)
                 {
                     var currentByte = span[i];
 
                     if (!readingBlock)
                     {
-                        checksum = Lookup[(byte)(checksum ^ currentByte)] ^ checksum >> 8;
+                        checksum = Lookup[(byte) (checksum ^ currentByte)] ^ (checksum >> 8);
                         continue;
                     }
 
                     var offSetBase = offset * 256;
 
                     if (offset > 11)
-                        block[offset] = Lookup[offSetBase + ((byte)(checksum >> (8 * (15 - offset))) ^ currentByte)];
+                        block[offset] = Lookup[offSetBase + ((byte) (checksum >> (8 * (15 - offset))) ^ currentByte)];
                     else
                         block[offset] = Lookup[offSetBase + currentByte];
 
@@ -75,6 +76,7 @@
                         offset = 15;
                         readingBlock = remaningBytes >= 16;
                         checksum = 0;
+
                         for (var j = 0; j < block.Length; ++j)
                             checksum ^= block[j];
                     }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index 4c8b4f0..855657c 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -12,25 +12,27 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Net.Sockets;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Exceptions;
+
     public sealed class DefaultExceptionHandler : IHandleException
     {
         private readonly TimeSpan _retryInterval;
 
-        public DefaultExceptionHandler(TimeSpan retryInterval) => _retryInterval = retryInterval;
+        public DefaultExceptionHandler(TimeSpan retryInterval)
+            => _retryInterval = retryInterval;
 
         public async ValueTask OnException(ExceptionContext exceptionContext)
         {
             exceptionContext.Result = DetermineFaultAction(exceptionContext.Exception, exceptionContext.CancellationToken);
+
             if (exceptionContext.Result == FaultAction.Retry)
                 await Task.Delay(_retryInterval, exceptionContext.CancellationToken).ConfigureAwait(false);
 
@@ -41,15 +43,24 @@
         {
             switch (exception)
             {
-                case TooManyRequestsException _: return FaultAction.Retry;
-                case ChannelNotReadyException _: return FaultAction.Retry;
-                case ServiceNotReadyException _: return FaultAction.Retry;
-                case ConnectionDisposedException _: return FaultAction.Retry;
-                case AsyncLockDisposedException _: return FaultAction.Retry;
-                case PulsarStreamDisposedException _: return FaultAction.Retry;
-                case AsyncQueueDisposedException _: return FaultAction.Retry;
-                case OperationCanceledException _: return cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry;
-                case DotPulsarException _: return FaultAction.Rethrow;
+                case TooManyRequestsException _:
+                    return FaultAction.Retry;
+                case ChannelNotReadyException _:
+                    return FaultAction.Retry;
+                case ServiceNotReadyException _:
+                    return FaultAction.Retry;
+                case ConnectionDisposedException _:
+                    return FaultAction.Retry;
+                case AsyncLockDisposedException _:
+                    return FaultAction.Retry;
+                case PulsarStreamDisposedException _:
+                    return FaultAction.Retry;
+                case AsyncQueueDisposedException _:
+                    return FaultAction.Retry;
+                case OperationCanceledException _:
+                    return cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry;
+                case DotPulsarException _:
+                    return FaultAction.Rethrow;
                 case SocketException socketException:
                     switch (socketException.SocketErrorCode)
                     {
@@ -58,6 +69,7 @@
                         case SocketError.NetworkUnreachable:
                             return FaultAction.Rethrow;
                     }
+
                     return FaultAction.Retry;
             }
 
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index b9f2e95..3042270 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System.Diagnostics.Tracing;
-using System.Threading;
-
 namespace DotPulsar.Internal
 {
 #if NETSTANDARD2_1
+    using System.Diagnostics.Tracing;
+    using System.Threading;
+
     public sealed class DotPulsarEventSource : EventSource
     {
         private readonly PollingCounter _totalClientsCounter;
@@ -164,16 +164,25 @@
     public sealed class DotPulsarEventSource
     {
         public static readonly DotPulsarEventSource Log = new DotPulsarEventSource();
-        public DotPulsarEventSource() { }
+
         public void ClientCreated() { }
+
         public void ClientDisposed() { }
+
         public void ConnectionCreated() { }
+
         public void ConnectionDisposed() { }
+
         public void ConsumerCreated() { }
+
         public void ConsumerDisposed() { }
+
         public void ProducerCreated() { }
+
         public void ProducerDisposed() { }
+
         public void ReaderCreated() { }
+
         public void ReaderDisposed() { }
     }
 #endif
diff --git a/src/DotPulsar/Internal/Events/ChannelActivated.cs b/src/DotPulsar/Internal/Events/ChannelActivated.cs
index 1f8df0a..d5d4d44 100644
--- a/src/DotPulsar/Internal/Events/ChannelActivated.cs
+++ b/src/DotPulsar/Internal/Events/ChannelActivated.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ChannelActivated : IEvent
     {
-        public ChannelActivated(Guid correlationId) => CorrelationId = correlationId;
+        public ChannelActivated(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs b/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs
index 6b64a53..7c14bbf 100644
--- a/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs
+++ b/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ChannelClosedByServer : IEvent
     {
-        public ChannelClosedByServer(Guid correlationId) => CorrelationId = correlationId;
+        public ChannelClosedByServer(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ChannelConnected.cs b/src/DotPulsar/Internal/Events/ChannelConnected.cs
index 63b1b36..83cc366 100644
--- a/src/DotPulsar/Internal/Events/ChannelConnected.cs
+++ b/src/DotPulsar/Internal/Events/ChannelConnected.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ChannelConnected : IEvent
     {
-        public ChannelConnected(Guid correlationId) => CorrelationId = correlationId;
+        public ChannelConnected(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ChannelDeactivated.cs b/src/DotPulsar/Internal/Events/ChannelDeactivated.cs
index 0ab9a0d..fa47c19 100644
--- a/src/DotPulsar/Internal/Events/ChannelDeactivated.cs
+++ b/src/DotPulsar/Internal/Events/ChannelDeactivated.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ChannelDeactivated : IEvent
     {
-        public ChannelDeactivated(Guid correlationId) => CorrelationId = correlationId;
+        public ChannelDeactivated(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ChannelDisconnected.cs b/src/DotPulsar/Internal/Events/ChannelDisconnected.cs
index 09972bd..a3b223e 100644
--- a/src/DotPulsar/Internal/Events/ChannelDisconnected.cs
+++ b/src/DotPulsar/Internal/Events/ChannelDisconnected.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ChannelDisconnected : IEvent
     {
-        public ChannelDisconnected(Guid correlationId) => CorrelationId = correlationId;
+        public ChannelDisconnected(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs b/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs
index 6edfa94..7f16ec5 100644
--- a/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs
+++ b/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ChannelReachedEndOfTopic : IEvent
     {
-        public ChannelReachedEndOfTopic(Guid correlationId) => CorrelationId = correlationId;
+        public ChannelReachedEndOfTopic(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs b/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs
index c53e716..eae9ef4 100644
--- a/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs
+++ b/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ChannelUnsubscribed : IEvent
     {
-        public ChannelUnsubscribed(Guid correlationId) => CorrelationId = correlationId;
+        public ChannelUnsubscribed(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ConsumerCreated.cs b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
index db1b88b..0add8f0 100644
--- a/src/DotPulsar/Internal/Events/ConsumerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ConsumerCreated : IEvent
     {
         public ConsumerCreated(Guid correlationId, Consumer consumer)
diff --git a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
index 074e1db..be05a6f 100644
--- a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ConsumerDisposed : IEvent
     {
         public ConsumerDisposed(Guid correlationId, Consumer consumer)
diff --git a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
index 8fc30ef..2b4c356 100644
--- a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
+++ b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ExecutorFaulted : IEvent
     {
-        public ExecutorFaulted(Guid correlationId) => CorrelationId = correlationId;
+        public ExecutorFaulted(Guid correlationId)
+            => CorrelationId = correlationId;
 
         public Guid CorrelationId { get; }
     }
diff --git a/src/DotPulsar/Internal/Events/ProducerCreated.cs b/src/DotPulsar/Internal/Events/ProducerCreated.cs
index 679d7fc..8df093e 100644
--- a/src/DotPulsar/Internal/Events/ProducerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ProducerCreated.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ProducerCreated : IEvent
     {
         public ProducerCreated(Guid correlationId, Producer producer)
diff --git a/src/DotPulsar/Internal/Events/ProducerDisposed.cs b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
index a348086..7de116e 100644
--- a/src/DotPulsar/Internal/Events/ProducerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ProducerDisposed : IEvent
     {
         public ProducerDisposed(Guid correlationId, Producer producer)
diff --git a/src/DotPulsar/Internal/Events/ReaderCreated.cs b/src/DotPulsar/Internal/Events/ReaderCreated.cs
index 7a354fa..beb0750 100644
--- a/src/DotPulsar/Internal/Events/ReaderCreated.cs
+++ b/src/DotPulsar/Internal/Events/ReaderCreated.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ReaderCreated : IEvent
     {
         public ReaderCreated(Guid correlationId, Reader reader)
diff --git a/src/DotPulsar/Internal/Events/ReaderDisposed.cs b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
index a8647d2..ba1cb11 100644
--- a/src/DotPulsar/Internal/Events/ReaderDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-
 namespace DotPulsar.Internal.Events
 {
+    using System;
+    using Abstractions;
+
     public sealed class ReaderDisposed : IEvent
     {
         public ReaderDisposed(Guid correlationId, Reader reader)
diff --git a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
index ebf164c..fd5f619 100644
--- a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
+++ b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
@@ -12,24 +12,26 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Threading.Tasks;
+    using DotPulsar.Abstractions;
+
     public sealed class ExceptionHandlerPipeline : IHandleException
     {
         private readonly IHandleException[] _handlers;
 
-        public ExceptionHandlerPipeline(IEnumerable<IHandleException> handlers) => _handlers = handlers.ToArray();
+        public ExceptionHandlerPipeline(IEnumerable<IHandleException> handlers)
+            => _handlers = handlers.ToArray();
 
         public async ValueTask OnException(ExceptionContext exceptionContext)
         {
             foreach (var handler in _handlers)
             {
                 await handler.OnException(exceptionContext).ConfigureAwait(false);
+
                 if (exceptionContext.ExceptionHandled)
                     break;
             }
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
index b3dbbdd..0efb519 100644
--- a/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 namespace DotPulsar.Internal.Exceptions
 {
+    using System;
+
     public sealed class AsyncLockDisposedException : ObjectDisposedException
     {
         public AsyncLockDisposedException() : base(typeof(AsyncLock).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
index 27156cd..ea5f5fd 100644
--- a/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 namespace DotPulsar.Internal.Exceptions
 {
+    using System;
+
     public sealed class AsyncQueueDisposedException : ObjectDisposedException
     {
         public AsyncQueueDisposedException() : base(typeof(AsyncQueue<>).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
index 649420d..64178cd 100644
--- a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using DotPulsar.Exceptions;
-
 namespace DotPulsar.Internal.Exceptions
 {
+    using DotPulsar.Exceptions;
+
     public sealed class ChannelNotReadyException : DotPulsarException
     {
         public ChannelNotReadyException() : base("The service is not ready yet") { }
diff --git a/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
index e445908..7b0b1d4 100644
--- a/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 namespace DotPulsar.Internal.Exceptions
 {
+    using System;
+
     public sealed class ConnectionDisposedException : ObjectDisposedException
     {
         public ConnectionDisposedException() : base(typeof(Connection).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
index f34323f..ade8ecd 100644
--- a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using DotPulsar.Exceptions;
-
 namespace DotPulsar.Internal.Exceptions
 {
+    using DotPulsar.Exceptions;
+
     public sealed class ConsumerNotFoundException : DotPulsarException
     {
         public ConsumerNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
index 8bd4567..b4ec0a0 100644
--- a/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 namespace DotPulsar.Internal.Exceptions
 {
+    using System;
+
     public sealed class PulsarStreamDisposedException : ObjectDisposedException
     {
         public PulsarStreamDisposedException() : base(typeof(PulsarStream).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
index 0e5e313..d7f2975 100644
--- a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using DotPulsar.Exceptions;
-
 namespace DotPulsar.Internal.Exceptions
 {
+    using DotPulsar.Exceptions;
+
     public sealed class ServiceNotReadyException : DotPulsarException
     {
         public ServiceNotReadyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
index b2949a5..80c91c2 100644
--- a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
+++ b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using DotPulsar.Exceptions;
-
 namespace DotPulsar.Internal.Exceptions
 {
+    using DotPulsar.Exceptions;
+
     public sealed class TooManyRequestsException : DotPulsarException
     {
         public TooManyRequestsException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
index b72df3b..e8a1f98 100644
--- a/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
+++ b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
@@ -12,14 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Exceptions;
-using System;
-
 namespace DotPulsar.Internal.Exceptions
 {
+    using System;
+    using DotPulsar.Exceptions;
+
     public sealed class UnexpectedResponseException : DotPulsarException
     {
         public UnexpectedResponseException(string message) : base(message) { }
+
         public UnexpectedResponseException(string message, Exception innerException) : base(message, innerException) { }
     }
 }
diff --git a/src/DotPulsar/Internal/Executor.cs b/src/DotPulsar/Internal/Executor.cs
index f5ba80b..6463ce5 100644
--- a/src/DotPulsar/Internal/Executor.cs
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+
     public sealed class Executor : IExecute
     {
         private readonly Guid _correlationId;
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 4c80d2e..8cf27ab 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.PulsarApi;
-
 namespace DotPulsar.Internal.Extensions
 {
+    using DotPulsar.Exceptions;
+    using Exceptions;
+    using PulsarApi;
+
     public static class CommandExtensions
     {
         public static void Expect(this BaseCommand command, BaseCommand.Type type)
@@ -38,165 +38,137 @@
             throw new UnexpectedResponseException($"Expected '{type}' but got '{command.CommandType}'");
         }
 
-        public static void Throw(this CommandSendError command) => Throw(command.Error, command.Message);
+        public static void Throw(this CommandSendError command)
+            => Throw(command.Error, command.Message);
 
-        public static void Throw(this CommandLookupTopicResponse command) => Throw(command.Error, command.Message);
+        public static void Throw(this CommandLookupTopicResponse command)
+            => Throw(command.Error, command.Message);
 
-        public static void Throw(this CommandError error) => Throw(error.Error, error.Message);
+        public static void Throw(this CommandError error)
+            => Throw(error.Error, error.Message);
 
         private static void Throw(ServerError error, string message)
-        {
-            switch (error)
+            => throw (error switch
             {
-                case ServerError.AuthenticationError: throw new AuthenticationException(message);
-                case ServerError.AuthorizationError: throw new AuthorizationException(message);
-                case ServerError.ChecksumError: throw new ChecksumException(message);
-                case ServerError.ConsumerAssignError: throw new ConsumerAssignException(message);
-                case ServerError.ConsumerBusy: throw new ConsumerBusyException(message);
-                case ServerError.ConsumerNotFound: throw new ConsumerNotFoundException(message);
-                case ServerError.IncompatibleSchema: throw new IncompatibleSchemaException(message);
-                case ServerError.InvalidTopicName: throw new InvalidTopicNameException(message);
-                case ServerError.MetadataError: throw new MetadataException(message);
-                case ServerError.PersistenceError: throw new PersistenceException(message);
-                case ServerError.ProducerBlockedQuotaExceededError:
-                case ServerError.ProducerBlockedQuotaExceededException:
-                    throw new ProducerBlockedQuotaExceededException(message + ". Error code: " + error);
-                case ServerError.ProducerBusy: throw new ProducerBusyException(message);
-                case ServerError.ServiceNotReady: throw new ServiceNotReadyException(message);
-                case ServerError.SubscriptionNotFound: throw new SubscriptionNotFoundException(message);
-                case ServerError.TooManyRequests: throw new TooManyRequestsException(message);
-                case ServerError.TopicNotFound: throw new TopicNotFoundException(message);
-                case ServerError.TopicTerminatedError: throw new TopicTerminatedException(message);
-                case ServerError.UnsupportedVersionError: throw new UnsupportedVersionException(message);
-                case ServerError.UnknownError:
-                default: throw new UnknownException(message + ". Error code: " + error);
-            }
-        }
+                ServerError.AuthenticationError => new AuthenticationException(message),
+                ServerError.AuthorizationError => new AuthorizationException(message),
+                ServerError.ChecksumError => new ChecksumException(message),
+                ServerError.ConsumerAssignError => new ConsumerAssignException(message),
+                ServerError.ConsumerBusy => new ConsumerBusyException(message),
+                ServerError.ConsumerNotFound => new ConsumerNotFoundException(message),
+                ServerError.IncompatibleSchema => new IncompatibleSchemaException(message),
+                ServerError.InvalidTopicName => new InvalidTopicNameException(message),
+                ServerError.MetadataError => new MetadataException(message),
+                ServerError.PersistenceError => new PersistenceException(message),
+                ServerError.ProducerBlockedQuotaExceededError => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
+                ServerError.ProducerBlockedQuotaExceededException => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
+                ServerError.ProducerBusy => new ProducerBusyException(message),
+                ServerError.ServiceNotReady => new ServiceNotReadyException(message),
+                ServerError.SubscriptionNotFound => new SubscriptionNotFoundException(message),
+                ServerError.TooManyRequests => new TooManyRequestsException(message),
+                ServerError.TopicNotFound => new TopicNotFoundException(message),
+                ServerError.TopicTerminatedError => new TopicTerminatedException(message),
+                ServerError.UnsupportedVersionError => new UnsupportedVersionException(message),
+                ServerError.UnknownError => new UnknownException($"{message}. Error code: {error}"),
+                _ => new UnknownException($"{message}. Error code: {error}")
+            });
 
         public static BaseCommand AsBaseCommand(this CommandAck command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Ack,
                 Ack = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandConnect command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Connect,
                 Connect = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandPing command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Ping,
                 Ping = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandPong command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Pong,
                 Pong = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandProducer command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Producer,
                 Producer = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandGetLastMessageId command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.GetLastMessageId,
                 GetLastMessageId = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandUnsubscribe command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Unsubscribe,
                 Unsubscribe = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandSubscribe command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Subscribe,
                 Subscribe = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandLookupTopic command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Lookup,
                 LookupTopic = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandSend command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Send,
                 Send = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandFlow command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Flow,
                 Flow = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandCloseProducer command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.CloseProducer,
                 CloseProducer = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandCloseConsumer command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.CloseConsumer,
                 CloseConsumer = command
             };
-        }
 
         public static BaseCommand AsBaseCommand(this CommandSeek command)
-        {
-            return new BaseCommand
+            => new BaseCommand
             {
                 CommandType = BaseCommand.Type.Seek,
                 Seek = command
             };
-        }
     }
 }
diff --git a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
index 31aa5bf..e653650 100644
--- a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System;
-using Metadata = DotPulsar.Internal.PulsarApi.MessageMetadata;
-
 namespace DotPulsar.Internal.Extensions
 {
+    using System;
+    using Metadata = PulsarApi.MessageMetadata;
+
     public static class MessageMetadataExtensions
     {
         public static DateTimeOffset GetDeliverAtTimeAsDateTimeOffset(this Metadata metadata)
@@ -26,10 +26,10 @@
             => metadata.DeliverAtTime = timestamp.ToUnixTimeMilliseconds();
 
         public static DateTimeOffset GetEventTimeAsDateTimeOffset(this Metadata metadata)
-            => DateTimeOffset.FromUnixTimeMilliseconds((long)metadata.EventTime);
+            => DateTimeOffset.FromUnixTimeMilliseconds((long) metadata.EventTime);
 
         public static void SetEventTime(this Metadata metadata, DateTimeOffset timestamp)
-            => metadata.EventTime = (ulong)timestamp.ToUnixTimeMilliseconds();
+            => metadata.EventTime = (ulong) timestamp.ToUnixTimeMilliseconds();
 
         public static byte[]? GetKeyAsBytes(this Metadata metadata)
             => metadata.PartitionKeyB64Encoded ? Convert.FromBase64String(metadata.PartitionKey) : null;
diff --git a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
index 5a94334..2c53da0 100644
--- a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
@@ -12,17 +12,18 @@
  * limitations under the License.
  */
 
-using System.Buffers;
-
 namespace DotPulsar.Internal.Extensions
 {
+    using System.Buffers;
+    using PulsarApi;
+
     public static class MessagePackageExtensions
     {
         public static uint GetMetadataSize(this MessagePackage package)
             => package.Data.ReadUInt32(Constants.MetadataSizeOffset, true);
 
-        public static PulsarApi.MessageMetadata ExtractMetadata(this MessagePackage package, uint metadataSize)
-            => Serializer.Deserialize<PulsarApi.MessageMetadata>(package.Data.Slice(Constants.MetadataOffset, metadataSize));
+        public static MessageMetadata ExtractMetadata(this MessagePackage package, uint metadataSize)
+            => Serializer.Deserialize<MessageMetadata>(package.Data.Slice(Constants.MetadataOffset, metadataSize));
 
         public static ReadOnlySequence<byte> ExtractData(this MessagePackage package, uint metadataSize)
             => package.Data.Slice(Constants.MetadataOffset + metadataSize);
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
index e530e46..5312166 100644
--- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using System;
-using System.Buffers;
-
 namespace DotPulsar.Internal.Extensions
 {
+    using System;
+    using System.Buffers;
+
     public static class ReadOnlySequenceExtensions
     {
         public static bool StartsWith<T>(this ReadOnlySequence<T> sequence, ReadOnlyMemory<T> target) where T : IEquatable<T>
@@ -30,12 +30,14 @@
             foreach (var memory in sequence)
             {
                 var span = memory.Span;
+
                 for (var i = 0; i < span.Length; ++i)
                 {
                     if (!span[i].Equals(targetSpan[targetIndex]))
                         return false;
 
                     ++targetIndex;
+
                     if (targetIndex == targetSpan.Length)
                         return true;
                 }
@@ -46,7 +48,7 @@
 
         public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, long start, bool isBigEndian)
         {
-            if (sequence.Length < (4 + start))
+            if (sequence.Length < 4 + start)
                 throw new ArgumentOutOfRangeException(nameof(start), start, "Sequence must be at least 4 bytes long from 'start' to end");
 
             var reverse = isBigEndian != BitConverter.IsLittleEndian;
@@ -62,31 +64,41 @@
                 }
 
                 var span = memory.Span;
-                for (var i = (int)start; i < span.Length; ++i, ++read)
+
+                for (var i = (int) start; i < span.Length; ++i, ++read)
                 {
                     switch (read)
                     {
                         case 0:
-                            if (reverse) union.B0 = span[i];
-                            else union.B3 = span[i];
+                            if (reverse)
+                                union.B0 = span[i];
+                            else
+                                union.B3 = span[i];
                             continue;
                         case 1:
-                            if (reverse) union.B1 = span[i];
-                            else union.B2 = span[i];
+                            if (reverse)
+                                union.B1 = span[i];
+                            else
+                                union.B2 = span[i];
                             continue;
                         case 2:
-                            if (reverse) union.B2 = span[i];
-                            else union.B1 = span[i];
+                            if (reverse)
+                                union.B2 = span[i];
+                            else
+                                union.B1 = span[i];
                             continue;
                         case 3:
-                            if (reverse) union.B3 = span[i];
-                            else union.B0 = span[i];
+                            if (reverse)
+                                union.B3 = span[i];
+                            else
+                                union.B0 = span[i];
                             break;
                     }
                 }
 
                 if (read == 3)
                     break;
+                
                 start = 0;
             }
 
diff --git a/src/DotPulsar/Internal/FuncExceptionHandler.cs b/src/DotPulsar/Internal/FuncExceptionHandler.cs
index 6e22d97..122e727 100644
--- a/src/DotPulsar/Internal/FuncExceptionHandler.cs
+++ b/src/DotPulsar/Internal/FuncExceptionHandler.cs
@@ -1,15 +1,17 @@
-using DotPulsar.Abstractions;
-using System;
-using System.Threading.Tasks;
-
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading.Tasks;
+    using DotPulsar.Abstractions;
+
     public sealed class FuncExceptionHandler : IHandleException
     {
         private readonly Func<ExceptionContext, ValueTask> _exceptionHandler;
 
-        public FuncExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler) => _exceptionHandler = exceptionHandler;
+        public FuncExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler)
+            => _exceptionHandler = exceptionHandler;
 
-        public ValueTask OnException(ExceptionContext exceptionContext) => _exceptionHandler(exceptionContext);
+        public ValueTask OnException(ExceptionContext exceptionContext)
+            => _exceptionHandler(exceptionContext);
     }
 }
diff --git a/src/DotPulsar/Internal/IdLookup.cs b/src/DotPulsar/Internal/IdLookup.cs
index 60c010a..85c7ee0 100644
--- a/src/DotPulsar/Internal/IdLookup.cs
+++ b/src/DotPulsar/Internal/IdLookup.cs
@@ -12,15 +12,16 @@
  * limitations under the License.
  */
 
-using System.Collections.Generic;
-
 namespace DotPulsar.Internal
 {
+    using System.Collections.Generic;
+
     public sealed class IdLookup<T> where T : class
     {
         private T?[] _items;
 
-        public IdLookup() => _items = new T[1];
+        public IdLookup()
+            => _items = new T[1];
 
         public bool IsEmpty()
         {
@@ -46,7 +47,7 @@
                         continue;
 
                     _items[i] = item;
-                    return (ulong)i;
+                    return (ulong) i;
                 }
 
                 var newArray = new T[_items.Length + 1];
@@ -54,7 +55,7 @@
                 var id = newArray.Length - 1;
                 newArray[id] = item;
                 _items = newArray;
-                return (ulong)id;
+                return (ulong) id;
             }
         }
 
@@ -62,8 +63,8 @@
         {
             lock (_items)
             {
-                var item = _items[(int)id];
-                _items[(int)id] = null;
+                var item = _items[(int) id];
+                _items[(int) id] = null;
                 return item;
             }
         }
@@ -73,15 +74,18 @@
             lock (_items)
             {
                 var items = new List<T>();
+
                 for (var i = 0; i < _items.Length; ++i)
                 {
                     var item = _items[i];
+
                     if (item != null)
                     {
                         items.Add(item);
                         _items[i] = null;
                     }
                 }
+
                 return items.ToArray();
             }
         }
@@ -92,7 +96,7 @@
             {
                 lock (_items)
                 {
-                    return _items[(int)id];
+                    return _items[(int) id];
                 }
             }
         }
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index a5da955..d6e8799 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Internal.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar.Abstractions;
+    using Extensions;
+
     public sealed class MessageBuilder : IMessageBuilder
     {
         private readonly IProducer _producer;
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
index 5e0803e..c2d14d7 100644
--- a/src/DotPulsar/Internal/MessagePackage.cs
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System.Buffers;
-
 namespace DotPulsar.Internal
 {
+    using System.Buffers;
+    using PulsarApi;
+
     public struct MessagePackage
     {
         public MessagePackage(MessageIdData messageId, ReadOnlySequence<byte> data)
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index ca958b7..0c4b728 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -12,35 +12,43 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Exceptions;
+    using PulsarApi;
+
     public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel, IReaderChannel
     {
-        public ValueTask DisposeAsync() => new ValueTask();
+        public ValueTask DisposeAsync()
+            => new ValueTask();
 
-        public ValueTask<Message> Receive(CancellationToken cancellationToken = default) => throw GetException();
+        public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
+            => throw GetException();
 
-        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken) => throw GetException();
+        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+            => throw GetException();
 
-        public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken) =>
-            throw GetException();
+        public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+            => throw GetException();
 
-        public Task Send(CommandAck command, CancellationToken cancellationToken) => throw GetException();
+        public Task Send(CommandAck command, CancellationToken cancellationToken)
+            => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken) => throw GetException();
+        public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
+            => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken) => throw GetException();
+        public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
+            => throw GetException();
 
-        public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) => throw GetException();
+        public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+            => throw GetException();
 
-        private Exception GetException() => new ChannelNotReadyException();
+        private Exception GetException()
+            => new ChannelNotReadyException();
     }
 }
diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs
index 0e8c7df..c6eb088 100644
--- a/src/DotPulsar/Internal/ProcessManager.cs
+++ b/src/DotPulsar/Internal/ProcessManager.cs
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Events;
+
     public sealed class ProcessManager : IRegisterEvent, IAsyncDisposable
     {
         private readonly ConcurrentDictionary<Guid, IProcess> _processes;
@@ -42,11 +42,12 @@
             await _connectionPool.DisposeAsync().ConfigureAwait(false);
         }
 
-        public void Add(IProcess process) => _processes[process.CorrelationId] = process;
+        public void Add(IProcess process)
+            => _processes[process.CorrelationId] = process;
 
         private async void Remove(Guid correlationId)
         {
-            if (_processes.TryRemove(correlationId, out IProcess process))
+            if (_processes.TryRemove(correlationId, out var process))
                 await process.DisposeAsync().ConfigureAwait(false);
         }
 
@@ -76,10 +77,12 @@
                     DotPulsarEventSource.Log.ReaderDisposed();
                     break;
                 default:
-                    if (_processes.TryGetValue(e.CorrelationId, out IProcess process))
+                    if (_processes.TryGetValue(e.CorrelationId, out var process))
                         process.Handle(e);
                     break;
-            };
+            }
+
+            ;
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 2f94e13..e993e85 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -12,17 +12,17 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+
     public sealed class Producer : IProducer
     {
         private readonly Guid _correlationId;
@@ -55,9 +55,11 @@
         public async ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
             => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
 
-        public bool IsFinalState() => _state.IsFinalState();
+        public bool IsFinalState()
+            => _state.IsFinalState();
 
-        public bool IsFinalState(ProducerState state) => _state.IsFinalState(state);
+        public bool IsFinalState(ProducerState state)
+            => _state.IsFinalState(state);
 
         public async ValueTask DisposeAsync()
         {
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 09f1c7a..72a7c4f 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-
 namespace DotPulsar.Internal
 {
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+
     public sealed class ProducerBuilder : IProducerBuilder
     {
         private readonly IPulsarClient _pulsarClient;
@@ -53,11 +53,7 @@
             if (string.IsNullOrEmpty(_topic))
                 throw new ConfigurationException("ProducerOptions.Topic may not be null or empty");
 
-            var options = new ProducerOptions(_topic!)
-            {
-                InitialSequenceId = _initialSequenceId,
-                ProducerName = _producerName
-            };
+            var options = new ProducerOptions(_topic!) { InitialSequenceId = _initialSequenceId, ProducerName = _producerName };
 
             return _pulsarClient.CreateProducer(options);
         }
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 0a9dff9..e281154 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -12,19 +12,19 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Extensions;
+    using PulsarApi;
+
     public sealed class ProducerChannel : IProducerChannel
     {
-        private readonly PulsarApi.MessageMetadata _cachedMetadata;
+        private readonly MessageMetadata _cachedMetadata;
         private readonly SendPackage _cachedSendPackage;
         private readonly ulong _id;
         private readonly SequenceId _sequenceId;
@@ -32,16 +32,9 @@
 
         public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
         {
-            _cachedMetadata = new PulsarApi.MessageMetadata
-            {
-                ProducerName = name
-            };
+            _cachedMetadata = new MessageMetadata { ProducerName = name };
 
-            var commandSend = new CommandSend
-            {
-                ProducerId = id,
-                NumMessages = 1
-            };
+            var commandSend = new CommandSend { ProducerId = id, NumMessages = 1 };
 
             _cachedSendPackage = new SendPackage(commandSend, _cachedMetadata);
 
@@ -69,7 +62,7 @@
             return await SendPackage(true, cancellationToken).ConfigureAwait(false);
         }
 
-        public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+        public async Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
             metadata.ProducerName = _cachedMetadata.ProducerName;
             _cachedSendPackage.Metadata = metadata;
@@ -81,7 +74,7 @@
         {
             try
             {
-                _cachedSendPackage.Metadata.PublishTime = (ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+                _cachedSendPackage.Metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
 
                 if (autoAssignSequenceId)
                 {
@@ -89,7 +82,9 @@
                     _cachedSendPackage.Metadata.SequenceId = _sequenceId.Current;
                 }
                 else
+                {
                     _cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
+                }
 
                 var response = await _connection.Send(_cachedSendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
@@ -102,7 +97,8 @@
             finally
             {
                 if (autoAssignSequenceId)
-                    _cachedSendPackage.Metadata.SequenceId = 0; // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
+                    _cachedSendPackage.Metadata.SequenceId =
+                        0; // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
             }
         }
     }
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index ae8c15e..4cbd01a 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using PulsarApi;
+
     public sealed class ProducerChannelFactory : IProducerChannelFactory
     {
         private readonly Guid _correlationId;
@@ -42,11 +42,7 @@
             _executor = executor;
             _sequenceId = new SequenceId(options.InitialSequenceId);
 
-            _commandProducer = new CommandProducer
-            {
-                ProducerName = options.ProducerName,
-                Topic = options.Topic
-            };
+            _commandProducer = new CommandProducer { ProducerName = options.ProducerName, Topic = options.Topic };
         }
 
         public async Task<IProducerChannel> Create(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index a4e7d52..ef39c9b 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading.Tasks;
+    using Abstractions;
+
     public sealed class ProducerProcess : Process
     {
         private readonly IStateManager<ProducerState> _stateManager;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index dc22d89..fd6870a 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -12,37 +12,33 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Collections.Generic;
-using System.Security.Cryptography.X509Certificates;
-using System.Text;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Security.Cryptography.X509Certificates;
+    using System.Text;
+    using System.Threading.Tasks;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using PulsarApi;
+
     public sealed class PulsarClientBuilder : IPulsarClientBuilder
     {
         private readonly CommandConnect _commandConnect;
-        private List<IHandleException> _exceptionHandlers;
+        private readonly List<IHandleException> _exceptionHandlers;
         private EncryptionPolicy? _encryptionPolicy;
         private TimeSpan _retryInterval;
         private Uri _serviceUrl;
         private X509Certificate2? _trustedCertificateAuthority;
-        private X509Certificate2Collection _clientCertificates;
+        private readonly X509Certificate2Collection _clientCertificates;
         private bool _verifyCertificateAuthority;
         private bool _verifyCertificateName;
         private TimeSpan _closeInactiveConnectionsInterval;
 
         public PulsarClientBuilder()
         {
-            _commandConnect = new CommandConnect
-            {
-                ProtocolVersion = Constants.ProtocolVersion,
-                ClientVersion = Constants.ClientVersion
-            };
+            _commandConnect = new CommandConnect { ProtocolVersion = Constants.ProtocolVersion, ClientVersion = Constants.ClientVersion };
 
             _exceptionHandlers = new List<IHandleException>();
             _retryInterval = TimeSpan.FromSeconds(3);
@@ -131,7 +127,8 @@
                     _encryptionPolicy = EncryptionPolicy.EnforceUnencrypted;
 
                 if (_encryptionPolicy.Value == EncryptionPolicy.EnforceEncrypted)
-                    throw new ConnectionSecurityException($"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarScheme}' and cannot be used with an encryption policy of 'EnforceEncrypted'");
+                    throw new ConnectionSecurityException(
+                        $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarScheme}' and cannot be used with an encryption policy of 'EnforceEncrypted'");
             }
             else if (scheme == Constants.PulsarSslScheme)
             {
@@ -139,17 +136,17 @@
                     _encryptionPolicy = EncryptionPolicy.EnforceEncrypted;
 
                 if (_encryptionPolicy.Value == EncryptionPolicy.EnforceUnencrypted)
-                    throw new ConnectionSecurityException($"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");
+                    throw new ConnectionSecurityException(
+                        $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");
             }
             else
+            {
                 throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
+            }
 
             var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
             var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval);
-            var exceptionHandlers = new List<IHandleException>(_exceptionHandlers)
-            {
-                new DefaultExceptionHandler(_retryInterval)
-            };
+            var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
             var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
             var processManager = new ProcessManager(connectionPool);
             return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline);
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 6c7b330..d4c3f97 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -12,20 +12,20 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.Extensions;
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.IO;
-using System.IO.Pipelines;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.IO;
+    using System.IO.Pipelines;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Exceptions;
+    using Extensions;
+
     public sealed class PulsarStream : IPulsarStream
     {
         private const long PauseAtMoreThan10Mb = 10485760;
@@ -101,6 +101,7 @@
                     _writer.Advance(bytesRead);
 
                     var result = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
+
                     if (result.IsCompleted)
                         break;
                 }
@@ -132,6 +133,7 @@
 
                         var frameSize = buffer.ReadUInt32(0, true);
                         var totalSize = frameSize + 4;
+
                         if (buffer.Length < totalSize)
                             break;
 
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 6c7d1c5..d559663 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -12,18 +12,18 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Collections.Generic;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+    using Events;
+
     public sealed class Reader : IReader
     {
         private readonly Guid _correlationId;
@@ -56,18 +56,18 @@
         public async ValueTask<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
             => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
 
-        public bool IsFinalState() => _state.IsFinalState();
+        public bool IsFinalState()
+            => _state.IsFinalState();
 
-        public bool IsFinalState(ReaderState state) => _state.IsFinalState(state);
+        public bool IsFinalState(ReaderState state)
+            => _state.IsFinalState(state);
 
         public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
             while (!cancellationToken.IsCancellationRequested)
-            {
                 yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
         }
 
         public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
index b04747b..16d99ab 100644
--- a/src/DotPulsar/Internal/ReaderBuilder.cs
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-
 namespace DotPulsar.Internal
 {
+    using DotPulsar.Abstractions;
+    using DotPulsar.Exceptions;
+
     public sealed class ReaderBuilder : IReaderBuilder
     {
         private readonly IPulsarClient _pulsarClient;
@@ -71,12 +71,7 @@
             if (string.IsNullOrEmpty(_topic))
                 throw new ConfigurationException("Topic may not be null or empty");
 
-            var options = new ReaderOptions(_startMessageId, _topic!)
-            {
-                MessagePrefetchCount = _messagePrefetchCount,
-                ReadCompacted = _readCompacted,
-                ReaderName = _readerName
-            };
+            var options = new ReaderOptions(_startMessageId, _topic!) { MessagePrefetchCount = _messagePrefetchCount, ReadCompacted = _readCompacted, ReaderName = _readerName };
 
             return _pulsarClient.CreateReader(options);
         }
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index d37319f..4458b7a 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using PulsarApi;
+
     public sealed class ReaderChannelFactory : IReaderChannelFactory
     {
         private readonly Guid _correlationId;
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 3a7ec7d..845bf54 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading.Tasks;
+    using Abstractions;
+
     public sealed class ReaderProcess : Process
     {
         private readonly IStateManager<ReaderState> _stateManager;
@@ -35,7 +35,7 @@
             _reader = reader;
         }
 
-        public async override ValueTask DisposeAsync()
+        public override async ValueTask DisposeAsync()
         {
             _stateManager.SetState(ReaderState.Closed);
             CancellationTokenSource.Cancel();
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 6df4ceb..79c8533 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Threading.Tasks;
+    using PulsarApi;
+
     public sealed class RequestResponseHandler : IDisposable
     {
         private const string ConnectResponseIdentifier = "Connected";
@@ -31,7 +31,8 @@
             _requestId = 1;
         }
 
-        public void Dispose() => _responses.Dispose();
+        public void Dispose()
+            => _responses.Dispose();
 
         public Task<BaseCommand> Outgoing(BaseCommand command)
         {
@@ -42,6 +43,7 @@
         public void Incoming(BaseCommand command)
         {
             var identifier = GetResponseIdentifier(command);
+
             if (identifier != null)
                 _responses.SetResult(identifier, command);
         }
@@ -85,24 +87,42 @@
             switch (cmd.CommandType)
             {
                 case BaseCommand.Type.Connect:
-                case BaseCommand.Type.Connected: return ConnectResponseIdentifier;
-                case BaseCommand.Type.Send: return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId.ToString();
-                case BaseCommand.Type.SendError: return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId.ToString();
-                case BaseCommand.Type.SendReceipt: return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId.ToString();
-                case BaseCommand.Type.Error: return _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString();
-                case BaseCommand.Type.Producer: return cmd.Producer.RequestId.ToString();
-                case BaseCommand.Type.ProducerSuccess: return cmd.ProducerSuccess.RequestId.ToString();
-                case BaseCommand.Type.CloseProducer: return cmd.CloseProducer.RequestId.ToString();
-                case BaseCommand.Type.Lookup: return cmd.LookupTopic.RequestId.ToString();
-                case BaseCommand.Type.LookupResponse: return cmd.LookupTopicResponse.RequestId.ToString();
-                case BaseCommand.Type.Unsubscribe: return cmd.Unsubscribe.RequestId.ToString();
-                case BaseCommand.Type.Subscribe: return cmd.Subscribe.RequestId.ToString();
-                case BaseCommand.Type.Success: return cmd.Success.RequestId.ToString();
-                case BaseCommand.Type.Seek: return cmd.Seek.RequestId.ToString();
-                case BaseCommand.Type.CloseConsumer: return cmd.CloseConsumer.RequestId.ToString();
-                case BaseCommand.Type.GetLastMessageId: return cmd.GetLastMessageId.RequestId.ToString();
-                case BaseCommand.Type.GetLastMessageIdResponse: return cmd.GetLastMessageIdResponse.RequestId.ToString();
-                default: throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type");
+                case BaseCommand.Type.Connected:
+                    return ConnectResponseIdentifier;
+                case BaseCommand.Type.Send:
+                    return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId;
+                case BaseCommand.Type.SendError:
+                    return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId;
+                case BaseCommand.Type.SendReceipt:
+                    return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId;
+                case BaseCommand.Type.Error:
+                    return _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString();
+                case BaseCommand.Type.Producer:
+                    return cmd.Producer.RequestId.ToString();
+                case BaseCommand.Type.ProducerSuccess:
+                    return cmd.ProducerSuccess.RequestId.ToString();
+                case BaseCommand.Type.CloseProducer:
+                    return cmd.CloseProducer.RequestId.ToString();
+                case BaseCommand.Type.Lookup:
+                    return cmd.LookupTopic.RequestId.ToString();
+                case BaseCommand.Type.LookupResponse:
+                    return cmd.LookupTopicResponse.RequestId.ToString();
+                case BaseCommand.Type.Unsubscribe:
+                    return cmd.Unsubscribe.RequestId.ToString();
+                case BaseCommand.Type.Subscribe:
+                    return cmd.Subscribe.RequestId.ToString();
+                case BaseCommand.Type.Success:
+                    return cmd.Success.RequestId.ToString();
+                case BaseCommand.Type.Seek:
+                    return cmd.Seek.RequestId.ToString();
+                case BaseCommand.Type.CloseConsumer:
+                    return cmd.CloseConsumer.RequestId.ToString();
+                case BaseCommand.Type.GetLastMessageId:
+                    return cmd.GetLastMessageId.RequestId.ToString();
+                case BaseCommand.Type.GetLastMessageIdResponse:
+                    return cmd.GetLastMessageIdResponse.RequestId.ToString();
+                default:
+                    throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type");
             }
         }
     }
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
index ba71078..2acc07e 100644
--- a/src/DotPulsar/Internal/SendPackage.cs
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -12,21 +12,21 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System.Buffers;
-
 namespace DotPulsar.Internal
 {
+    using System.Buffers;
+    using PulsarApi;
+
     public sealed class SendPackage
     {
-        public SendPackage(CommandSend command, PulsarApi.MessageMetadata metadata)
+        public SendPackage(CommandSend command, MessageMetadata metadata)
         {
             Command = command;
             Metadata = metadata;
         }
 
         public CommandSend Command { get; }
-        public PulsarApi.MessageMetadata Metadata { get; set; }
+        public MessageMetadata Metadata { get; set; }
         public ReadOnlySequence<byte> Payload { get; set; }
     }
 }
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
index 7c231e8..4ad0a41 100644
--- a/src/DotPulsar/Internal/SequenceBuilder.cs
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -12,18 +12,19 @@
  * limitations under the License.
  */
 
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.Linq;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Linq;
+
     public sealed class SequenceBuilder<T> where T : notnull
     {
         private readonly LinkedList<ReadOnlyMemory<T>> _elements;
 
-        public SequenceBuilder() => _elements = new LinkedList<ReadOnlyMemory<T>>();
+        public SequenceBuilder()
+            => _elements = new LinkedList<ReadOnlyMemory<T>>();
 
         public SequenceBuilder<T> Prepend(ReadOnlyMemory<T> memory)
         {
@@ -37,10 +38,9 @@
 
             foreach (var memory in sequence)
             {
-                if (index is null)
-                    index = _elements.AddFirst(memory);
-                else
-                    index = _elements.AddAfter(index, memory);
+                index = index is null
+                    ? _elements.AddFirst(memory)
+                    : _elements.AddAfter(index, memory);
             }
 
             return this;
@@ -78,7 +78,9 @@
                     start = current;
                 }
                 else
+                {
                     current = current.CreateNext(element);
+                }
             }
 
             return new ReadOnlySequence<T>(start, 0, current, current!.Memory.Length);
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 4499dbf..5b41d7d 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -19,12 +19,14 @@
         public SequenceId(ulong initialSequenceId)
         {
             Current = initialSequenceId;
+
             if (initialSequenceId > 0)
                 Increment();
         }
 
         public ulong Current { get; private set; }
 
-        public void Increment() => ++Current;
+        public void Increment()
+            => ++Current;
     }
 }
diff --git a/src/DotPulsar/Internal/Serializer.cs b/src/DotPulsar/Internal/Serializer.cs
index 093e6ce..ca5f108 100644
--- a/src/DotPulsar/Internal/Serializer.cs
+++ b/src/DotPulsar/Internal/Serializer.cs
@@ -12,26 +12,27 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.IO;
-
 namespace DotPulsar.Internal
 {
+    using System;
+    using System.Buffers;
+    using System.IO;
+    using PulsarApi;
+
     public static class Serializer
     {
         public static T Deserialize<T>(ReadOnlySequence<byte> sequence)
         {
-            using var ms = new MemoryStream(sequence.ToArray()); //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
+            //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
+            using var ms = new MemoryStream(sequence.ToArray());
             return ProtoBuf.Serializer.Deserialize<T>(ms);
         }
 
         public static ReadOnlySequence<byte> Serialize(BaseCommand command)
         {
             var commandBytes = Serialize<BaseCommand>(command);
-            var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
-            var totalSizeBytes = ToBigEndianBytes((uint)commandBytes.Length + 4);
+            var commandSizeBytes = ToBigEndianBytes((uint) commandBytes.Length);
+            var totalSizeBytes = ToBigEndianBytes((uint) commandBytes.Length + 4);
 
             return new SequenceBuilder<byte>()
                 .Append(totalSizeBytes)
@@ -40,13 +41,13 @@
                 .Build();
         }
 
-        public static ReadOnlySequence<byte> Serialize(BaseCommand command, PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload)
+        public static ReadOnlySequence<byte> Serialize(BaseCommand command, MessageMetadata metadata, ReadOnlySequence<byte> payload)
         {
             var commandBytes = Serialize<BaseCommand>(command);
-            var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
+            var commandSizeBytes = ToBigEndianBytes((uint) commandBytes.Length);
 
             var metadataBytes = Serialize(metadata);
-            var metadataSizeBytes = ToBigEndianBytes((uint)metadataBytes.Length);
+            var metadataSizeBytes = ToBigEndianBytes((uint) metadataBytes.Length);
 
             var sb = new SequenceBuilder<byte>().Append(metadataSizeBytes).Append(metadataBytes).Append(payload);
             var checksum = Crc32C.Calculate(sb.Build());
@@ -55,17 +56,17 @@
                 .Prepend(Constants.MagicNumber)
                 .Prepend(commandBytes)
                 .Prepend(commandSizeBytes)
-                .Prepend(ToBigEndianBytes((uint)sb.Length))
+                .Prepend(ToBigEndianBytes((uint) sb.Length))
                 .Build();
         }
 
         public static byte[] ToBigEndianBytes(uint integer)
         {
             var union = new UIntUnion(integer);
-            if (BitConverter.IsLittleEndian)
-                return new[] { union.B3, union.B2, union.B1, union.B0 };
-            else
-                return new[] { union.B0, union.B1, union.B2, union.B3 };
+
+            return BitConverter.IsLittleEndian
+                ? new[] { union.B3, union.B2, union.B1, union.B0 }
+                : new[] { union.B0, union.B1, union.B2, union.B3 };
         }
 
         private static byte[] Serialize<T>(T item)
diff --git a/src/DotPulsar/Internal/StateManager.cs b/src/DotPulsar/Internal/StateManager.cs
index e2f1c0b..f9fab5d 100644
--- a/src/DotPulsar/Internal/StateManager.cs
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Abstractions;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+
     public sealed class StateManager<TState> : IStateManager<TState> where TState : notnull
     {
         private readonly object _lock;
@@ -84,6 +84,7 @@
             return false;
         }
 
-        public bool IsFinalState() => IsFinalState(CurrentState);
+        public bool IsFinalState()
+            => IsFinalState(CurrentState);
     }
 }
diff --git a/src/DotPulsar/Internal/StateTask.cs b/src/DotPulsar/Internal/StateTask.cs
index 452276d..dbd5a08 100644
--- a/src/DotPulsar/Internal/StateTask.cs
+++ b/src/DotPulsar/Internal/StateTask.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System;
-
 namespace DotPulsar.Internal
 {
+    using System;
+
     public sealed class StateTask<TState> : IDisposable where TState : notnull
     {
         private readonly TState _state;
@@ -30,7 +30,8 @@
 
         public CancelableCompletionSource<TState> CancelableCompletionSource { get; }
 
-        public void Dispose() => CancelableCompletionSource.Dispose();
+        public void Dispose()
+            => CancelableCompletionSource.Dispose();
 
         public bool IsAwaiting(TState state)
         {
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs
index 418c5e6..d8d2547 100644
--- a/src/DotPulsar/Internal/StateTaskCollection.cs
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar.Internal
 {
+    using System.Collections.Generic;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     public sealed class StateTaskCollection<TState> where TState : notnull
     {
         private readonly object _lock;
@@ -47,15 +47,18 @@
             lock (_lock)
             {
                 var awaitor = _awaitors.First;
+
                 while (awaitor != null)
                 {
                     var next = awaitor.Next;
+
                     if (awaitor.Value.IsAwaiting(state))
                     {
                         _awaitors.Remove(awaitor);
                         awaitor.Value.CancelableCompletionSource.SetResult(state);
                         awaitor.Value.CancelableCompletionSource.Dispose();
                     }
+
                     awaitor = next;
                 }
             }
@@ -70,6 +73,7 @@
                     awaitor.CancelableCompletionSource.SetResult(state);
                     awaitor.CancelableCompletionSource.Dispose();
                 }
+
                 _awaitors.Clear();
             }
         }
diff --git a/src/DotPulsar/Internal/SubscribeResponse.cs b/src/DotPulsar/Internal/SubscribeResponse.cs
index db135d1..472fddd 100644
--- a/src/DotPulsar/Internal/SubscribeResponse.cs
+++ b/src/DotPulsar/Internal/SubscribeResponse.cs
@@ -16,7 +16,8 @@
 {
     public sealed class SubscribeResponse
     {
-        public SubscribeResponse(ulong consumerId) => ConsumerId = consumerId;
+        public SubscribeResponse(ulong consumerId)
+            => ConsumerId = consumerId;
 
         public ulong ConsumerId { get; }
     }
diff --git a/src/DotPulsar/Internal/UIntUnion.cs b/src/DotPulsar/Internal/UIntUnion.cs
index 21dc4e4..96b3a10 100644
--- a/src/DotPulsar/Internal/UIntUnion.cs
+++ b/src/DotPulsar/Internal/UIntUnion.cs
@@ -12,10 +12,10 @@
  * limitations under the License.
  */
 
-using System.Runtime.InteropServices;
-
 namespace DotPulsar.Internal
 {
+    using System.Runtime.InteropServices;
+
     [StructLayout(LayoutKind.Explicit)]
     public struct UIntUnion
     {
@@ -39,10 +39,13 @@
 
         [FieldOffset(0)]
         public byte B0;
+
         [FieldOffset(1)]
         public byte B1;
+
         [FieldOffset(2)]
         public byte B2;
+
         [FieldOffset(3)]
         public byte B3;
 
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index 53d0693..2c0adec 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -12,22 +12,23 @@
  * limitations under the License.
  */
 
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.Linq;
-
 namespace DotPulsar
 {
+    using System;
+    using System.Buffers;
+    using System.Collections.Generic;
+    using System.Linq;
+    using Internal.PulsarApi;
+
     public sealed class Message
     {
-        private readonly List<Internal.PulsarApi.KeyValue> _keyValues;
+        private readonly List<KeyValue> _keyValues;
         private IReadOnlyDictionary<string, string>? _properties;
 
         internal Message(
             MessageId messageId,
             Internal.PulsarApi.MessageMetadata metadata,
-            Internal.PulsarApi.SingleMessageMetadata? singleMetadata,
+            SingleMessageMetadata? singleMetadata,
             ReadOnlySequence<byte> data)
         {
             MessageId = messageId;
@@ -62,7 +63,7 @@
 
         public bool HasEventTime => EventTime != 0;
         public ulong EventTime { get; }
-        public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)EventTime);
+        public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) EventTime);
 
         public bool HasBase64EncodedKey { get; }
         public bool HasKey => Key != null;
@@ -72,9 +73,8 @@
         public bool HasOrderingKey => OrderingKey != null;
         public byte[]? OrderingKey { get; }
 
-
         public ulong PublishTime { get; }
-        public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)PublishTime);
+        public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) PublishTime);
 
         public IReadOnlyDictionary<string, string> Properties => _properties ??= _keyValues.ToDictionary(p => p.Key, p => p.Value);
     }
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index e2c26c1..9e90967 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.PulsarApi;
-using System;
-
 namespace DotPulsar
 {
+    using System;
+    using Internal.PulsarApi;
+
     public sealed class MessageId : IEquatable<MessageId>
     {
         static MessageId()
@@ -28,18 +28,11 @@
         public static MessageId Earliest { get; }
         public static MessageId Latest { get; }
 
-        internal MessageId(MessageIdData messageIdData) => Data = messageIdData;
+        internal MessageId(MessageIdData messageIdData)
+            => Data = messageIdData;
 
         public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex)
-        {
-            Data = new MessageIdData
-            {
-                LedgerId = ledgerId,
-                EntryId = entryId,
-                Partition = partition,
-                BatchIndex = batchIndex
-            };
-        }
+            => Data = new MessageIdData { LedgerId = ledgerId, EntryId = entryId, Partition = partition, BatchIndex = batchIndex };
 
         internal MessageIdData Data { get; }
 
@@ -48,13 +41,22 @@
         public int Partition => Data.Partition;
         public int BatchIndex => Data.BatchIndex;
 
-        public override bool Equals(object o) => o is MessageId ? Equals((MessageId)o) : false;
-        public bool Equals(MessageId other) => LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
+        public override bool Equals(object o)
+            => o is MessageId ? Equals((MessageId) o) : false;
 
-        public static bool operator ==(MessageId x, MessageId y) => x.Equals(y);
-        public static bool operator !=(MessageId x, MessageId y) => !x.Equals(y);
+        public bool Equals(MessageId other)
+            => LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
 
-        public override int GetHashCode() => HashCode.Combine(LedgerId, EntryId, Partition, BatchIndex);
-        public override string ToString() => $"LedgerId: {LedgerId}, EntryId: {EntryId}, Partition: {Partition}, BatchIndex: {BatchIndex}";
+        public static bool operator ==(MessageId x, MessageId y)
+            => x.Equals(y);
+
+        public static bool operator !=(MessageId x, MessageId y)
+            => !x.Equals(y);
+
+        public override int GetHashCode()
+            => HashCode.Combine(LedgerId, EntryId, Partition, BatchIndex);
+
+        public override string ToString()
+            => $"LedgerId: {LedgerId}, EntryId: {EntryId}, Partition: {Partition}, BatchIndex: {BatchIndex}";
     }
 }
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index 98cd366..2af466e 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -12,14 +12,16 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal.Extensions;
-using System;
-
 namespace DotPulsar
 {
+    using System;
+    using Internal.Extensions;
+    using Internal.PulsarApi;
+
     public sealed class MessageMetadata
     {
-        public MessageMetadata() => Metadata = new Internal.PulsarApi.MessageMetadata();
+        public MessageMetadata()
+            => Metadata = new Internal.PulsarApi.MessageMetadata();
 
         internal Internal.PulsarApi.MessageMetadata Metadata;
 
@@ -72,6 +74,7 @@
                 for (var i = 0; i < Metadata.Properties.Count; ++i)
                 {
                     var keyValye = Metadata.Properties[i];
+
                     if (keyValye.Key == key)
                         return keyValye.Value;
                 }
@@ -83,13 +86,16 @@
                 for (var i = 0; i < Metadata.Properties.Count; ++i)
                 {
                     var keyValye = Metadata.Properties[i];
+
                     if (keyValye.Key != key)
                         continue;
+
                     keyValye.Value = value;
+
                     return;
                 }
 
-                Metadata.Properties.Add(new Internal.PulsarApi.KeyValue { Key = key, Value = value });
+                Metadata.Properties.Add(new KeyValue { Key = key, Value = value });
             }
         }
 
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index ad5f0e5..46dcfa9 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -12,16 +12,16 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal;
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
 namespace DotPulsar
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Exceptions;
+    using Internal;
+    using Internal.Abstractions;
+
     public sealed class PulsarClient : IPulsarClient
     {
         private readonly IConnectionPool _connectionPool;
@@ -38,7 +38,8 @@
             DotPulsarEventSource.Log.ClientCreated();
         }
 
-        public static IPulsarClientBuilder Builder() => new PulsarClientBuilder();
+        public static IPulsarClientBuilder Builder()
+            => new PulsarClientBuilder();
 
         public IProducer CreateProducer(ProducerOptions options)
         {
@@ -60,7 +61,9 @@
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
-            var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+
+            var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic,
+                ConsumerState.Faulted);
             var consumer = new Consumer(correlationId, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
             var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
             _processManager.Add(process);
diff --git a/tests/DotPulsar.StressTests/ConnectionTests.cs b/tests/DotPulsar.StressTests/ConnectionTests.cs
index 3a5f694..cafc5b6 100644
--- a/tests/DotPulsar.StressTests/ConnectionTests.cs
+++ b/tests/DotPulsar.StressTests/ConnectionTests.cs
@@ -12,22 +12,23 @@
  * limitations under the License.
  */
 
-using DotPulsar.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using DotPulsar.StressTests.Fixtures;
-using Xunit;
-using Xunit.Abstractions;
-
 namespace DotPulsar.StressTests
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Extensions;
+    using Fixtures;
+    using Xunit;
+    using Xunit.Abstractions;
+
     [Collection(nameof(StandaloneClusterTest))]
     public class ConnectionTests
     {
         private readonly ITestOutputHelper _output;
 
-        public ConnectionTests(ITestOutputHelper output) => _output = output;
+        public ConnectionTests(ITestOutputHelper output)
+            => _output = output;
 
         [Theory]
         [InlineData("pulsar://localhost:54545")] // test that we can connect directly to a broker
@@ -43,9 +44,7 @@
                 .ExceptionHandler(new XunitExceptionHandler(_output));
 
             if (!string.IsNullOrEmpty(serviceUrl))
-            {
                 builder.ServiceUrl(new Uri(serviceUrl));
-            }
 
             await using var client = builder.Build();
 
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs b/tests/DotPulsar.StressTests/ConsumerTests.cs
index ae84601..db3de6e 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -12,26 +12,27 @@
  * limitations under the License.
  */
 
-using DotPulsar.Extensions;
-using FluentAssertions;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using DotPulsar.StressTests.Fixtures;
-using Xunit;
-using Xunit.Abstractions;
-
 namespace DotPulsar.StressTests
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Text;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Extensions;
+    using Fixtures;
+    using FluentAssertions;
+    using Xunit;
+    using Xunit.Abstractions;
+
     [Collection(nameof(StandaloneClusterTest))]
     public class ConsumerTests
     {
         private readonly ITestOutputHelper _output;
 
-        public ConsumerTests(ITestOutputHelper output) => _output = output;
+        public ConsumerTests(ITestOutputHelper output)
+            => _output = output;
 
         [Theory]
         [InlineData(10000)]
@@ -83,7 +84,8 @@
                 {
                     ids.Add(message.MessageId);
 
-                    if (ids.Count != numberOfMessages) continue;
+                    if (ids.Count != numberOfMessages)
+                        continue;
 
                     await consumer.AcknowledgeCumulative(message, ct).ConfigureAwait(false);
 
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 0558fac..7f6eca1 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -12,13 +12,16 @@
  * limitations under the License.
  */
 
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Threading.Tasks;
+#pragma warning disable 8601
+#pragma warning disable 8625
 
 namespace DotPulsar.StressTests
 {
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Threading.Tasks;
+
     public static class EnumerableValueTaskExtensions
     {
         [DebuggerStepThrough]
@@ -47,19 +50,19 @@
         public static async IAsyncEnumerable<TResult> Enumerate<TResult>(this IEnumerable<ValueTask<TResult>> source)
         {
             foreach (var operation in source.Select(GetInfo))
+            {
                 yield return operation.IsTask
                     ? await operation.Task.ConfigureAwait(false)
                     : operation.Result;
+            }
         }
 
         private static ValueTaskInfo<TResult> GetInfo<TResult>(this ValueTask<TResult> source)
-        {
-            return source.IsCompleted
+            => source.IsCompleted
                 ? new ValueTaskInfo<TResult>(source.Result)
                 : new ValueTaskInfo<TResult>(source.AsTask());
-        }
 
-        private struct ValueTaskInfo<TResult>
+        private readonly struct ValueTaskInfo<TResult>
         {
             public ValueTaskInfo(Task<TResult> task)
             {
@@ -84,9 +87,11 @@
     public static class EnumerableTaskExtensions
     {
         [DebuggerStepThrough]
-        public static Task WhenAll(this IEnumerable<Task> source) => Task.WhenAll(source);
+        public static Task WhenAll(this IEnumerable<Task> source)
+            => Task.WhenAll(source);
 
         [DebuggerStepThrough]
-        public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> source) => Task.WhenAll(source);
+        public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> source)
+            => Task.WhenAll(source);
     }
 }
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index 7583fc6..2874bfd 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -12,28 +12,25 @@
  * limitations under the License.
  */
 
-using System;
-using System.Diagnostics;
-using System.Net.Http;
-using System.Threading.Tasks;
-using Xunit;
-
 namespace DotPulsar.StressTests.Fixtures
 {
+    using System;
+    using System.Diagnostics;
+    using System.Net.Http;
+    using System.Threading.Tasks;
+    using Xunit;
+
     public class StandaloneClusterFixture : IAsyncLifetime
     {
         public async Task InitializeAsync()
         {
-            TakeDownPulsar();  // clean-up if anything was left running from previous run
+            TakeDownPulsar(); // clean-up if anything was left running from previous run
 
             RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml up -d");
 
             var waitTries = 10;
 
-            using var handler = new HttpClientHandler
-            {
-                AllowAutoRedirect = true
-            };
+            using var handler = new HttpClientHandler { AllowAutoRedirect = true };
 
             using var client = new HttpClient(handler);
 
@@ -65,11 +62,7 @@
 
         private static void RunProcess(string name, string arguments)
         {
-            var processStartInfo = new ProcessStartInfo()
-            {
-                FileName = name,
-                Arguments = arguments
-            };
+            var processStartInfo = new ProcessStartInfo { FileName = name, Arguments = arguments };
 
             processStartInfo.Environment["TAG"] = "test";
             processStartInfo.Environment["CONFIGURATION"] = "Debug";
@@ -80,9 +73,7 @@
             process.WaitForExit();
 
             if (process.ExitCode != 0)
-            {
                 throw new Exception($"Exit code {process.ExitCode} when running process {name} with arguments {arguments}");
-            }
         }
     }
 }
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs
index e603fb5..cb2de57 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs
@@ -12,13 +12,10 @@
  * limitations under the License.
  */
 
-using Xunit;
-
 namespace DotPulsar.StressTests.Fixtures
 {
-    [CollectionDefinition(nameof(StandaloneClusterTest))]
-    public class StandaloneClusterTest : ICollectionFixture<StandaloneClusterFixture>
-    {
+    using Xunit;
 
-    }
+    [CollectionDefinition(nameof(StandaloneClusterTest))]
+    public class StandaloneClusterTest : ICollectionFixture<StandaloneClusterFixture> { }
 }
diff --git a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
index 159781d..e381c95 100644
--- a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
+++ b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
@@ -12,14 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Abstractions;
-using DotPulsar.Internal;
-using System;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-
 namespace DotPulsar.StressTests
 {
+    using System;
+    using System.Threading.Tasks;
+    using Abstractions;
+    using Internal;
+    using Xunit.Abstractions;
+
     internal class XunitExceptionHandler : IHandleException
     {
         private readonly ITestOutputHelper _output;
@@ -31,9 +31,7 @@
             _exceptionHandler = exceptionHandler;
         }
 
-        public XunitExceptionHandler(ITestOutputHelper output) : this(output, new DefaultExceptionHandler(TimeSpan.FromSeconds(3)))
-        {
-        }
+        public XunitExceptionHandler(ITestOutputHelper output) : this(output, new DefaultExceptionHandler(TimeSpan.FromSeconds(3))) { }
 
         public async ValueTask OnException(ExceptionContext exceptionContext)
         {
diff --git a/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs b/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
index adce63f..bb7a151 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -12,15 +12,14 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal;
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-
 namespace DotPulsar.Tests.Internal
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Exceptions;
+    using Xunit;
+
     public class AsyncLockTests
     {
         [Fact]
diff --git a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index 290932c..d7afda4 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -12,13 +12,13 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-
 namespace DotPulsar.Tests.Internal
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar.Internal;
+    using Xunit;
+
     public class AsyncQueueTests
     {
         [Fact]
diff --git a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
index 687b6d4..2c3b27e 100644
--- a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal;
-using Xunit;
-
 namespace DotPulsar.Tests.Internal
 {
+    using DotPulsar.Internal;
+    using Xunit;
+
     public class Crc32CTests
     {
         [Fact]
@@ -38,7 +38,11 @@
         public void Calculate_GivenSequenceWithMultipleSegments_ShouldReturnExpectedChecksum()
         {
             //Arrange
-            var s1 = new byte[] { 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18, 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01 };
+            var s1 = new byte[]
+            {
+                0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18, 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c,
+                0x58, 0x01
+            };
             var s2 = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c };
             var sequence = new SequenceBuilder<byte>().Append(s1).Append(s2).Build();
 
diff --git a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
index 4d4f900..91f2c7b 100644
--- a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal;
-using DotPulsar.Internal.Extensions;
-using Xunit;
-
 namespace DotPulsar.Tests.Internal.Extensions
 {
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Extensions;
+    using Xunit;
+
     public class ReadOnlySequenceExtensionsTests
     {
         [Fact]
diff --git a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
index ede8b2d..1a2ae51 100644
--- a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
@@ -12,12 +12,12 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal;
-using System.Buffers;
-using Xunit;
-
 namespace DotPulsar.Tests.Internal
 {
+    using System.Buffers;
+    using DotPulsar.Internal;
+    using Xunit;
+
     public class SequenceBuilderTests
     {
         [Fact]
@@ -34,6 +34,7 @@
 
             //Assert
             var array = sequence.ToArray();
+
             for (byte i = 0; i < array.Length; ++i)
                 Assert.Equal(i, array[i]);
         }
@@ -55,6 +56,7 @@
 
             //Assert
             var array = sequence.ToArray();
+
             for (byte i = 0; i < array.Length; ++i)
                 Assert.Equal(i, array[i]);
         }
@@ -73,6 +75,7 @@
 
             //Assert
             var array = sequence.ToArray();
+
             for (byte i = 0; i < array.Length; ++i)
                 Assert.Equal(i, array[i]);
         }
@@ -94,6 +97,7 @@
 
             //Assert
             var array = sequence.ToArray();
+
             for (byte i = 0; i < array.Length; ++i)
                 Assert.Equal(i, array[i]);
         }
diff --git a/tests/DotPulsar.Tests/Internal/SerializerTests.cs b/tests/DotPulsar.Tests/Internal/SerializerTests.cs
index 81375a2..87df2b3 100644
--- a/tests/DotPulsar.Tests/Internal/SerializerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/SerializerTests.cs
@@ -12,11 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal;
-using Xunit;
-
 namespace DotPulsar.Tests.Internal
 {
+    using DotPulsar.Internal;
+    using Xunit;
+
     public class SerializerTests
     {
         [Fact]
diff --git a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
index 152b55b..9ea4aa9 100644
--- a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -12,13 +12,13 @@
  * limitations under the License.
  */
 
-using DotPulsar.Internal;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-
 namespace DotPulsar.Tests.Internal
 {
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DotPulsar.Internal;
+    using Xunit;
+
     public class StateManagerTests
     {
         [Theory]