Fix uncaught exception during graceful channel shutdown
after exceeding max orphan ids
patch by Christian Aistleitner; reviewed by Andy Tolbert, and Bret McGuire for #1938
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java
index 9060f80..90b02f3 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java
@@ -199,14 +199,14 @@
LOG.debug("[{}] No pending queries, completing graceful shutdown now", logPrefix);
ctx.channel().close();
} else {
- // remove heartbeat handler from pipeline if present.
+ // Remove heartbeat handler from pipeline if present.
ChannelHandler heartbeatHandler = ctx.pipeline().get(ChannelFactory.HEARTBEAT_HANDLER_NAME);
if (heartbeatHandler != null) {
ctx.pipeline().remove(heartbeatHandler);
}
LOG.debug("[{}] There are pending queries, delaying graceful shutdown", logPrefix);
closingGracefully = true;
- closeStartedFuture.setSuccess();
+ closeStartedFuture.trySuccess();
}
}
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java
index 79a575d..35049e9 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java
@@ -39,7 +39,9 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
@@ -256,7 +258,7 @@
}
@Test
- public void should_close_gracefully_if_orphan_ids_above_max_and_pending_requests() {
+ public void should_close_gracefully_if_orphan_ids_above_max_and_pending_request() {
// Given
addToPipeline();
// Generate n orphan ids by writing and cancelling the requests:
@@ -312,6 +314,65 @@
}
@Test
+ public void should_close_gracefully_if_orphan_ids_above_max_and_multiple_pending_requests() {
+ // Given
+ addToPipeline();
+ // Generate n orphan ids by writing and cancelling the requests.
+ for (int i = 0; i < MAX_ORPHAN_IDS; i++) {
+ when(streamIds.acquire()).thenReturn(i);
+ MockResponseCallback responseCallback = new MockResponseCallback();
+ channel
+ .writeAndFlush(
+ new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
+ .awaitUninterruptibly();
+ channel.writeAndFlush(responseCallback).awaitUninterruptibly();
+ }
+ // Generate 3 additional requests that are pending and not cancelled.
+ List<MockResponseCallback> pendingResponseCallbacks = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ when(streamIds.acquire()).thenReturn(MAX_ORPHAN_IDS + i);
+ MockResponseCallback responseCallback = new MockResponseCallback();
+ channel
+ .writeAndFlush(
+ new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
+ .awaitUninterruptibly();
+ pendingResponseCallbacks.add(responseCallback);
+ }
+
+ // When
+ // Generate the n+1th orphan id that makes us go above the threshold by canceling one if the
+ // pending requests.
+ channel.writeAndFlush(pendingResponseCallbacks.remove(0)).awaitUninterruptibly();
+
+ // Then
+ // Channel should be closing gracefully but there's no way to observe that from the outside
+ // besides writing another request and check that it's rejected.
+ assertThat(channel.closeFuture()).isNotDone();
+ ChannelFuture otherWriteFuture =
+ channel.writeAndFlush(
+ new DriverChannel.RequestMessage(
+ QUERY, false, Frame.NO_PAYLOAD, new MockResponseCallback()));
+ assertThat(otherWriteFuture).isFailed();
+ assertThat(otherWriteFuture.cause())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Channel is closing");
+
+ // When
+ // Cancel the remaining pending requests causing the n+ith orphan ids above the threshold.
+ for (MockResponseCallback pendingResponseCallback : pendingResponseCallbacks) {
+ ChannelFuture future = channel.writeAndFlush(pendingResponseCallback).awaitUninterruptibly();
+
+ // Then
+ // The future should succeed even though the channel has started closing gracefully.
+ assertThat(future).isSuccess();
+ }
+
+ // Then
+ // The graceful shutdown completes.
+ assertThat(channel.closeFuture()).isSuccess();
+ }
+
+ @Test
public void should_close_immediately_if_orphan_ids_above_max_and_no_pending_requests() {
// Given
addToPipeline();