Allow for concurrent message production (#23)

* Fix use of modified SequenceId

* Clean up SequenceId initial checking
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 824ed87..0ba8cd1 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -90,8 +90,9 @@
 
                 if (autoAssignSequenceId)
                 {
-                    sendPackage.Command.SequenceId = _sequenceId.Current;
-                    sendPackage.Metadata.SequenceId = _sequenceId.Current;
+                    var newSequenceId = _sequenceId.FetchNext();
+                    sendPackage.Command.SequenceId = newSequenceId;
+                    sendPackage.Metadata.SequenceId = newSequenceId;
                 }
                 else
                     sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
@@ -99,9 +100,6 @@
                 var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
 
-                if (autoAssignSequenceId)
-                    _sequenceId.Increment();
-
                 return response.SendReceipt;
             }
             finally
@@ -115,3 +113,4 @@
         }
     }
 }
+
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index c023c56..c855337 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,12 +23,12 @@
         private const string ConnectResponseIdentifier = "Connected";
 
         private readonly Awaiter<string, BaseCommand> _responses;
-        private ulong _requestId;
+        private SequenceId _requestId;
 
         public RequestResponseHandler()
         {
             _responses = new Awaiter<string, BaseCommand>();
-            _requestId = 1;
+            _requestId = new SequenceId(1);
         }
 
         public void Dispose()
@@ -53,31 +53,31 @@
             switch (cmd.CommandType)
             {
                 case BaseCommand.Type.Seek:
-                    cmd.Seek.RequestId = _requestId++;
+                    cmd.Seek.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Lookup:
-                    cmd.LookupTopic.RequestId = _requestId++;
+                    cmd.LookupTopic.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Error:
-                    cmd.Error.RequestId = _requestId++;
+                    cmd.Error.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Producer:
-                    cmd.Producer.RequestId = _requestId++;
+                    cmd.Producer.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.CloseProducer:
-                    cmd.CloseProducer.RequestId = _requestId++;
+                    cmd.CloseProducer.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Subscribe:
-                    cmd.Subscribe.RequestId = _requestId++;
+                    cmd.Subscribe.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Unsubscribe:
-                    cmd.Unsubscribe.RequestId = _requestId++;
+                    cmd.Unsubscribe.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.CloseConsumer:
-                    cmd.CloseConsumer.RequestId = _requestId++;
+                    cmd.CloseConsumer.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.GetLastMessageId:
-                    cmd.GetLastMessageId.RequestId = _requestId++;
+                    cmd.GetLastMessageId.RequestId = _requestId.FetchNext();
                     return;
             }
         }
@@ -90,7 +90,7 @@
                 BaseCommand.Type.Send => $"{cmd.Send.ProducerId}-{cmd.Send.SequenceId}",
                 BaseCommand.Type.SendError => $"{cmd.SendError.ProducerId}-{cmd.SendError.SequenceId}",
                 BaseCommand.Type.SendReceipt => $"{cmd.SendReceipt.ProducerId}-{cmd.SendReceipt.SequenceId}",
-                BaseCommand.Type.Error => _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString(),
+                BaseCommand.Type.Error => !_requestId.IsPastInitialId() ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString(),
                 BaseCommand.Type.Producer => cmd.Producer.RequestId.ToString(),
                 BaseCommand.Type.ProducerSuccess => cmd.ProducerSuccess.RequestId.ToString(),
                 BaseCommand.Type.CloseProducer => cmd.CloseProducer.RequestId.ToString(),
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 5b41d7d..55ba94a 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -12,21 +12,32 @@
  * limitations under the License.
  */
 
+using System.Threading;
+
 namespace DotPulsar.Internal
 {
     public sealed class SequenceId
     {
+        private long _current;
+        private ulong _initial;
+
         public SequenceId(ulong initialSequenceId)
         {
-            Current = initialSequenceId;
-
-            if (initialSequenceId > 0)
-                Increment();
+            // Subtracting one because Interlocked.Increment will return the post-incremented value
+            // which is expected to be the initialSequenceId for the first call
+            _current = unchecked((long)initialSequenceId - 1);
+            _initial = initialSequenceId - 1;
         }
 
-        public ulong Current { get; private set; }
+        // Returns false if FetchNext has not been called on this object before (or if it somehow wrapped around 2^64)
+        public bool IsPastInitialId()
+        {
+            return unchecked((ulong)_current != _initial);
+        }
 
-        public void Increment()
-            => ++Current;
+        public ulong FetchNext()
+        {
+            return unchecked((ulong)Interlocked.Increment(ref _current));
+        }
     }
 }