IMPALA-10369: Dump server stacktraces when test_concurrent_ddls.py times out

Recently, we have seen many timeout failures of test_concurrent_ddls.py
in S3 builds, e.g. IMPALA-10280, IMPALA-10301, and IMPALA-10363. It would
be helpful to dump the server stacktraces so we can understand why some
RPCs are slow or stuck.

This patch extracts the stacktrace-dumping logic of
script-timeout-check.sh into a separate script, dump-stacktraces.sh.
The new script also dumps jstacks of the HMS and the NameNode. Dumping
all these stacktraces is time-consuming, so we collect them in parallel,
which also helps us get consistent snapshots of all the servers.
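
As a rough illustration (a minimal Python sketch, not part of the patch;
the helper name dump_and_list_stacktraces is hypothetical, and it assumes
IMPALA_HOME and IMPALA_TIMEOUT_LOGS_DIR are set as in a normal Impala dev
environment), a caller can trigger the new script and then inspect the
result files it writes:

  import os
  from subprocess import call

  def dump_and_list_stacktraces():
    # Run the new script; it forks one collection job per server and
    # waits for all of them before returning.
    call([os.path.join(os.environ['IMPALA_HOME'], 'bin/dump-stacktraces.sh')])
    # Result files look like impalad_<pid>_<timestamp>.txt for gdb
    # backtraces and <name>_<pid>_jstack_<timestamp>.txt for jstacks.
    return sorted(os.listdir(os.environ['IMPALA_TIMEOUT_LOGS_DIR']))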

When any test in test_concurrent_ddls.py times out, we use
dump-stacktraces.sh to dump the stacktraces before exiting. Previously,
some tests depended on pytest.mark.timeout to detect timeouts, which
makes it hard to add a customized callback for dumping server
stacktraces. So this patch refactors test_concurrent_ddls.py to use only
the timeouts provided by multiprocessing.
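
The timeout handling now follows this pattern (a minimal sketch with a
hypothetical run_with_timeout wrapper; the real tests reuse their existing
pool and call dump_server_stacktraces() from tests/util/shell_util.py
before failing):

  from multiprocessing import TimeoutError
  from multiprocessing.pool import ThreadPool

  from tests.util.shell_util import dump_server_stacktraces

  def run_with_timeout(work_fn, timeout_s=100):
    # Run the workload in a worker and bound it with get(timeout).
    res = ThreadPool(processes=1).apply_async(work_fn, ())
    try:
      res.get(timeout=timeout_s)
    except TimeoutError:
      # Dump server stacktraces for diagnosis before failing the test.
      dump_server_stacktraces()
      assert False, "Workload timed out after %ds!" % timeout_s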

Tests:
 - Tested the scripts locally.
 - Verified the error handling of the timeout logic in Jenkins jobs.

Change-Id: I514cf2d0ff842805c0abf7211f2a395151174173
Reviewed-on: http://gerrit.cloudera.org:8080/16800
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/bin/dump-stacktraces.sh b/bin/dump-stacktraces.sh
new file mode 100755
index 0000000..7a9dee9
--- /dev/null
+++ b/bin/dump-stacktraces.sh
@@ -0,0 +1,79 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Helper script that dumps the stacktraces of catalogd, statestored, all impalads
+# (if running), the Hive Metastore server and the HDFS NameNode. Result files are
+# put in $IMPALA_TIMEOUT_LOGS_DIR.
+#
+
+function collect_gdb_backtraces() {
+  name=$1
+  pid=$2
+  result="${IMPALA_TIMEOUT_LOGS_DIR}/${name}_${pid}_$(date +%Y%m%d-%H%M%S).txt"
+  echo "**** Generating backtrace of $name with process id: $pid to $result ****"
+  gdb -ex "thread apply all bt" --batch -p $pid >"$result"
+}
+
+function collect_jstacks() {
+  name=$1
+  pid=$2
+  result="${IMPALA_TIMEOUT_LOGS_DIR}/${name}_${pid}_jstack_$(date +%Y%m%d-%H%M%S).txt"
+  echo "**** Generating jstack of $name with process id: $pid to $result ****"
+  $JAVA_HOME/bin/jstack -F $pid >"$result"
+}
+
+# Take stacktraces in parallel to get consistent snapshots
+WORKER_PIDS=()
+mkdir -p "$IMPALA_TIMEOUT_LOGS_DIR"
+
+for pid in $(pgrep impalad); do
+  collect_gdb_backtraces impalad $pid && collect_jstacks impalad $pid &
+  WORKER_PIDS+=($!)
+done
+
+# Catalogd's process name may change. Use 'ps' directly to search the binary name.
+CATALOGD_PID=$(ps aux | grep [c]atalogd | awk '{print $2}')
+if [[ ! -z $CATALOGD_PID ]]; then
+  collect_gdb_backtraces catalogd $CATALOGD_PID && \
+      collect_jstacks catalogd $CATALOGD_PID &
+  WORKER_PIDS+=($!)
+fi
+
+STATESTORED_PID=$(pgrep statestored)
+if [[ ! -z $STATESTORED_PID ]]; then
+  collect_gdb_backtraces statestored $STATESTORED_PID &
+  WORKER_PIDS+=($!)
+fi
+
+HMS_PID=$(ps aux | grep [H]iveMetaStore | awk '{print $2}')
+if [[ ! -z $HMS_PID ]]; then
+  collect_jstacks hms $HMS_PID &
+  WORKER_PIDS+=($!)
+fi
+
+NAMENODE_PID=$(ps aux | grep [N]ameNode | awk '{print $2}')
+if [[ ! -z $NAMENODE_PID ]]; then
+  collect_jstacks namenode $NAMENODE_PID &
+  WORKER_PIDS+=($!)
+fi
+
+for pid in "${WORKER_PIDS[@]}"; do
+  wait $pid
+done
+
diff --git a/bin/script-timeout-check.sh b/bin/script-timeout-check.sh
index 4aec3af..6de23c2 100755
--- a/bin/script-timeout-check.sh
+++ b/bin/script-timeout-check.sh
@@ -32,15 +32,6 @@
 # pkill -P $TIMEOUT_PID || true
 # kill $TIMEOUT_PID
 
-function collect_stacktraces() {
-  name=$1
-  pid=$2
-  echo "**** Generating stacktrace of $name with process id: $pid ****"
-  gdb -ex "thread apply all bt"  --batch -p $pid > \
-      "${IMPALA_TIMEOUT_LOGS_DIR}/${name}_${pid}.txt"
-  $JAVA_HOME/bin/jstack -F $pid > "${IMPALA_TIMEOUT_LOGS_DIR}/${name}_${pid}_jstack.txt"
-}
-
 SCRIPT_NAME=""
 SLEEP_TIMEOUT_MIN=""
 
@@ -102,18 +93,8 @@
 echo
 echo
 
-# Impala might have a thread stuck. Print the stacktrace to the console output.
-mkdir -p "$IMPALA_TIMEOUT_LOGS_DIR"
-for pid in $(pgrep impalad); do
-  collect_stacktraces impalad $pid
-done
-
-# Catalogd's process name may change. Use 'ps' directly to search the binary name.
-CATALOGD_PID=$(ps aux | grep [c]atalogd | awk '{print $2}')
-collect_stacktraces catalogd $CATALOGD_PID
-
-STATESTORED_PID=$(pgrep statestored)
-collect_stacktraces statestored $STATESTORED_PID
+# Impala might have a thread stuck. Dump the stacktraces for diagnosis.
+"${IMPALA_HOME}"/bin/dump-stacktraces.sh
 
 # Now kill the caller
 kill $PPID
diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py
index 595ebb7..7ec6153 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -23,7 +23,7 @@
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-
+from tests.util.shell_util import dump_server_stacktraces
 
 class TestConcurrentDdls(CustomClusterTestSuite):
   """Test concurrent DDLs with invalidate metadata"""
@@ -34,7 +34,6 @@
             for e in local_catalog_enabled]
     return "--per_impalad_args=" + ";".join(args)
 
-  @pytest.mark.timeout(120)
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=false",
@@ -42,7 +41,6 @@
   def test_ddls_with_invalidate_metadata(self, unique_database):
     self._run_ddls_with_invalidation(unique_database, sync_ddl=False)
 
-  @pytest.mark.timeout(300)
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=false",
@@ -50,7 +48,6 @@
   def test_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
     self._run_ddls_with_invalidation(unique_database, sync_ddl=True)
 
-  @pytest.mark.timeout(120)
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     start_args=_make_per_impalad_args([True, False]),
@@ -58,7 +55,6 @@
   def test_mixed_catalog_ddls_with_invalidate_metadata(self, unique_database):
     self._run_ddls_with_invalidation(unique_database, sync_ddl=False)
 
-  @pytest.mark.timeout(300)
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     start_args=_make_per_impalad_args([True, False]),
@@ -66,7 +62,6 @@
   def test_mixed_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
     self._run_ddls_with_invalidation(unique_database, sync_ddl=True)
 
-  @pytest.mark.timeout(120)
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
@@ -74,7 +69,6 @@
   def test_local_catalog_ddls_with_invalidate_metadata(self, unique_database):
     self._run_ddls_with_invalidation(unique_database, sync_ddl=False)
 
-  @pytest.mark.timeout(300)
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
@@ -123,10 +117,6 @@
         "values (1), (2), (3), (4), (5)" % tbl_name
       ]:
         try:
-          # TODO(IMPALA-9123): Timeout logic here does not work for DDLs since they are
-          #  usually stuck in CREATED state and execute_async() won't return. We finally
-          #  use timeout in pytest.mark.timeout() but it's not precise. We should find a
-          #  more elegant way to detect timeout of DDLs.
           handle = tls.client.execute_async(query)
           is_finished = tls.client.wait_for_finished_timeout(handle, timeout=60)
           assert is_finished, "Query timeout(60s): " + query
@@ -134,17 +124,29 @@
         except ImpalaBeeswaxException as e:
           # Could raise exception when running with INVALIDATE METADATA
           assert TestConcurrentDdls.is_acceptable_error(str(e), sync_ddl), str(e)
-      # TODO(IMPALA-9123): Detect hangs here instead of using pytest.mark.timeout()
       self.execute_query_expect_success(tls.client, "invalidate metadata")
+      return True
 
     # Run DDLs in single thread first. Some bugs causing DDL hangs can be hidden when run
     # with concurrent DDLs.
-    run_ddls(0)
+    res = pool.apply_async(run_ddls, (0,))
+    try:
+      res.get(timeout=100)
+    except TimeoutError:
+      dump_server_stacktraces()
+      assert False, "Single thread execution timeout!"
 
     # Run DDLs with invalidate metadata in parallel
     NUM_ITERS = 16
-    for i in pool.imap_unordered(run_ddls, xrange(1, NUM_ITERS + 1)):
-      pass
+    worker = [None] * (NUM_ITERS + 1)
+    for i in xrange(1, NUM_ITERS + 1):
+      worker[i] = pool.apply_async(run_ddls, (i,))
+    for i in xrange(1, NUM_ITERS + 1):
+      try:
+        worker[i].get(timeout=100)
+      except TimeoutError:
+        dump_server_stacktraces()
+        assert False, "Timeout in thread run_ddls(%d)" % i
 
   @classmethod
   def is_acceptable_error(cls, err, sync_ddl):
@@ -162,7 +164,6 @@
       return True
     return False
 
-  @pytest.mark.timeout(300)
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
@@ -191,5 +192,6 @@
         r1.get(timeout=60)
         r2.get(timeout=60)
       except TimeoutError:
+        dump_server_stacktraces()
         assert False, "INVALIDATE METADATA timeout in 60s!"
     pool.terminate()
diff --git a/tests/util/shell_util.py b/tests/util/shell_util.py
index 34856a3..ddbd24b 100644
--- a/tests/util/shell_util.py
+++ b/tests/util/shell_util.py
@@ -21,7 +21,7 @@
 import os
 import shlex
 from select import select
-from subprocess import PIPE, Popen, STDOUT
+from subprocess import PIPE, Popen, STDOUT, call
 from textwrap import dedent
 from time import sleep, time
 
@@ -29,6 +29,11 @@
 
 LOG = logging.getLogger('shell_util')
 
+
+def dump_server_stacktraces():
+  LOG.debug('Dumping stacktraces of running servers')
+  call([os.path.join(os.environ['IMPALA_HOME'], "bin/dump-stacktraces.sh")])
+
 def exec_process(cmd):
   """Executes a subprocess, waiting for completion. The process exit code, stdout and
   stderr are returned as a tuple."""