Fix Thread Leak in OutboundTcpConnection

patch by jasobrown, reviewed by aweisberg for CASSANDRA-13204
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c86687..9ce8d49 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.17
+ * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
  * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
  * Upgrade netty version to fix memory leak with client encryption (CASSANDRA-13114)
  * Fix paging for DISTINCT queries on partition keys and static columns (CASSANDRA-13017)
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1a88220..ff2d929 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -161,8 +161,11 @@
 
     void closeSocket(boolean destroyThread)
     {
-        backlog.clear();
         isStopped = destroyThread; // Exit loop to stop the thread
+        backlog.clear();
+        // in the "destroyThread = true" case, enqueuing the sentinel is important mostly to unblock the backlog.take()
+        // (via the CoalescingStrategy) in case there's a data race between this method enqueuing the sentinel
+        // and run() clearing the backlog on connection failure.
         enqueue(CLOSE_SENTINEL, -1);
     }
 
@@ -183,7 +186,7 @@
         final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
 
         outer:
-        while (true)
+        while (!isStopped)
         {
             try
             {
@@ -199,6 +202,7 @@
             int count = drainedMessages.size();
             //The timestamp of the first message has already been provided to the coalescing strategy
             //so skip logging it.
+            inner:
             for (QueuedMessage qm : drainedMessages)
             {
                 try
@@ -217,8 +221,12 @@
                     else if (socket != null || connect())
                         writeConnected(qm, count == 1 && backlog.isEmpty());
                     else
+                    {
                         // clear out the queue, else gossip messages back up.
+                        drainedMessages.clear();
                         backlog.clear();
+                        break inner;
+                    }
                 }
                 catch (Exception e)
                 {