PROTON-2830 Add an async version of BeginMessage to stream sender

Adds an async variant of the BeginMessage API to the IStreamSender to match
the other async variations of send and try send.
diff --git a/src/Proton.Client/Client/IStreamSender.cs b/src/Proton.Client/Client/IStreamSender.cs
index 47b4684..26661a1 100644
--- a/src/Proton.Client/Client/IStreamSender.cs
+++ b/src/Proton.Client/Client/IStreamSender.cs
@@ -82,8 +82,18 @@
       /// to indicate that the previous instance has not yet been completed.
       /// </summary>
       /// <param name="deliveryAnnotations">The optional delivery annotations to transmit with the message</param>
-      /// <returns>This stream sender instance</returns>
+      /// <returns>A new IStreamSenderMessage if no send is currently in progress</returns>
       IStreamSenderMessage BeginMessage(IDictionary<string, object> deliveryAnnotations = null);
 
+      /// <summary>
+      /// Creates and returns a new stream capable message that can be used by the caller to perform
+      /// streaming sends of large message payload data.  Only one streamed message can be active
+      /// at a time so any successive calls to begin a new streaming message will throw an error
+      /// to indicate that the previous instance has not yet been completed.
+      /// </summary>
+      /// <param name="deliveryAnnotations">The optional delivery annotations to transmit with the message</param>
+      /// <returns>A Task that returns new IStreamSenderMessage if no send is currently in progress</returns>
+      Task<IStreamSenderMessage> BeginMessageAsync(IDictionary<string, object> deliveryAnnotations = null);
+
    }
 }
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
index 75f10f8..76a721c 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
@@ -83,35 +83,13 @@
       public IStreamSenderMessage BeginMessage(IDictionary<string, object> deliveryAnnotations = null)
       {
          CheckClosedOrFailed();
-         DeliveryAnnotations annotations = null;
-         TaskCompletionSource<IStreamSenderMessage> request = new();
+         return DoBeginMessageAsync(deliveryAnnotations).ConfigureAwait(false).GetAwaiter().GetResult();
+      }
 
-         if (deliveryAnnotations != null)
-         {
-            annotations = new DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations));
-         }
-
-         ClientSession.Execute(() =>
-         {
-            if (ProtonSender.Current != null)
-            {
-               request.TrySetException(new ClientIllegalStateException(
-                   "Cannot initiate a new streaming send until the previous one is complete"));
-            }
-            else
-            {
-               // Grab the next delivery and hold for stream writes, no other sends
-               // can occur while we hold the delivery.
-               IOutgoingDelivery streamDelivery = ProtonSender.Next();
-               ClientStreamTracker streamTracker = new(this, streamDelivery);
-
-               streamDelivery.LinkedResource = streamTracker;
-
-               request.TrySetResult(new ClientStreamSenderMessage(this, streamTracker, annotations));
-            }
-         });
-
-         return request.Task.ConfigureAwait(false).GetAwaiter().GetResult();
+      public Task<IStreamSenderMessage> BeginMessageAsync(IDictionary<string, object> deliveryAnnotations = null)
+      {
+         CheckClosedOrFailed();
+         return DoBeginMessageAsync(deliveryAnnotations);
       }
 
       internal string SenderId => senderId;
@@ -329,6 +307,40 @@
          return new ClientNoOpStreamTracker(this);
       }
 
+      private Task<IStreamSenderMessage> DoBeginMessageAsync(IDictionary<string, object> deliveryAnnotations = null)
+      {
+         CheckClosedOrFailed();
+         DeliveryAnnotations annotations = null;
+         TaskCompletionSource<IStreamSenderMessage> request = new();
+
+         if (deliveryAnnotations != null)
+         {
+            annotations = new DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations));
+         }
+
+         ClientSession.Execute(() =>
+         {
+            if (ProtonSender.Current != null)
+            {
+               request.TrySetException(new ClientIllegalStateException(
+                   "Cannot initiate a new streaming send until the previous one is complete"));
+            }
+            else
+            {
+               // Grab the next delivery and hold for stream writes, no other sends
+               // can occur while we hold the delivery.
+               IOutgoingDelivery streamDelivery = ProtonSender.Next();
+               ClientStreamTracker streamTracker = new(this, streamDelivery);
+
+               streamDelivery.LinkedResource = streamTracker;
+
+               request.TrySetResult(new ClientStreamSenderMessage(this, streamTracker, annotations));
+            }
+         });
+
+         return request.Task;
+      }
+
       private Task<IStreamTracker> DoSendMessageAsync<T>(IAdvancedMessage<T> message, IDictionary<string, object> deliveryAnnotations, bool waitForCredit)
       {
          TaskCompletionSource<IStreamTracker> operation = new();
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
index 9e76808..08f1b50 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
@@ -542,6 +542,16 @@
                // Expected
             }
 
+            try
+            {
+               sender.BeginMessageAsync().GetAwaiter().GetResult();
+               Assert.Fail("Should not be able create a new streaming sender message before last one is completed.");
+            }
+            catch (ClientIllegalStateException)
+            {
+               // Expected
+            }
+
             message.Abort();
 
             Assert.Throws<ClientIllegalStateException>(() => message.Complete());
@@ -724,6 +734,48 @@
       }
 
       [Test]
+      public void TestCreateStreamFromAsyncBegin()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.RemoteFlow().WithLinkCredit(1).Queue();
+            peer.ExpectTransfer().WithMore(false).WithNullPayload();
+            peer.ExpectDetach().WithClosed(true).Respond();
+            peer.ExpectEnd().Respond();
+            peer.ExpectClose().Respond();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, remotePort);
+            IStreamSender sender = connection.OpenStreamSender("test-qos");
+            IStreamSenderMessage tracker = sender.BeginMessageAsync().GetAwaiter().GetResult();
+
+            OutputStreamOptions options = new OutputStreamOptions();
+            Stream stream = tracker.GetBodyStream(options);
+
+            Assert.IsNotNull(stream);
+
+            sender.OpenTask.Wait();
+
+            stream.Close();
+
+            sender.Close();
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
       public void TestOutputStreamOptionsEnforcesValidBodySizeValues()
       {
          OutputStreamOptions options = new OutputStreamOptions();