Fix uncaught exception during graceful channel shutdown

after exceeding max orphan ids

patch by Christian Aistleitner; reviewed by Andy Tolbert, and Bret McGuire for #1938
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java
index 9060f80..90b02f3 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java
@@ -199,14 +199,14 @@
       LOG.debug("[{}] No pending queries, completing graceful shutdown now", logPrefix);
       ctx.channel().close();
     } else {
-      // remove heartbeat handler from pipeline if present.
+      // Remove heartbeat handler from pipeline if present.
       ChannelHandler heartbeatHandler = ctx.pipeline().get(ChannelFactory.HEARTBEAT_HANDLER_NAME);
       if (heartbeatHandler != null) {
         ctx.pipeline().remove(heartbeatHandler);
       }
       LOG.debug("[{}] There are pending queries, delaying graceful shutdown", logPrefix);
       closingGracefully = true;
-      closeStartedFuture.setSuccess();
+      closeStartedFuture.trySuccess();
     }
   }
 
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java
index 79a575d..35049e9 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java
@@ -39,7 +39,9 @@
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelPromise;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
@@ -256,7 +258,7 @@
   }
 
   @Test
-  public void should_close_gracefully_if_orphan_ids_above_max_and_pending_requests() {
+  public void should_close_gracefully_if_orphan_ids_above_max_and_pending_request() {
     // Given
     addToPipeline();
     // Generate n orphan ids by writing and cancelling the requests:
@@ -312,6 +314,65 @@
   }
 
   @Test
+  public void should_close_gracefully_if_orphan_ids_above_max_and_multiple_pending_requests() {
+    // Given
+    addToPipeline();
+    // Generate n orphan ids by writing and cancelling the requests.
+    for (int i = 0; i < MAX_ORPHAN_IDS; i++) {
+      when(streamIds.acquire()).thenReturn(i);
+      MockResponseCallback responseCallback = new MockResponseCallback();
+      channel
+          .writeAndFlush(
+              new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
+          .awaitUninterruptibly();
+      channel.writeAndFlush(responseCallback).awaitUninterruptibly();
+    }
+    // Generate 3 additional requests that are pending and not cancelled.
+    List<MockResponseCallback> pendingResponseCallbacks = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      when(streamIds.acquire()).thenReturn(MAX_ORPHAN_IDS + i);
+      MockResponseCallback responseCallback = new MockResponseCallback();
+      channel
+          .writeAndFlush(
+              new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
+          .awaitUninterruptibly();
+      pendingResponseCallbacks.add(responseCallback);
+    }
+
+    // When
+    // Generate the n+1th orphan id that makes us go above the threshold by canceling one if the
+    // pending requests.
+    channel.writeAndFlush(pendingResponseCallbacks.remove(0)).awaitUninterruptibly();
+
+    // Then
+    // Channel should be closing gracefully but there's no way to observe that from the outside
+    // besides writing another request and check that it's rejected.
+    assertThat(channel.closeFuture()).isNotDone();
+    ChannelFuture otherWriteFuture =
+        channel.writeAndFlush(
+            new DriverChannel.RequestMessage(
+                QUERY, false, Frame.NO_PAYLOAD, new MockResponseCallback()));
+    assertThat(otherWriteFuture).isFailed();
+    assertThat(otherWriteFuture.cause())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Channel is closing");
+
+    // When
+    // Cancel the remaining pending requests causing the n+ith orphan ids above the threshold.
+    for (MockResponseCallback pendingResponseCallback : pendingResponseCallbacks) {
+      ChannelFuture future = channel.writeAndFlush(pendingResponseCallback).awaitUninterruptibly();
+
+      // Then
+      // The future should succeed even though the channel has started closing gracefully.
+      assertThat(future).isSuccess();
+    }
+
+    // Then
+    // The graceful shutdown completes.
+    assertThat(channel.closeFuture()).isSuccess();
+  }
+
+  @Test
   public void should_close_immediately_if_orphan_ids_above_max_and_no_pending_requests() {
     // Given
     addToPipeline();