AMQNET-619: Handle remote detach properly for sender link
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index d767577..d3982dd 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -54,7 +54,7 @@
 
             string linkName = info.Id + ":" + target.Address;
             var taskCompletionSource = new TaskCompletionSource<bool>();
-            senderLink = new SenderLink(session.UnderlyingSession, linkName, frame, (link, attach) => { taskCompletionSource.SetResult(true); });
+            senderLink = new SenderLink(session.UnderlyingSession, linkName, frame, HandleOpened(taskCompletionSource));
 
             senderLink.AddClosedCallback((sender, error) =>
             {
@@ -73,6 +73,21 @@
 
             return taskCompletionSource.Task;
         }
+        
+        private OnAttached HandleOpened(TaskCompletionSource<bool> tsc) => (link, attach) =>
+        {
+            if (IsClosePending(attach))
+                return;
+
+            tsc.SetResult(true);
+        };
+        
+        private static bool IsClosePending(Attach attach)
+        {
+            // When no link terminus was created, the peer will now detach/close us otherwise
+            // we need to validate the returned remote target prior to open completion.
+            return attach.Target == null;
+        }
 
         private Source CreateSource() => new Source
         {
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 816dc84..cb3f4cc 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -1028,6 +1028,45 @@
             }
         }
 
+        [Test, Timeout(20_000)]
+        public void TestCreateProducerFailsWhenLinkRefused()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                testPeer.ExpectSaslAnonymous();
+                testPeer.ExpectOpen();
+                testPeer.ExpectBegin();
+
+                NmsConnection connection = EstablishAnonymousConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+                string topicName = "myTopic";
+                ITopic topic = session.GetTopic(topicName);
+
+                // Expect a link to a topic node, which we will then refuse
+                testPeer.ExpectSenderAttach(targetMatcher: source =>
+                {
+                    Assert.AreEqual(topicName, source.Address);
+                    Assert.IsFalse(source.Dynamic);
+                    Assert.AreEqual((uint) TerminusDurability.NONE, source.Durable);
+                }, sourceMatcher: Assert.NotNull, refuseLink: true);
+
+                //Expect the detach response to the test peer closing the producer link after refusal.
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: false, replyClosed: false);
+
+                Assert.Catch<NMSException>(() => session.CreateProducer(topic));
+
+                // Shut it down
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
         private NmsConnection EstablishAnonymousConnection(params TestAmqpPeer[] peers)
         {
             return EstablishAnonymousConnection(null, null, peers);
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 5646215..f1981e5 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -421,7 +421,11 @@
             ExpectSenderAttach(sourceMatcher: Assert.NotNull, targetMatcher: Assert.NotNull);
         }
 
-        public void ExpectSenderAttach(Action<Source> sourceMatcher, Action<Target> targetMatcher, uint creditAmount = 100, bool senderSettled = false)
+        public void ExpectSenderAttach(Action<Source> sourceMatcher,
+            Action<Target> targetMatcher,
+            bool refuseLink = false,
+            uint creditAmount = 100,
+            bool senderSettled = false)
         {
             var attachMatcher = new FrameMatcher<Attach>()
                 .WithAssertion(attach => Assert.IsNotNull(attach.LinkName))
@@ -440,15 +444,26 @@
                         RcvSettleMode = ReceiverSettleMode.First,
                         Handle = context.Command.Handle,
                         LinkName = context.Command.LinkName,
-                        Source = context.Command.Source,
-                        Target = context.Command.Target
+                        Source = context.Command.Source
                     };
 
+                    if (refuseLink)
+                        attach.Target = null;
+                    else
+                        attach.Target = context.Command.Target;
+
                     lastInitiatedLinkHandle = context.Command.Handle;
 
                     context.SendCommand(attach);
 
-                    var flow = new Flow()
+                    if (refuseLink)
+                    {
+                        var detach = new Detach { Closed = true, Handle = context.Command.Handle };
+                        context.SendCommand(detach);
+                    }
+                    else
+                    {
+                        var flow = new Flow
                     {
                         NextIncomingId = 1,
                         IncomingWindow = 2048,
@@ -460,6 +475,7 @@
                     };
 
                     context.SendCommand(flow);
+                    }
                 });
 
             AddMatcher(attachMatcher);