PROTON-2505 Ensure that stream sender write fail fast on reconnection

When a stream sender message is in use and the connection drops and
reconnects future write should fail fast an not wait for the write
timeout.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 3d3a80f..8686c4c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -628,12 +628,27 @@
     }
 
     protected boolean notClosedOrFailed(ClientFuture<?> request) {
+        return notClosedOrFailed(request, protonSender);
+    }
+
+    protected boolean notClosedOrFailed(ClientFuture<?> request, org.apache.qpid.protonj2.engine.Sender sender) {
         if (isClosed()) {
             request.failed(new ClientIllegalStateException("The Sender was explicitly closed", failureCause));
             return false;
         } else if (failureCause != null) {
             request.failed(failureCause);
             return false;
+        } else if (sender.isLocallyClosedOrDetached()) {
+            if (sender.getConnection().getRemoteCondition() != null) {
+                request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getConnection().getRemoteCondition()));
+            } else if (sender.getSession().getRemoteCondition() != null) {
+                request.failed(ClientExceptionSupport.convertToSessionClosedException(sender.getSession().getRemoteCondition()));
+            } else if (sender.getEngine().failureCause() != null) {
+                request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getEngine().failureCause()));
+            } else {
+                request.failed(new ClientIllegalStateException("Sender closed without a specific error condition"));
+            }
+            return false;
         } else {
             return true;
         }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 0edebac..491fe31 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -125,7 +125,7 @@
             this, context.getProtonDelivery(), messageFormat, buffer, context.completed(), operation);
 
         executor.execute(() -> {
-            if (notClosedOrFailed(operation)) {
+            if (notClosedOrFailed(operation, context.getProtonDelivery().getLink())) {
                 try {
                     if (protonSender.isSendable()) {
                         session.getTransactionContext().send(envelope, null, isSendingSettled());
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
index 0dd4d18..0796d07 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -31,7 +32,10 @@
 import org.apache.qpid.protonj2.client.OutputStreamOptions;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamSenderMessage;
+import org.apache.qpid.protonj2.client.StreamSenderOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
@@ -47,8 +51,7 @@
 
     @Test
     void testStreamMessageFlushFailsAfterConnectionDropped() throws Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
 
             firstPeer.expectSASLAnonymousConnect();
             firstPeer.expectOpen().respond();
@@ -92,14 +95,16 @@
             firstPeer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
             firstPeer.dropAfterLastHandler();
 
-            // Write two then after connection drops the message should fail on future writes
+            // Write two then after connection drops the message should fail on future
+            // writes
             stream.write(new byte[] { 0, 1, 2, 3 });
             stream.flush();
             stream.write(new byte[] { 4, 5, 6, 7 });
             stream.flush();
 
             firstPeer.waitForScriptToComplete();
-            // Reconnection should have occurred now and we should not be able to flush data from
+            // Reconnection should have occurred now and we should not be able to flush data
+            // from
             // the stream as its initial sender instance was closed on disconnect.
             finalPeer.waitForScriptToComplete();
             finalPeer.expectClose().respond();
@@ -122,35 +127,34 @@
 
     @Test
     void testStreamMessageCloseThatFlushesFailsAfterConnectionDropped() throws Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
 
-           firstPeer.expectSASLAnonymousConnect();
-           firstPeer.expectOpen().respond();
-           firstPeer.expectBegin().respond();
-           firstPeer.expectAttach().ofSender().respond();
-           firstPeer.remoteFlow().withLinkCredit(1).queue();
-           firstPeer.start();
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectAttach().ofSender().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            firstPeer.start();
 
-           finalPeer.expectSASLAnonymousConnect();
-           finalPeer.expectOpen().respond();
-           finalPeer.expectBegin().respond();
-           finalPeer.expectAttach().ofSender().respond();
-           finalPeer.remoteFlow().withLinkCredit(1).queue();
-           finalPeer.start();
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().respond();
+            finalPeer.remoteFlow().withLinkCredit(1).queue();
+            finalPeer.start();
 
-           final URI primaryURI = firstPeer.getServerURI();
-           final URI backupURI = finalPeer.getServerURI();
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
 
-           ConnectionOptions options = new ConnectionOptions();
-           options.idleTimeout(5, TimeUnit.SECONDS);
-           options.reconnectOptions().reconnectEnabled(true);
-           options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+            ConnectionOptions options = new ConnectionOptions();
+            options.idleTimeout(5, TimeUnit.SECONDS);
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
 
-           Client container = Client.create();
-           Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
-           StreamSender sender = connection.openStreamSender("test-queue");
-           StreamSenderMessage message = sender.beginMessage();
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+            StreamSender sender = connection.openStreamSender("test-queue");
+            StreamSenderMessage message = sender.beginMessage();
 
             OutputStream stream = message.body();
 
@@ -167,7 +171,8 @@
             firstPeer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
             firstPeer.dropAfterLastHandler();
 
-            // Write two then after connection drops the message should fail on future writes
+            // Write two then after connection drops the message should fail on future
+            // writes
             stream.write(new byte[] { 0, 1, 2, 3 });
             stream.flush();
             stream.write(new byte[] { 4, 5, 6, 7 });
@@ -175,7 +180,8 @@
 
             firstPeer.waitForScriptToComplete();
 
-            // Reconnection should have occurred now and we should not be able to flush data from
+            // Reconnection should have occurred now and we should not be able to flush data
+            // from
             // the stream as its initial sender instance was closed on disconnect.
             finalPeer.waitForScriptToComplete();
             finalPeer.expectClose().respond();
@@ -198,8 +204,7 @@
 
     @Test
     void testStreamMessageWriteThatFlushesFailsAfterConnectionDropped() throws Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
 
             firstPeer.expectSASLAnonymousConnect();
             firstPeer.expectOpen().respond();
@@ -237,7 +242,8 @@
 
             firstPeer.waitForScriptToComplete();
 
-            // Reconnection should have occurred now and we should not be able to flush data from
+            // Reconnection should have occurred now and we should not be able to flush data
+            // from
             // the stream as its initial sender instance was closed on disconnect.
             finalPeer.waitForScriptToComplete();
             finalPeer.expectClose().respond();
@@ -256,9 +262,83 @@
     }
 
     @Test
+    public void testStreamMessageWriteThatFlushesFailsAfterConnectionDroppedAndReconnected() throws Exception {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
+            TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
+            payloadMatcher.setMessageContentMatcher(dataMatcher);
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectAttach().ofSender().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            firstPeer.expectTransfer().withPayload(payloadMatcher).withMore(true);
+            firstPeer.dropAfterLastHandler();
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().respond();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.maxFrameSize(32768);
+            options.idleTimeout(5, TimeUnit.SECONDS);
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+            StreamSenderOptions senderOptions = new StreamSenderOptions();
+            senderOptions.sendTimeout(1000);
+            StreamSender sender = connection.openStreamSender("test-queue", senderOptions);
+            StreamSenderMessage message = sender.beginMessage();
+            OutputStream stream = message.body();
+
+            stream.write(new byte[] { 0, 1, 2, 3 });
+            stream.flush();
+
+            firstPeer.waitForScriptToComplete();
+
+            // Reconnection should have occurred now and we should not be able to flush data
+            // from the stream as its initial sender instance was closed on disconnect.
+            finalPeer.waitForScriptToComplete();
+
+            // Ensure that idle processing happens in case send blocks so we can see the
+            // send timed out exception
+            finalPeer.remoteEmptyFrame().later(5000);
+            finalPeer.remoteEmptyFrame().later(10000);
+            finalPeer.remoteEmptyFrame().later(15000);
+            finalPeer.remoteEmptyFrame().later(20000); // Test timeout kicks in now
+            finalPeer.expectClose().respond();
+
+            byte[] payload = new byte[1024];
+            Arrays.fill(payload, (byte) 65);
+
+            try {
+                stream.write(payload);
+                stream.flush();
+                fail("Should not be able to write section after connection drop");
+            } catch (IOException ioe) {
+                assertFalse(ioe.getCause() instanceof ClientSendTimedOutException);
+                assertTrue(ioe.getCause() instanceof ClientConnectionRemotelyClosedException);
+            }
+
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
     void testStreamSenderRecoveredAfterReconnectCanCreateAndStreamBytes() throws Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
 
             firstPeer.expectSASLAnonymousConnect();
             firstPeer.expectOpen().respond();