Fix Thread Leak in OutboundTcpConnection
patch by jasobrown, reviewed by aweisberg for CASSANDRA-13204
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c86687..9ce8d49 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.17
+ * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
* Coalescing strategy can enter infinite loop (CASSANDRA-13159)
* Upgrade netty version to fix memory leak with client encryption (CASSANDRA-13114)
* Fix paging for DISTINCT queries on partition keys and static columns (CASSANDRA-13017)
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1a88220..ff2d929 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -161,8 +161,11 @@
void closeSocket(boolean destroyThread)
{
- backlog.clear();
isStopped = destroyThread; // Exit loop to stop the thread
+ backlog.clear();
+ // in the "destroyThread = true" case, enqueuing the sentinel is important mostly to unblock the backlog.take()
+ // (via the CoalescingStrategy) in case there's a data race between this method enqueuing the sentinel
+ // and run() clearing the backlog on connection failure.
enqueue(CLOSE_SENTINEL, -1);
}
@@ -183,7 +186,7 @@
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
outer:
- while (true)
+ while (!isStopped)
{
try
{
@@ -199,6 +202,7 @@
int count = drainedMessages.size();
//The timestamp of the first message has already been provided to the coalescing strategy
//so skip logging it.
+ inner:
for (QueuedMessage qm : drainedMessages)
{
try
@@ -217,8 +221,12 @@
else if (socket != null || connect())
writeConnected(qm, count == 1 && backlog.isEmpty());
else
+ {
// clear out the queue, else gossip messages back up.
+ drainedMessages.clear();
backlog.clear();
+ break inner;
+ }
}
catch (Exception e)
{