IMPALA-6591: Fix test_ssl flaky test

test_ssl has logic that waits for the number of in-flight queries to
be 1. However, wait_for_num_in_flight_queries(1) only waits for the
condition to become true for a period of time and does not throw an
exception when the time has elapsed and the condition is not met. In
other words, the logic in test_ssl that loops while the number of
in-flight queries is 1 never gets executed if the query has not started
by then. I was able to simulate this issue by making Impala shell take
much longer to start.

Prior to this patch, in the event that Impala shell took much longer to
start, the test started sending commands to Impala shell even when
Impala shell was not ready to receive commands. The patch fixes the
issue by waiting until Impala shell is connected. The patch also adds
asserts in other places that call wait_for_num_in_flight_queries and
updates the default behavior for Impala shell to wait until it is
connected.

Testing:
- Ran core and exhaustive tests several times on CentOS 6 without any
  issues

Change-Id: I9805269d8b806aecf5d744c219967649a041d49f
Reviewed-on: http://gerrit.cloudera.org:8080/12047
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index d21c268..010cfb0 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -721,7 +721,7 @@
     """Checks if the num of queries accounted by query_locations and in-flight are as
     expected"""
     # Wait for queries to start/un-register.
-    self.impalad_test_service.wait_for_num_in_flight_queries(expected_num)
+    assert self.impalad_test_service.wait_for_num_in_flight_queries(expected_num)
     query_locations = self.impalad_test_service.get_query_locations()
     for host, num_q in query_locations.items():
       assert num_q == expected_num, "There should be {0} running queries on either " \
diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py
index f5ada13..6be167e 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -84,11 +84,11 @@
 
     # Test cancelling a query
     impalad = ImpaladService(socket.getfqdn())
-    impalad.wait_for_num_in_flight_queries(0)
+    assert impalad.wait_for_num_in_flight_queries(0)
     p = ImpalaShell(args="--ssl")
     p.send_cmd("SET DEBUG_ACTION=0:OPEN:WAIT")
     p.send_cmd("select count(*) from functional.alltypes")
-    impalad.wait_for_num_in_flight_queries(1)
+    assert impalad.wait_for_num_in_flight_queries(1)
 
     LOG = logging.getLogger('test_client_ssl')
     LOG.info("Cancelling query")
@@ -216,13 +216,13 @@
   def _validate_positive_cases(self, ca_cert=""):
     shell_options = "--ssl -q 'select 1 + 2'"
 
-    result = run_impala_shell_cmd(shell_options)
+    result = run_impala_shell_cmd(shell_options, wait_until_connected=False)
     for msg in [self.SSL_ENABLED, self.CONNECTED, self.FETCHED]:
       assert msg in result.stderr
 
     if ca_cert != "":
       shell_options = shell_options + (" --ca_cert=%s" % ca_cert)
-      result = run_impala_shell_cmd(shell_options)
+      result = run_impala_shell_cmd(shell_options, wait_until_connected=False)
       for msg in [self.SSL_ENABLED, self.CONNECTED, self.FETCHED]:
         assert msg in result.stderr
 
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py
index b5a9065..94f1a7e 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -56,7 +56,7 @@
 
     # wait for the USE command to finish
     impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 1)
-    impalad.wait_for_num_in_flight_queries(0)
+    assert impalad.wait_for_num_in_flight_queries(0)
 
     self._start_impala_cluster([])
 
@@ -79,7 +79,7 @@
 
     # wait for the USE command to finish
     impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 1)
-    impalad.wait_for_num_in_flight_queries(0)
+    assert impalad.wait_for_num_in_flight_queries(0)
 
     # Disconnect
     self.cluster.impalads[0].kill()
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index b69bece..819af88 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -142,7 +142,7 @@
     assert "Query: use `parquet`" in result.stderr, result.stderr
 
   def test_unsecure_message(self):
-    results = run_impala_shell_cmd("")
+    results = run_impala_shell_cmd("", wait_until_connected=False)
     assert "Starting Impala Shell without Kerberos authentication" in results.stderr
 
   def test_print_header(self, populated_table):
@@ -473,7 +473,7 @@
 
     # Testing config file related warning and error messages
     args = '--config_file=%s/impalarc_with_warnings' % QUERY_FILE_PATH
-    result = run_impala_shell_cmd(args, expect_success=True)
+    result = run_impala_shell_cmd(args, expect_success=True, wait_until_connected=False)
     assert "WARNING: Option 'config_file' can be only set from shell." in result.stderr
     err_msg = ("WARNING: Unable to read configuration file correctly. "
                "Ignoring unrecognized config option: 'invalid_option'\n")
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index e787bf4..85c1ee7 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -170,7 +170,7 @@
   @pytest.mark.execute_serially
   def test_cancellation(self):
     impalad = ImpaladService(socket.getfqdn())
-    impalad.wait_for_num_in_flight_queries(0)
+    assert impalad.wait_for_num_in_flight_queries(0)
     command = "select sleep(10000);"
     p = ImpalaShell()
     p.send_cmd(command)
@@ -207,7 +207,8 @@
   def test_disconnected_shell(self):
     """Test that the shell presents a disconnected prompt if it can't connect
     """
-    result = run_impala_shell_interactive('asdf;', shell_args='-i foo')
+    result = run_impala_shell_interactive('asdf;', shell_args='-i foo',
+                                          wait_until_connected=False)
     assert ImpalaShellClass.DISCONNECTED_PROMPT in result.stdout
 
   def test_bash_cmd_timing(self):
@@ -751,7 +752,7 @@
       assert "Fetched 0 row" in result.stderr
 
 
-def run_impala_shell_interactive(input_lines, shell_args=None):
+def run_impala_shell_interactive(input_lines, shell_args=None, wait_until_connected=True):
   """Runs a command in the Impala shell interactively."""
   # if argument "input_lines" is a string, makes it into a list
   if type(input_lines) is str:
@@ -760,7 +761,7 @@
   # since piping defaults to ascii
   my_env = os.environ
   my_env['PYTHONIOENCODING'] = 'utf-8'
-  p = ImpalaShell(args=shell_args, env=my_env)
+  p = ImpalaShell(args=shell_args, env=my_env, wait_until_connected=wait_until_connected)
   for line in input_lines:
     p.send_cmd(line)
   return p.get_result()
diff --git a/tests/shell/util.py b/tests/shell/util.py
index 6c1df0e..f49cbfe 100755
--- a/tests/shell/util.py
+++ b/tests/shell/util.py
@@ -22,7 +22,7 @@
 import pytest
 import re
 import shlex
-import shutil
+import time
 from subprocess import Popen, PIPE
 
 IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',')
@@ -85,13 +85,16 @@
   m = re.search(pattern, text, re.MULTILINE)
   assert m and m.group(0) == result, message
 
-def run_impala_shell_cmd(shell_args, expect_success=True, stdin_input=None):
+
+def run_impala_shell_cmd(shell_args, expect_success=True, stdin_input=None,
+                         wait_until_connected=True):
   """Runs the Impala shell on the commandline.
 
   'shell_args' is a string which represents the commandline options.
   Returns a ImpalaShellResult.
   """
-  result = run_impala_shell_cmd_no_expect(shell_args, stdin_input)
+  result = run_impala_shell_cmd_no_expect(shell_args, stdin_input,
+                                          expect_success and wait_until_connected)
   if expect_success:
     assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (shell_args,
                                                                    result.stderr)
@@ -99,7 +102,9 @@
     assert result.rc != 0, "Cmd %s was expected to fail" % shell_args
   return result
 
-def run_impala_shell_cmd_no_expect(shell_args, stdin_input=None):
+
+def run_impala_shell_cmd_no_expect(shell_args, stdin_input=None,
+                                   wait_until_connected=True):
   """Runs the Impala shell on the commandline.
 
   'shell_args' is a string which represents the commandline options.
@@ -107,23 +112,34 @@
 
   Does not assert based on success or failure of command.
   """
-  p = ImpalaShell(shell_args)
+  p = ImpalaShell(shell_args, wait_until_connected=wait_until_connected)
   result = p.get_result(stdin_input)
-  cmd = "%s %s" % (SHELL_CMD, shell_args)
   return result
 
+
 class ImpalaShellResult(object):
   def __init__(self):
     self.rc = 0
     self.stdout = str()
     self.stderr = str()
 
+
 class ImpalaShell(object):
   """A single instance of the Impala shell. The proces is started when this object is
      constructed, and then users should repeatedly call send_cmd(), followed eventually by
-     get_result() to retrieve the process output."""
-  def __init__(self, args=None, env=None):
+     get_result() to retrieve the process output. This constructor will wait until
+     Impala shell is connected for the specified timeout unless wait_until_connected is
+     set to False or --quiet is passed into the args"""
+  def __init__(self, args=None, env=None, wait_until_connected=True, timeout=60):
     self.shell_process = self._start_new_shell_process(args, env=env)
+    # When --quiet option is passed to Impala shell, we should not wait until we see
+    # "Connected to" because it will never be printed to stderr.
+    if wait_until_connected and (args is None or "--quiet" not in args):
+      start_time = time.time()
+      connected = False
+      while time.time() - start_time < timeout and not connected:
+        connected = "Connected to" in self.shell_process.stderr.readline()
+      assert connected, "Impala shell is not connected"
 
   def pid(self):
     return self.shell_process.pid