Merge branch 'cassandra-3.0' into cassandra-3.9
diff --git a/CHANGES.txt b/CHANGES.txt
index d6a9d16..ca5b383 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
  * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
  * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 Merged from 2.2:
+ * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
  * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
 Merged from 2.1:
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 2dd5613..ce85449 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -1218,7 +1218,7 @@
 
             if self.tracing_enabled:
                 try:
-                    for trace in future.get_all_query_traces(self.max_trace_wait):
+                    for trace in future.get_all_query_traces(max_wait_per=self.max_trace_wait, query_cl=self.consistency_level):
                         print_trace(self, trace)
                 except TraceUnavailable:
                     msg = "Statement trace did not complete within %d seconds; trace data may be incomplete." % (self.session.max_trace_wait,)
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index ebc4f76..64abf00 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,7 +24,6 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -113,9 +112,13 @@
         }
     }
 
+    public final static Runnable NO_OP_TASK = () -> {}; // empty marker task: the only Runnable ExecuteOnlyExecutor.submit() accepts
+
     /**
      * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
-     * tracing stage.  See CASSANDRA-1123 for background.
+     * tracing stage.  See CASSANDRA-1123 for background. The one exception is NO_OP_TASK, which may be
+     * submitted so that callers can do a final wait on pending trace events; this relies on the tracing
+     * executor running a single thread (asserted in submit), see CASSANDRA-11465.
      */
     private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
     {
@@ -138,6 +141,11 @@
         @Override
         public Future<?> submit(Runnable task)
         {
+            if (task.equals(NO_OP_TASK)) // NO_OP_TASK is the single exception to the "no submit" rule
+            {
+                assert getMaximumPoolSize() == 1 : "Cannot wait for pending tasks if running more than 1 thread";
+                return super.submit(task); // the caller can block on the returned Future
+            }
             throw new UnsupportedOperationException();
         }
 
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index fbe2c33..bc8d5dd 100644
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -42,4 +42,9 @@
     {
         delegate.traceImpl(message);
     }
+
+    protected void waitForPendingEvents()
+    {
+        delegate.waitForPendingEvents(); // same delegation as traceImpl: the delegate does the waiting
+    }
 }
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 5365d09..ec2bc9e 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -111,6 +111,8 @@
 
     public synchronized void stop()
     {
+        waitForPendingEvents(); // NOTE(review): runs while holding this object's monitor (the method is synchronized)
+
         status = Status.STOPPED;
         notifyAll();
     }
@@ -179,6 +181,8 @@
 
     protected abstract void traceImpl(String message);
 
+    protected abstract void waitForPendingEvents(); // hook called by stop() and releaseReference() before they change state
+
     public boolean acquireReference()
     {
         while (true)
@@ -193,6 +197,7 @@
 
     public int releaseReference()
     {
+        waitForPendingEvents(); // the wait hook runs before the reference count is decremented
         return references.decrementAndGet();
     }
 }
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index 113ebb7..e2d3a68 100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -20,6 +20,11 @@
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -27,6 +32,7 @@
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
@@ -35,6 +41,10 @@
  */
 public class TraceStateImpl extends TraceState
 {
+    private static final Logger logger = LoggerFactory.getLogger(TraceStateImpl.class);
+    // Max seconds waitForPendingEvents() may block; <= 0 disables the wait. A missing or unparsable property yields 1.
+    private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = Integer.getInteger("cassandra.wait_for_tracing_events_timeout_secs", 1);
+
     public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
     {
         super(coordinator, sessionId, traceType);
@@ -46,6 +56,34 @@
         final int elapsed = elapsed();
 
         executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl));
+        if (logger.isTraceEnabled())
+            logger.trace("Adding <{}> to trace events", message);
+    }
+
+    /**
+     * Post a no-op event to the TRACING stage and wait for it, so that any previous mutations have at least
+     * been applied to one replica. This works because the tracing executor only has one thread in its pool,
+     * see {@link StageManager#tracingExecutor()}. Best effort: a failed or timed-out wait is logged at debug level.
+     */
+    protected void waitForPendingEvents()
+    {
+        if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
+            return;
+
+        try
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Waiting for up to {} seconds for trace events to complete", WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
+            StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
+                        .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS);
+        }
+        catch (Throwable t)
+        {
+            if (t instanceof InterruptedException)
+                Thread.currentThread().interrupt(); // restore the flag so callers still observe the interrupt
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.debug("Failed to wait for tracing events to complete", t); // t is logged as the exception, no placeholder needed
+        }
     }
 
     static void executeMutation(final Mutation mutation)
diff --git a/test/unit/org/apache/cassandra/tracing/TracingTest.java b/test/unit/org/apache/cassandra/tracing/TracingTest.java
index 1b0e507..30521c0 100644
--- a/test/unit/org/apache/cassandra/tracing/TracingTest.java
+++ b/test/unit/org/apache/cassandra/tracing/TracingTest.java
@@ -162,6 +162,9 @@
                     traces.add(string);
                 }
 
+                protected void waitForPendingEvents()
+                { // intentionally empty: this test subclass does not wait for anything
+                }
             };
         }