PROTON-2505 Ensure that stream sender write fail fast on reconnection
When a stream sender message is in use and the connection drops and
reconnects future write should fail fast an not wait for the write
timeout.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 3d3a80f..8686c4c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -628,12 +628,27 @@
}
protected boolean notClosedOrFailed(ClientFuture<?> request) {
+ return notClosedOrFailed(request, protonSender);
+ }
+
+ protected boolean notClosedOrFailed(ClientFuture<?> request, org.apache.qpid.protonj2.engine.Sender sender) {
if (isClosed()) {
request.failed(new ClientIllegalStateException("The Sender was explicitly closed", failureCause));
return false;
} else if (failureCause != null) {
request.failed(failureCause);
return false;
+ } else if (sender.isLocallyClosedOrDetached()) {
+ if (sender.getConnection().getRemoteCondition() != null) {
+ request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getConnection().getRemoteCondition()));
+ } else if (sender.getSession().getRemoteCondition() != null) {
+ request.failed(ClientExceptionSupport.convertToSessionClosedException(sender.getSession().getRemoteCondition()));
+ } else if (sender.getEngine().failureCause() != null) {
+ request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getEngine().failureCause()));
+ } else {
+ request.failed(new ClientIllegalStateException("Sender closed without a specific error condition"));
+ }
+ return false;
} else {
return true;
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 0edebac..491fe31 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -125,7 +125,7 @@
this, context.getProtonDelivery(), messageFormat, buffer, context.completed(), operation);
executor.execute(() -> {
- if (notClosedOrFailed(operation)) {
+ if (notClosedOrFailed(operation, context.getProtonDelivery().getLink())) {
try {
if (protonSender.isSendable()) {
session.getTransactionContext().send(envelope, null, isSendingSettled());
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
index 0dd4d18..0796d07 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.qpid.protonj2.client.impl;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -31,7 +32,10 @@
import org.apache.qpid.protonj2.client.OutputStreamOptions;
import org.apache.qpid.protonj2.client.StreamSender;
import org.apache.qpid.protonj2.client.StreamSenderMessage;
+import org.apache.qpid.protonj2.client.StreamSenderOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
@@ -47,8 +51,7 @@
@Test
void testStreamMessageFlushFailsAfterConnectionDropped() throws Exception {
- try (ProtonTestServer firstPeer = new ProtonTestServer();
- ProtonTestServer finalPeer = new ProtonTestServer()) {
+ try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
firstPeer.expectSASLAnonymousConnect();
firstPeer.expectOpen().respond();
@@ -92,14 +95,16 @@
firstPeer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
firstPeer.dropAfterLastHandler();
- // Write two then after connection drops the message should fail on future writes
+ // Write two then after connection drops the message should fail on future
+ // writes
stream.write(new byte[] { 0, 1, 2, 3 });
stream.flush();
stream.write(new byte[] { 4, 5, 6, 7 });
stream.flush();
firstPeer.waitForScriptToComplete();
- // Reconnection should have occurred now and we should not be able to flush data from
+ // Reconnection should have occurred now and we should not be able to flush data
+ // from
// the stream as its initial sender instance was closed on disconnect.
finalPeer.waitForScriptToComplete();
finalPeer.expectClose().respond();
@@ -122,35 +127,34 @@
@Test
void testStreamMessageCloseThatFlushesFailsAfterConnectionDropped() throws Exception {
- try (ProtonTestServer firstPeer = new ProtonTestServer();
- ProtonTestServer finalPeer = new ProtonTestServer()) {
+ try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
- firstPeer.expectSASLAnonymousConnect();
- firstPeer.expectOpen().respond();
- firstPeer.expectBegin().respond();
- firstPeer.expectAttach().ofSender().respond();
- firstPeer.remoteFlow().withLinkCredit(1).queue();
- firstPeer.start();
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+ firstPeer.expectAttach().ofSender().respond();
+ firstPeer.remoteFlow().withLinkCredit(1).queue();
+ firstPeer.start();
- finalPeer.expectSASLAnonymousConnect();
- finalPeer.expectOpen().respond();
- finalPeer.expectBegin().respond();
- finalPeer.expectAttach().ofSender().respond();
- finalPeer.remoteFlow().withLinkCredit(1).queue();
- finalPeer.start();
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+ finalPeer.expectAttach().ofSender().respond();
+ finalPeer.remoteFlow().withLinkCredit(1).queue();
+ finalPeer.start();
- final URI primaryURI = firstPeer.getServerURI();
- final URI backupURI = finalPeer.getServerURI();
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
- ConnectionOptions options = new ConnectionOptions();
- options.idleTimeout(5, TimeUnit.SECONDS);
- options.reconnectOptions().reconnectEnabled(true);
- options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+ ConnectionOptions options = new ConnectionOptions();
+ options.idleTimeout(5, TimeUnit.SECONDS);
+ options.reconnectOptions().reconnectEnabled(true);
+ options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
- Client container = Client.create();
- Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
- StreamSender sender = connection.openStreamSender("test-queue");
- StreamSenderMessage message = sender.beginMessage();
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+ StreamSender sender = connection.openStreamSender("test-queue");
+ StreamSenderMessage message = sender.beginMessage();
OutputStream stream = message.body();
@@ -167,7 +171,8 @@
firstPeer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
firstPeer.dropAfterLastHandler();
- // Write two then after connection drops the message should fail on future writes
+ // Write two then after connection drops the message should fail on future
+ // writes
stream.write(new byte[] { 0, 1, 2, 3 });
stream.flush();
stream.write(new byte[] { 4, 5, 6, 7 });
@@ -175,7 +180,8 @@
firstPeer.waitForScriptToComplete();
- // Reconnection should have occurred now and we should not be able to flush data from
+ // Reconnection should have occurred now and we should not be able to flush data
+ // from
// the stream as its initial sender instance was closed on disconnect.
finalPeer.waitForScriptToComplete();
finalPeer.expectClose().respond();
@@ -198,8 +204,7 @@
@Test
void testStreamMessageWriteThatFlushesFailsAfterConnectionDropped() throws Exception {
- try (ProtonTestServer firstPeer = new ProtonTestServer();
- ProtonTestServer finalPeer = new ProtonTestServer()) {
+ try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
firstPeer.expectSASLAnonymousConnect();
firstPeer.expectOpen().respond();
@@ -237,7 +242,8 @@
firstPeer.waitForScriptToComplete();
- // Reconnection should have occurred now and we should not be able to flush data from
+ // Reconnection should have occurred now and we should not be able to flush data
+ // from
// the stream as its initial sender instance was closed on disconnect.
finalPeer.waitForScriptToComplete();
finalPeer.expectClose().respond();
@@ -256,9 +262,83 @@
}
@Test
+ public void testStreamMessageWriteThatFlushesFailsAfterConnectionDroppedAndReconnected() throws Exception {
+ try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
+ TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
+ payloadMatcher.setMessageContentMatcher(dataMatcher);
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+ firstPeer.expectAttach().ofSender().respond();
+ firstPeer.remoteFlow().withLinkCredit(1).queue();
+ firstPeer.expectTransfer().withPayload(payloadMatcher).withMore(true);
+ firstPeer.dropAfterLastHandler();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+ finalPeer.expectAttach().ofSender().respond();
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.maxFrameSize(32768);
+ options.idleTimeout(5, TimeUnit.SECONDS);
+ options.reconnectOptions().reconnectEnabled(true);
+ options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+ StreamSenderOptions senderOptions = new StreamSenderOptions();
+ senderOptions.sendTimeout(1000);
+ StreamSender sender = connection.openStreamSender("test-queue", senderOptions);
+ StreamSenderMessage message = sender.beginMessage();
+ OutputStream stream = message.body();
+
+ stream.write(new byte[] { 0, 1, 2, 3 });
+ stream.flush();
+
+ firstPeer.waitForScriptToComplete();
+
+ // Reconnection should have occurred now and we should not be able to flush data
+ // from the stream as its initial sender instance was closed on disconnect.
+ finalPeer.waitForScriptToComplete();
+
+ // Ensure that idle processing happens in case send blocks so we can see the
+ // send timed out exception
+ finalPeer.remoteEmptyFrame().later(5000);
+ finalPeer.remoteEmptyFrame().later(10000);
+ finalPeer.remoteEmptyFrame().later(15000);
+ finalPeer.remoteEmptyFrame().later(20000); // Test timeout kicks in now
+ finalPeer.expectClose().respond();
+
+ byte[] payload = new byte[1024];
+ Arrays.fill(payload, (byte) 65);
+
+ try {
+ stream.write(payload);
+ stream.flush();
+ fail("Should not be able to write section after connection drop");
+ } catch (IOException ioe) {
+ assertFalse(ioe.getCause() instanceof ClientSendTimedOutException);
+ assertTrue(ioe.getCause() instanceof ClientConnectionRemotelyClosedException);
+ }
+
+ connection.closeAsync().get();
+
+ finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
void testStreamSenderRecoveredAfterReconnectCanCreateAndStreamBytes() throws Exception {
- try (ProtonTestServer firstPeer = new ProtonTestServer();
- ProtonTestServer finalPeer = new ProtonTestServer()) {
+ try (ProtonTestServer firstPeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) {
firstPeer.expectSASLAnonymousConnect();
firstPeer.expectOpen().respond();