IMPALA-9202: Fix flakiness in test_executor_groups

Some tests in test_executor_groups fetched the query profile
immediately after executing a query asynchronously in order to verify
that the query had been queued. However, there is a small window
between the exec RPC returning and the query being queued during which
the profile does not yet contain any queuing information. This caused
some asserts in these tests to fail intermittently.
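
The fix replaces the single profile fetch with a helper that polls the
profile until the expected string appears or a timeout expires. As a
rough illustration of that polling pattern (the test relies on the
suite's existing assert_eventually helper; the standalone names and
error message below are illustrative, not the suite's implementation):

  import time

  def assert_eventually(timeout_s, period_s, condition, error_msg=None):
    # Poll 'condition' every period_s seconds until it returns True or
    # timeout_s seconds elapse, then fail instead of asserting on a
    # single, possibly-too-early profile snapshot.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
      if condition():
        return
      time.sleep(period_s)
    raise AssertionError(
      error_msg or "Condition not satisfied within %s seconds" % timeout_s)

The new _assert_eventually_in_profile() wrapper in the diff below
applies this pattern with a 60 second timeout and a 1 second polling
interval.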

Change-Id: I47070045250a12d86c99f9a30a956a268be5fa7e
Reviewed-on: http://gerrit.cloudera.org:8080/14810
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 9b696bb..1ba8830 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -113,6 +113,12 @@
     return self.coordinator.service.get_metric_value(
       METRIC_PREFIX.format(self._group_name(group_name_suffix)))
 
+  def _assert_eventually_in_profile(self, query_handle, expected_str):
+    """Assert with a timeout of 60 sec and a polling interval of 1 sec that the
+    expected_str exists in the query profile."""
+    self.assert_eventually(
+      60, 1, lambda: expected_str in self.client.get_runtime_profile(query_handle))
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-queue_wait_timeout_ms=2000")
   def test_no_group_timeout(self):
@@ -136,8 +142,7 @@
     QUERY = "select count(*) from functional.alltypestiny"
     client = self.client
     handle = client.execute_async(QUERY)
-    profile = client.get_runtime_profile(handle)
-    assert "Waiting for executors to start" in profile
+    self._assert_eventually_in_profile(handle, "Waiting for executors to start")
     assert self._get_num_executor_groups(only_healthy=True) == 0
     self._add_executor_group("group1", 2)
     client.wait_for_finished_timeout(handle, 20)
@@ -161,8 +166,7 @@
     assert self._get_num_executor_groups(only_healthy=True) == 0
     # Run query and observe timeout
     handle = client.execute_async(QUERY)
-    profile = client.get_runtime_profile(handle)
-    assert "Waiting for executors to start" in profile, profile
+    self._assert_eventually_in_profile(handle, "Waiting for executors to start")
     # Restart executor
     executor.start()
     # Query should now finish
@@ -181,8 +185,8 @@
     q1 = client.execute_async("select sleep(5000)")
     q2 = client.execute_async("select sleep(3)")
     # Verify that q2 is queued up behind q1
-    profile = client.get_runtime_profile(q2)
-    assert "Initial admission queue reason: number of running queries" in profile, profile
+    self._assert_eventually_in_profile(
+      q2, "Initial admission queue reason: number of running queries")
     # Kill an executor
     executor = self.cluster.impalads[1]
     executor.kill()
@@ -211,9 +215,8 @@
     q1 = client.execute_async(QUERY)
     client.wait_for_admission_control(q1)
     q2 = client.execute_async(QUERY)
-    profile = client.get_runtime_profile(q2)
-    assert ("Initial admission queue reason: Not enough admission control slots "
-            "available on host" in profile)
+    self._assert_eventually_in_profile(q2, "Initial admission queue reason: Not enough "
+                                           "admission control slots available on host")
     client.cancel(q1)
     client.cancel(q2)
 
@@ -223,7 +226,6 @@
     assert "number of admission control slots needed" in str(result)
     assert "is greater than total slots available" in str(result)
 
-
   @pytest.mark.execute_serially
   def test_multiple_executor_groups(self):
     """Tests that two queries can run on two separate executor groups simultaneously."""
@@ -256,8 +258,7 @@
     q1 = client.execute_async(QUERY)
     client.wait_for_admission_control(q1)
     q2 = client.execute_async(QUERY)
-    profile = client.get_runtime_profile(q2)
-    assert "Initial admission queue reason" in profile
+    self._assert_eventually_in_profile(q2, "Initial admission queue reason")
     client.cancel(q1)
     client.cancel(q2)
 
@@ -318,14 +319,13 @@
     # Run query and observe that it gets queued
     client = self.client
     handle = client.execute_async(QUERY)
-    profile = client.get_runtime_profile(handle)
-    assert "Initial admission queue reason: Waiting for executors to start" in profile
+    self._assert_eventually_in_profile(handle, "Initial admission queue reason:"
+                                               " Waiting for executors to start")
     initial_state = client.get_state(handle)
     # Start another executor and observe that the query stays queued
     self._add_executor_group("group1", 3, num_executors=1)
     self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 3)
     assert self._get_num_executor_groups(only_healthy=True) == 0
-    profile = client.get_runtime_profile(handle)
     assert client.get_state(handle) == initial_state
     # Start the remaining executor and observe that the query finishes
     self._add_executor_group("group1", 3, num_executors=1)