IMPALA-9262: De-flake TestBlacklist::test_kill_impalad_with_running_queries

test_blacklist.py::TestBlacklist::test_kill_impalad_with_running_queries
runs a query asynchronously, waits for it to reach the RUNNING or FINISHED
state, kills an impalad, and then expects a fetch results request for
the query to fail. The test is flaky because it is possible the query can
finish successfully before an impalad is successfully killed.

The fix is to make the query slower using a debug action.

Testing:
* Looped the test for several hours

Change-Id: I8129323a7eb62cef61f1c6c34da06f08cf6d4b06
Reviewed-on: http://gerrit.cloudera.org:8080/14985
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index b278b8b..d2ae254 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -121,11 +121,12 @@
     """Verifies that when an Impala executor is killed while running a query, that the
     Coordinator blacklists the killed executor."""
 
-    # Run a query asynchronously. Normally, this query should take a few seconds to
-    # complete.
+    # Run a query asynchronously. With the debug actions, this query should take a few
+    # minutes to complete.
     query = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
         where t1.l_orderkey = t2.l_orderkey"
-    handle = self.execute_query_async(query)
+    handle = self.execute_query_async(query, query_options={
+        'debug_action': '0:GETNEXT:DELAY|1:GETNEXT:DELAY'})
 
     # Wait for the query to start running
     self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10)
@@ -145,6 +146,7 @@
       assert "TransmitData() to " in str(e) or "EndDataStream() to " in str(e)
 
     # Run another query which should succeed and verify the impalad was blacklisted.
+    self.client.clear_configuration()  # remove the debug actions
     result = self.execute_query("select count(*) from tpch.lineitem")
     match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
     assert match is not None and match.group(1) == "%s:%s" % \