IMPALA-8863: Add support to run tests over HTTP/HS2

This change adds support to run backend tests over HTTP using a new
version of Impyla (0.16.1). It also adds a test that exercises
authentication over HTTP.

Change-Id: I7156558071781378fcb9c8941c0f4dd82eb0d018
Reviewed-on: http://gerrit.cloudera.org:8080/14059
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index b81d290..c34a968 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -144,6 +144,7 @@
           timeout_us - static_cast<int64_t>(
                            round(wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO)));
       if (!consumer_cv_.WaitFor(l, wait_duration_us)) {
+        VLOG_QUERY << "Fetch timed out";
         timed_out = true;
 
         // If the consumer timed out, make sure results_ is set to nullptr because the
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index df4d9f0..d219eb4 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -181,7 +181,7 @@
 
 Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
     SessionState* session, int32_t fetch_size, bool fetch_first,
-    TFetchResultsResp* fetch_results) {
+    TFetchResultsResp* fetch_results, int32_t* num_results) {
   // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
   // ensures that rows are ready to be fetched (e.g., Wait() opens
   // ClientRequestState::output_exprs_, which are evaluated in
@@ -218,6 +218,7 @@
       version, *(request_state->result_metadata()), &(fetch_results->results)));
   RETURN_IF_ERROR(
       request_state->FetchRows(fetch_size, result_set.get(), block_on_wait_time_us));
+  *num_results = result_set->size();
   fetch_results->__isset.results = true;
   fetch_results->__set_hasMoreRows(!request_state->eos());
   return Status::OK();
@@ -355,7 +356,7 @@
         if (status.ok() && iequals(v.first, "idle_session_timeout")) {
           state->session_timeout = state->set_query_options.idle_session_timeout;
           VLOG_QUERY << "OpenSession(): session: " << PrintId(session_id)
-                     <<" idle_session_timeout="
+                     << " idle_session_timeout="
                      << PrettyPrinter::Print(state->session_timeout, TUnit::TIME_S);
         }
       }
@@ -880,9 +881,12 @@
           session_id, SecretArg::Operation(op_secret, query_id), &session),
       SQLSTATE_GENERAL_ERROR);
 
-  Status status = FetchInternal(
-      request_state.get(), session.get(), request.maxRows, fetch_first, &return_val);
-  VLOG_ROW << "FetchResults(): #results=" << return_val.results.rows.size()
+  int32_t num_results = 0;
+  Status status = FetchInternal(request_state.get(), session.get(), request.maxRows,
+      fetch_first, &return_val, &num_results);
+
+  VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id)
+           << " #results=" << num_results
            << " has_more=" << (return_val.hasMoreRows ? "true" : "false");
   if (!status.ok()) {
     // Only unregister the query if the underlying error is unrecoverable.
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index cd66c9a..a05bddc 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -935,13 +935,14 @@
       apache::hive::service::cli::thrift::TOperationHandle* handle,
       apache::hive::service::cli::thrift::TStatus* status);
 
-  /// Executes the fetch logic for HiveServer2 FetchResults. If fetch_first is true, then
-  /// the query's state should be reset to fetch from the beginning of the result set.
-  /// Doesn't clean up 'request_state' if an error occurs.
+  /// Executes the fetch logic for HiveServer2 FetchResults and stores result size in
+  /// 'num_results'. If fetch_first is true, then the query's state should be reset to
+  /// fetch from the beginning of the result set. Doesn't clean up 'request_state' if an
+  /// error occurs.
   Status FetchInternal(ClientRequestState* request_state, SessionState* session,
       int32_t fetch_size, bool fetch_first,
-      apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results)
-      WARN_UNUSED_RESULT;
+      apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results,
+      int32_t* num_results) WARN_UNUSED_RESULT;
 
   /// Helper functions to translate between HiveServer2 and Impala structs
 
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index bfbd50a..d1459ad 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -149,6 +149,10 @@
 # For build types like ASAN, modify the default Kudu rpc timeout.
 KUDU_RPC_TIMEOUT = build_flavor_timeout(0, slow_build_timeout=60000)
 
+# HTTP connections don't keep alive their associated sessions. We increase the timeout
+# during builds to make spurious session expiration less likely.
+DISCONNECTED_SESSION_TIMEOUT = 3600
+
 def check_process_exists(binary, attempts=1):
   """Checks if a process exists given the binary name. The `attempts` count allows us to
   control the time a process needs to settle until it becomes available. After each try
@@ -336,6 +340,11 @@
           kudu_rpc_timeout=KUDU_RPC_TIMEOUT,
           args=args)
 
+    if "disconnected_session_timeout" not in args:
+      args = "-disconnected_session_timeout {timeout} {args}".format(
+          timeout=DISCONNECTED_SESSION_TIMEOUT,
+          args=args)
+
     if i >= num_coordinators:
       args = "-is_coordinator=false {args}".format(args=args)
     elif use_exclusive_coordinators:
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
index cd7c7c3..777bf844 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
@@ -82,40 +82,6 @@
   }
 
   /**
-   * Helper to run a shell command 'cmd'. If 'shouldSucceed' is true, the command
-   * is expected to succeed, failure otherwise. Returns the stdout from the command.
-   */
-  private String runShellCommand(String[] cmd, boolean shouldSucceed, String expectedOut,
-      String expectedErr) throws Exception {
-    Runtime rt = Runtime.getRuntime();
-    Process process = rt.exec(cmd);
-    // Collect the stderr.
-    BufferedReader input = new BufferedReader(
-        new InputStreamReader(process.getErrorStream()));
-    StringBuffer stderrBuf = new StringBuffer();
-    String line;
-    while ((line = input.readLine()) != null) {
-      stderrBuf.append(line);
-      stderrBuf.append('\n');
-    }
-    String stderr = stderrBuf.toString();
-    assertTrue(stderr, stderr.contains(expectedErr));
-    // Collect the stdout (which has the resultsets).
-    input = new BufferedReader(new InputStreamReader(process.getInputStream()));
-    StringBuffer stdoutBuf = new StringBuffer();
-    while ((line = input.readLine()) != null) {
-      stdoutBuf.append(line);
-      stdoutBuf.append('\n');
-    }
-    int expectedReturn = shouldSucceed ? 0 : 1;
-    assertEquals(stderr.toString(), expectedReturn, process.waitFor());
-    // If the query succeeds, assert that the output is correct.
-    String stdout = stdoutBuf.toString();
-    if (shouldSucceed) {
-      assertTrue(stdout, stdout.contains(expectedOut));
-    }
-    return stdout;
-  }
 
   /**
    * Checks if the local python supports SSLContext needed by shell http
@@ -127,7 +93,7 @@
     // python -c "import ssl; print hasattr(ssl, 'create_default_context')"
     String[] cmd =
         {"python", "-c", "import ssl; print hasattr(ssl, 'create_default_context')"};
-    return Boolean.parseBoolean(runShellCommand(cmd, true, "", "").replace("\n", ""));
+    return Boolean.parseBoolean(RunShellCommand.Run(cmd, true, "", "").replace("\n", ""));
   }
 
   /**
@@ -159,13 +125,13 @@
     for (String protocol: protocolsToTest) {
       protocol = String.format(protocolTemplate, protocol);
       validCommand[1] = protocol;
-      runShellCommand(validCommand, /*shouldSucceed*/ true, testUser_,
+      RunShellCommand.Run(validCommand, /*shouldSucceed*/ true, testUser_,
           "Starting Impala Shell using LDAP-based authentication");
       invalidCommand[1] = protocol;
-      runShellCommand(
+      RunShellCommand.Run(
           invalidCommand, /*shouldSucceed*/ false, "", "Not connected to Impala");
       commandWithoutAuth[1] = protocol;
-      runShellCommand(
+      RunShellCommand.Run(
           commandWithoutAuth, /*shouldSucceed*/ false, "", "Not connected to Impala");
     }
   }
@@ -194,23 +160,24 @@
     // Run with an invalid proxy user.
     String[] command = buildCommand(
         query, "hs2-http", testUser2_, testPassword2_, "/?doAs=" + delegateUser_);
-    runShellCommand(command, /* shouldSucceed */ false, "",
+    RunShellCommand.Run(command, /* shouldSucceed */ false, "",
         String.format(errTemplate, testUser2_, delegateUser_));
 
     // Run with a valid proxy user but invalid delegate user.
     command = buildCommand(
         query, "hs2-http", testUser_, testPassword_, "/?doAs=" + invalidDelegateUser);
-    runShellCommand(command, /* shouldSucceed */ false, "",
+    RunShellCommand.Run(command, /* shouldSucceed */ false, "",
         String.format(errTemplate, testUser_, invalidDelegateUser));
 
     // 'doAs' parameter that cannot be decoded.
     command = buildCommand(
         query, "hs2-http", testUser_, testPassword_, "/?doAs=%");
-    runShellCommand(command, /* shouldSucceed */ false, "", "Not connected to Impala");
+    RunShellCommand.Run(command, /* shouldSucceed */ false, "",
+        "Not connected to Impala");
 
     // Successfully delegate.
     command = buildCommand(
         query, "hs2-http", testUser_, testPassword_, "/?doAs=" + delegateUser_);
-    runShellCommand(command, /* shouldSucceed */ true, delegateUser_, "");
+    RunShellCommand.Run(command, /* shouldSucceed */ true, delegateUser_, "");
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapImpylaHttpTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapImpylaHttpTest.java
new file mode 100644
index 0000000..b551ec8
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapImpylaHttpTest.java
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.directory.server.core.annotations.CreateDS;
+import org.apache.directory.server.core.annotations.CreatePartition;
+import org.apache.directory.server.annotations.CreateLdapServer;
+import org.apache.directory.server.annotations.CreateTransport;
+import org.apache.directory.server.core.annotations.ApplyLdifFiles;
+import org.apache.directory.server.core.integ.CreateLdapServerRule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+
+/**
+ * Impyla HTTP connectivity tests with LDAP authentication.
+ */
+@CreateDS(name = "myDS",
+    partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") })
+@CreateLdapServer(
+    transports = { @CreateTransport(protocol = "LDAP", address = "localhost") })
+@ApplyLdifFiles({"users.ldif"})
+public class LdapImpylaHttpTest {
+
+  @ClassRule
+  public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
+
+  // Query used by all tests
+  private static String query_ = "select logged_in_user()";
+
+  // These correspond to the values in fe/src/test/resources/users.ldif
+  private static final String testUser_ = "Test1Ldap";
+  private static final String testPassword_ = "12345";
+  private static final String testUser2_ = "Test2Ldap";
+  private static final String testPassword2_ = "abcde";
+
+  private static final String helper_ = System.getenv("IMPALA_HOME") +
+      "/tests/util/run_impyla_http_query.py";
+
+  // The cluster will be set up to allow testUser_ to act as a proxy for delegateUser_.
+  // Includes a special character to test HTTP path encoding.
+  private static final String delegateUser_ = "proxyUser$";
+
+  @Before
+  public void setUp() throws Exception {
+    String uri =
+        String.format("ldap://localhost:%s", serverRule.getLdapServer().getPort());
+    String dn = "cn=#UID,ou=Users,dc=myorg,dc=com";
+    String ldapArgs = String.format(
+        "--enable_ldap_auth --ldap_uri='%s' --ldap_bind_pattern='%s' " +
+        "--ldap_passwords_in_clear_ok --authorized_proxy_user_config=%s=%s",
+        uri, dn, testUser_, delegateUser_);
+    int ret = CustomClusterRunner.StartImpalaCluster(ldapArgs);
+    assertEquals(ret, 0);
+  }
+
+  @After
+  public void cleanUp() throws Exception {
+    CustomClusterRunner.StartImpalaCluster();
+  }
+
+  /**
+   * Tests ldap authentication using impala-shell.
+   */
+  @Test
+  public void testImpylaHttpLdapAuth() throws Exception {
+    // 1. Valid username and password. Should succeed.
+    String[] validCmd = buildCommand(testUser_, testPassword_, null);
+    RunShellCommand.Run(validCmd, /*shouldSucceed*/ true, testUser_, "");
+    // 2. Invalid username password combination. Should fail.
+    String[] invalidCmd = buildCommand("foo", "bar", null);
+    RunShellCommand.Run(invalidCmd, /*shouldSucceed*/ false, "", "EOFError");
+    // 3. Without username and password. Should fail.
+    String[] noAuthCmd = {"impala-python", helper_, "--query", query_};
+    RunShellCommand.Run(noAuthCmd, /*shouldSucceed*/ false, "", "EOFError");
+  }
+
+  private String[] buildCommand(String user, String password, String httpPath) {
+    List<String> command = Lists.newArrayList(Arrays.asList("impala-python", helper_,
+        "--user", user, "--password", password, "--query", query_));
+    if (httpPath != null) command.addAll(Arrays.asList("--http_path", httpPath));
+    return command.toArray(new String[0]);
+  }
+
+  /**
+   * Tests user impersonation over the HTTP protocol by using the HTTP path to specify the
+   * 'doAs' parameter.
+   */
+  @Test
+  public void testImpylaHttpImpersonation() throws Exception {
+    String invalidDelegateUser = "invalid-delegate-user";
+    String query = "select logged_in_user()";
+    String errTemplate = "User '%s' is not authorized to delegate to '%s'";
+
+    // Run with an invalid proxy user.
+    //String[] command = {"impala-python", helper_, "--user", testUser2_, "--password",
+    //    testPassword2_, "--http_path=/?doAs=" + delegateUser_, "--query", query};
+    String[] cmd = buildCommand(testUser2_, testPassword2_, "/?doAs=" + delegateUser_);
+    RunShellCommand.Run(cmd, /*shouldSucceed*/ false, "",
+        String.format(errTemplate, testUser2_, delegateUser_));
+
+    // Run with a valid proxy user but invalid delegate user.
+    cmd = buildCommand(testUser_, testPassword_, "/?doAs=" + invalidDelegateUser);
+    RunShellCommand.Run(cmd, /*shouldSucceed*/ false, "",
+        String.format(errTemplate, testUser_, invalidDelegateUser));
+
+    // 'doAs' parameter that cannot be decoded.
+    cmd = buildCommand(testUser_, testPassword_, "/?doAs=%");
+    RunShellCommand.Run(cmd, /*shouldSucceed*/ false, "", "httplib.BadStatusLine");
+
+    // Successfully delegate.
+    cmd = buildCommand(testUser_, testPassword_, "/?doAs=" + delegateUser_);
+    RunShellCommand.Run(cmd, /*shouldSucceed*/ true, delegateUser_, "");
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/RunShellCommand.java b/fe/src/test/java/org/apache/impala/customcluster/RunShellCommand.java
new file mode 100644
index 0000000..4d1291c
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/RunShellCommand.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+/**
+ * Helper class to run a shell command.
+ */
+class RunShellCommand {
+  /**
+   * Run a shell command 'cmd'. If 'shouldSucceed' is true, the command is expected to
+   * succeed, otherwise it is expected to fail. Returns the output (stdout) of the
+   * command.
+   */
+  public static String Run(String[] cmd, boolean shouldSucceed, String expectedOut,
+      String expectedErr) throws Exception {
+    Runtime rt = Runtime.getRuntime();
+    Process process = rt.exec(cmd);
+    // Collect stderr.
+    BufferedReader input = new BufferedReader(
+        new InputStreamReader(process.getErrorStream()));
+    StringBuffer stderrBuf = new StringBuffer();
+    String line;
+    while ((line = input.readLine()) != null) {
+      stderrBuf.append(line);
+      stderrBuf.append('\n');
+    }
+    String stderr = stderrBuf.toString();
+    assertTrue(stderr, stderr.contains(expectedErr));
+    // Collect the stdout (which has the resultsets).
+    input = new BufferedReader(new InputStreamReader(process.getInputStream()));
+    StringBuffer stdoutBuf = new StringBuffer();
+    while ((line = input.readLine()) != null) {
+      stdoutBuf.append(line);
+      stdoutBuf.append('\n');
+    }
+    int expectedReturn = shouldSucceed ? 0 : 1;
+    assertEquals(stderr.toString(), expectedReturn, process.waitFor());
+    // If the query succeeds, assert that the output is correct.
+    String stdout = stdoutBuf.toString();
+    if (shouldSucceed) assertTrue(stdout, stdout.contains(expectedOut));
+    return stdout;
+  }
+}
diff --git a/infra/python/deps/compiled-requirements.txt b/infra/python/deps/compiled-requirements.txt
index 821415d..66a01e7 100644
--- a/infra/python/deps/compiled-requirements.txt
+++ b/infra/python/deps/compiled-requirements.txt
@@ -19,7 +19,7 @@
 # after the toolchain is bootstrapped. Installed after requirements.txt
 
 argparse == 1.4.0
-impyla == 0.15.0
+impyla == 0.16.2
   bitarray == 0.9.0
   sasl == 0.1.3
   six == 1.11.0
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index a0bb624..4f16fd9 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -260,8 +260,11 @@
   plus Impala-specific extensions, e.g. for fetching runtime profiles.
   TODO: implement support for kerberos, SSL, etc.
   """
-  def __init__(self, host_port, use_kerberos=False, is_hive=False):
+  def __init__(self, host_port, use_kerberos=False, is_hive=False,
+               use_http_transport=False, http_path=""):
     self.__host_port = host_port
+    self.__use_http_transport = use_http_transport
+    self.__http_path = http_path
     if use_kerberos:
       raise NotImplementedError("Kerberos support not yet implemented")
     # Impyla connection and cursor is initialised in connect(). We need to reuse the same
@@ -291,7 +294,9 @@
     conn_kwargs = {}
     if self._is_hive:
       conn_kwargs['auth_mechanism'] = 'PLAIN'
-    self.__impyla_conn = impyla.connect(host=host, port=int(port), **conn_kwargs)
+    self.__impyla_conn = impyla.connect(host=host, port=int(port),
+                                        use_http_transport=self.__use_http_transport,
+                                        http_path=self.__http_path, **conn_kwargs)
     # Get the default query options for the session before any modifications are made.
     self.__cursor = self.__impyla_conn.cursor()
     self.__default_query_options = {}
@@ -307,10 +312,16 @@
     try:
       # Explicitly close the cursor so that it will close the session.
       self.__cursor.close()
-    except Exception, e:
+    except Exception as e:
       # The session may no longer be valid if the impalad was restarted during the test.
       pass
-    self.__impyla_conn.close()
+    try:
+      self.__impyla_conn.close()
+    except AttributeError as e:
+      # When the HTTP endpoint restarts, Thrift HTTP will close the endpoint and calling
+      # close() will result in an exception.
+      if not (self.__use_http_transport and 'NoneType' in str(e)):
+        raise
 
   def close_query(self, operation_handle):
     LOG.info("-- closing query for operation handle: {0}".format(operation_handle))
@@ -482,8 +493,8 @@
         is_hive=is_hive)
   else:
     assert protocol == 'hs2-http'
-    raise NotImplementedError("Impyla does not support 'hs2-http' protocol")
-
+    c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
+        is_hive=is_hive, use_http_transport=True, http_path='cliservice')
 
   # A hook in conftest sets tests.common.current_node.
   if hasattr(tests.common, "current_node"):
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index e68e02f..79c3be5 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -152,7 +152,10 @@
   @classmethod
   def setup_class(cls):
     """Setup section that runs before each test suite"""
-    cls.hive_client, cls.client, cls.hs2_client = [None, None, None]
+    cls.client = None
+    cls.hive_client = None
+    cls.hs2_client = None
+    cls.hs2_http_client = None
     # Create a Hive Metastore Client (used for executing some test SETUP steps
     metastore_host, metastore_port = pytest.config.option.metastore_server.split(':')
     trans_type = 'buffered'
@@ -167,14 +170,7 @@
     cls.hive_client = ThriftHiveMetastore.Client(protocol)
     cls.hive_transport.open()
 
-    # Create a connection to Impala, self.client is Beeswax so that existing tests that
-    # assume beeswax do not need modification (yet).
-    cls.client = cls.create_impala_client(protocol='beeswax')
-    try:
-      cls.hs2_client = cls.create_impala_client(protocol='hs2')
-    except Exception, e:
-      # HS2 connection can fail for benign reasons, e.g. running with unsupported auth.
-      LOG.info("HS2 connection setup failed, continuing...: {0}".format(e))
+    cls.create_impala_clients()
 
     # Default query options are populated on demand.
     cls.default_query_options = {}
@@ -246,6 +242,7 @@
     return client
 
   @classmethod
+
   def get_impalad_cluster_size(cls):
     return len(cls.__get_cluster_host_ports('beeswax'))
 
@@ -257,12 +254,37 @@
     return ImpalaTestSuite.create_impala_client(host_port, protocol=protocol)
 
   @classmethod
+  def create_impala_clients(cls):
+    """Creates Impala clients for all supported protocols."""
+    # The default connection (self.client) is Beeswax so that existing tests, which assume
+    # Beeswax do not need modification (yet).
+    cls.client = cls.create_impala_client(protocol='beeswax')
+    cls.hs2_client = None
+    try:
+      cls.hs2_client = cls.create_impala_client(protocol='hs2')
+    except Exception, e:
+      # HS2 connection can fail for benign reasons, e.g. running with unsupported auth.
+      LOG.info("HS2 connection setup failed, continuing...: {0}".format(e))
+    cls.hs2_http_client = None
+    try:
+      cls.hs2_http_client = cls.create_impala_client(protocol='hs2-http')
+    except Exception, e:
+      # HS2 HTTP connection can fail for benign reasons, e.g. running with unsupported
+      # auth.
+      LOG.info("HS2 HTTP connection setup failed, continuing...: {0}".format(e))
+
+  @classmethod
   def close_impala_clients(cls):
-    """Close Impala clients created by setup_class()."""
+    """Closes Impala clients created by create_impala_clients()."""
     if cls.client:
       cls.client.close()
+      cls.client = None
     if cls.hs2_client:
       cls.hs2_client.close()
+      cls.hs2_client = None
+    if cls.hs2_http_client:
+      cls.hs2_http_client.close()
+      cls.hs2_http_client = None
 
   @classmethod
   def __get_default_host_port(cls, protocol):
@@ -480,7 +502,8 @@
         if use_db:
           test_section[section_name] = test_section[section_name].replace('$DATABASE', use_db)
     result_section, type_section = 'RESULTS', 'TYPES'
-    if vector.get_value('protocol') == 'hs2':
+    if vector.get_value('protocol').startswith('hs2'):
+      # hs2 or hs2-http
       if 'DBAPI_RESULTS' in test_section:
         assert 'RESULTS' in test_section,\
             "Base RESULTS section must always be included alongside DBAPI_RESULTS"
@@ -529,6 +552,8 @@
     else:
       if protocol == 'beeswax':
         target_impalad_clients = [self.client]
+      elif protocol == 'hs2-http':
+        target_impalad_clients = [self.hs2_http_client]
       else:
         assert protocol == 'hs2'
         target_impalad_clients = [self.hs2_client]
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index b08fa83..c4e31e9 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -97,44 +97,34 @@
       return '_%s_%s' % (self.file_format, self.compression_codec)
 
 
-
 def create_uncompressed_text_dimension(workload):
   dataset = get_dataset_from_workload(workload)
   return ImpalaTestDimension('table_format',
       TableFormatInfo.create_from_string(dataset, 'text/none'))
 
+
 def create_parquet_dimension(workload):
   dataset = get_dataset_from_workload(workload)
   return ImpalaTestDimension('table_format',
       TableFormatInfo.create_from_string(dataset, 'parquet/none'))
 
+
 def create_avro_snappy_dimension(workload):
   dataset = get_dataset_from_workload(workload)
   return ImpalaTestDimension('table_format',
       TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))
 
 
-def create_beeswax_hs2_dimension():
-  return ImpalaTestDimension('protocol', 'beeswax', 'hs2')
-
-
-# TODO: Get rid of this once Impyla supports http transport. Until then,
-# 'hs2-http' dimension is only covered for shell based tests, since they
-# do not rely on Impyla for connections.
-def create_beeswax_hs2_hs2http_dimension():
-  # Older python versions do not support SSLContext object that the thrift
-  # http client implementation depends on. Falls back to a dimension without
-  # http transport. More context in IMPALA-8864.
+def create_client_protocol_dimension():
+  # IMPALA-8864: Older python versions do not support SSLContext object that the thrift
+  # http client implementation depends on. Falls back to a dimension without http
+  # transport.
   import ssl
   if not hasattr(ssl, "create_default_context"):
-    return create_beeswax_hs2_dimension()
+    return ImpalaTestDimension('protocol', 'beeswax', 'hs2')
   return ImpalaTestDimension('protocol', 'beeswax', 'hs2', 'hs2-http')
 
 
-def create_beeswax_dimension():
-  return ImpalaTestDimension('protocol', 'beeswax')
-
-
 def hs2_parquet_constraint(v):
   """Constraint function, used to only run HS2 against Parquet format, because file format
   and the client protocol are orthogonal."""
diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py
index 0c9ecfa..cf2c212 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -29,7 +29,7 @@
 from tests.common.environ import IS_REDHAT_DERIVATIVE
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
-from tests.common.test_dimensions import create_beeswax_hs2_hs2http_dimension
+from tests.common.test_dimensions import create_client_protocol_dimension
 from tests.shell.util import run_impala_shell_cmd, run_impala_shell_cmd_no_expect, \
     ImpalaShell
 
@@ -128,7 +128,7 @@
 
   @classmethod
   def add_test_dimensions(cls):
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_hs2http_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args=WEBSERVER_SSL_ARGS,
diff --git a/tests/custom_cluster/test_hs2.py b/tests/custom_cluster/test_hs2.py
index 15a9153..6218511 100644
--- a/tests/custom_cluster/test_hs2.py
+++ b/tests/custom_cluster/test_hs2.py
@@ -83,8 +83,7 @@
     disconnected_session_timeout"""
     # Close the default test clients so that they don't expire while the test is running
     # and affect the metric values.
-    self.client.close()
-    self.hs2_client.close()
+    self.close_impala_clients()
     impalad = self.cluster.get_first_impalad()
 
     conn = HS2TestSuite()
diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py
index 6103447..05f81a2 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -32,7 +32,7 @@
       "--idle_client_poll_period_s=0")
   def test_session_expiration(self, vector):
     impalad = self.cluster.get_any_impalad()
-    self.__close_default_clients()
+    self.close_impala_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
     num_connections = impalad.service.get_metric_value(
         "impala.thrift-server.beeswax-frontend.connections-in-use")
@@ -54,7 +54,7 @@
       "--idle_client_poll_period_s=0")
   def test_session_expiration_with_set(self, vector):
     impalad = self.cluster.get_any_impalad()
-    self.__close_default_clients()
+    self.close_impala_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
 
     # Test if we can set a shorter timeout than the process-wide option
@@ -77,7 +77,7 @@
        "--idle_client_poll_period_s=0")
   def test_unsetting_session_expiration(self, vector):
     impalad = self.cluster.get_any_impalad()
-    self.__close_default_clients()
+    self.close_impala_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
 
     # Test unsetting IDLE_SESSION_TIMEOUT
@@ -123,7 +123,7 @@
     """ IMPALA-7802: verifies that connections of idle sessions are closed
     after the sessions have expired."""
     impalad = self.cluster.get_any_impalad()
-    self.__close_default_clients()
+    self.close_impala_clients()
 
     for protocol in ['beeswax', 'hiveserver2']:
       num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
@@ -162,10 +162,3 @@
     assert num_hs2_connections + 1 == impalad.service.get_metric_value(
         "impala.thrift-server.hiveserver2-frontend.connections-in-use")
     sock.close()
-
-  def __close_default_clients(self):
-    """Close the clients that were automatically created by setup_class(). These clients
-    can expire during test, which results in metrics that tests depend on changing. Each
-    test should create its own clients as needed."""
-    self.client.close()
-    self.hs2_client.close()
diff --git a/tests/custom_cluster/test_shell_interactive.py b/tests/custom_cluster/test_shell_interactive.py
index 916a87d..eb90837 100644
--- a/tests/custom_cluster/test_shell_interactive.py
+++ b/tests/custom_cluster/test_shell_interactive.py
@@ -21,7 +21,7 @@
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.test_vector import ImpalaTestVector
-from tests.common.test_dimensions import create_beeswax_hs2_hs2http_dimension
+from tests.common.test_dimensions import create_client_protocol_dimension
 from tests.shell.util import get_shell_cmd, get_impalad_port
 
 
@@ -40,7 +40,7 @@
 
     # Iterate over test vector within test function to avoid restarting cluster.
     for vector in\
-        [ImpalaTestVector([value]) for value in create_beeswax_hs2_hs2http_dimension()]:
+        [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
       cmd = get_shell_cmd(vector)
       proc = pexpect.spawn(cmd[0], cmd[1:])
       # Check with only live_summary set to true.
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py
index 9cb98c3..73b6184 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -24,7 +24,7 @@
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
 from tests.common.test_vector import ImpalaTestVector
-from tests.common.test_dimensions import create_beeswax_hs2_hs2http_dimension
+from tests.common.test_dimensions import create_client_protocol_dimension
 from tests.shell.util import ImpalaShell, get_shell_cmd, get_impalad_port
 # Follow tests/shell/test_shell_interactive.py naming.
 from shell.impala_shell import ImpalaShell as ImpalaShellClass
@@ -42,7 +42,7 @@
   def test_manual_reconnect(self):
     # Iterate over test vector within test function to avoid restarting cluster.
     for vector in\
-        [ImpalaTestVector([value]) for value in create_beeswax_hs2_hs2http_dimension()]:
+        [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
       p = ImpalaShell(vector)
       p.send_cmd("USE functional")
       # Connect without arguments works because the custom cluster will have the default
@@ -59,7 +59,7 @@
 
     # Iterate over test vector within test function to avoid restarting cluster.
     for vector in\
-        [ImpalaTestVector([value]) for value in create_beeswax_hs2_hs2http_dimension()]:
+        [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
       p = ImpalaShell(vector)
       # ImpalaShell startup may issue query to get server info - get num queries after
       # starting shell.
@@ -86,7 +86,7 @@
 
     # Iterate over test vector within test function to avoid restarting cluster.
     for vector in\
-        [ImpalaTestVector([value]) for value in create_beeswax_hs2_hs2http_dimension()]:
+        [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
       cmd = get_shell_cmd(vector)
       proc = pexpect.spawn(cmd[0], cmd[1:])
       proc.expect("{0}] default>".format(get_impalad_port(vector)))
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 1928f93..02b51a3 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -18,6 +18,7 @@
 # Client tests for Impala's HiveServer2 interface
 
 from getpass import getuser
+from contextlib import contextmanager
 import json
 import logging
 import pytest
@@ -37,33 +38,38 @@
 
 SQLSTATE_GENERAL_ERROR = "HY000"
 
+
+# Context manager that wraps a session. This is used over 'needs_session' to allow more
+# direct access to the TOpenSessionReq parameters.
+@contextmanager
+def ScopedSession(hs2_client, *args, **kwargs):
+  try:
+    open_session_req = TCLIService.TOpenSessionReq(*args, **kwargs)
+    session = hs2_client.OpenSession(open_session_req)
+    yield session
+  finally:
+    if session.status.statusCode != TCLIService.TStatusCode.SUCCESS_STATUS:
+      return
+    close_session_req = TCLIService.TCloseSessionReq()
+    close_session_req.sessionHandle = session.sessionHandle
+    HS2TestSuite.check_response(hs2_client.CloseSession(close_session_req))
+
+
 class TestHS2(HS2TestSuite):
-  def setup_method(self, method):
-    # Keep track of extra session handles opened by _open_extra_session.
-    self.__extra_sessions = []
-
-  def teardown_method(self, method):
-    for session in self.__extra_sessions:
-      try:
-        close_session_req = TCLIService.TCloseSessionReq(session)
-        self.hs2_client.CloseSession(close_session_req)
-      except Exception:
-        LOG.log_exception("Error closing session.")
-
   def test_open_session(self):
     """Check that a session can be opened"""
     open_session_req = TCLIService.TOpenSessionReq()
-    TestHS2.check_response(self.hs2_client.OpenSession(open_session_req))
+    with ScopedSession(self.hs2_client) as session:
+      TestHS2.check_response(session)
 
   def test_open_session_query_options(self):
     """Check that OpenSession sets query options"""
-    open_session_req = TCLIService.TOpenSessionReq()
-    open_session_req.configuration = {'MAX_ERRORS': '45678',
-        'NUM_NODES': '1234', 'MAX_NUM_RUNTIME_FILTERS': '333'}
-    open_session_resp = self.hs2_client.OpenSession(open_session_req)
-    TestHS2.check_response(open_session_resp)
-    for k, v in open_session_req.configuration.items():
-      assert open_session_resp.configuration[k] == v
+    configuration = {'MAX_ERRORS': '45678', 'NUM_NODES': '1234',
+                     'MAX_NUM_RUNTIME_FILTERS': '333'}
+    with ScopedSession(self.hs2_client, configuration=configuration) as session:
+      TestHS2.check_response(session)
+      for k, v in configuration.items():
+        assert session.configuration[k] == v
 
   def get_session_options(self, setCmd):
     """Returns dictionary of query options."""
@@ -139,43 +145,39 @@
   @SkipIfDockerizedCluster.internal_hostname
   def test_open_session_http_addr(self):
     """Check that OpenSession returns the coordinator's http address."""
-    open_session_req = TCLIService.TOpenSessionReq()
-    open_session_resp = self.hs2_client.OpenSession(open_session_req)
-    TestHS2.check_response(open_session_resp)
-    http_addr = open_session_resp.configuration['http_addr']
-    resp = urlopen("http://%s/queries?json" % http_addr)
-    assert resp.msg == 'OK'
-    queries_json = json.loads(resp.read())
-    assert 'completed_queries' in queries_json
-    assert 'in_flight_queries' in queries_json
+    with ScopedSession(self.hs2_client) as session:
+      TestHS2.check_response(session)
+      http_addr = session.configuration['http_addr']
+      resp = urlopen("http://%s/queries?json" % http_addr)
+      assert resp.msg == 'OK'
+      queries_json = json.loads(resp.read())
+      assert 'completed_queries' in queries_json
+      assert 'in_flight_queries' in queries_json
 
   def test_open_session_unsupported_protocol(self):
     """Test that we get the right protocol version back if we ask for one larger than the
     server supports. This test will fail as we support newer version of HS2, and should be
     updated."""
-    open_session_req = TCLIService.TOpenSessionReq()
-    open_session_req.protocol_version = \
-        TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7
-    open_session_resp = self.hs2_client.OpenSession(open_session_req)
-    TestHS2.check_response(open_session_resp)
-    assert open_session_resp.serverProtocolVersion == \
-        TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6
+    client_protocol = TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7
+    with ScopedSession(self.hs2_client, client_protocol=client_protocol) as session:
+      TestHS2.check_response(session)
+      assert session.serverProtocolVersion == \
+          TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6
 
   def test_open_session_empty_user(self):
     """Test that we get the expected errors back if either impala.doas.user is set but
     username is empty, or username is set but impala.doas.user is empty."""
-    open_session_req = TCLIService.TOpenSessionReq()
-    open_session_req.username = ""
-    open_session_req.configuration = {"impala.doas.user": "do_as_user"}
-    open_session_resp = self.hs2_client.OpenSession(open_session_req)
-    TestHS2.check_response(open_session_resp, TCLIService.TStatusCode.ERROR_STATUS, \
-        "Unable to delegate using empty proxy username.")
+    configuration = {"impala.doas.user": "do_as_user"}
+    with ScopedSession(self.hs2_client, configuration=configuration) as session:
+      TestHS2.check_response(session, TCLIService.TStatusCode.ERROR_STATUS,
+          "Unable to delegate using empty proxy username.")
 
-    open_session_req.username = "user"
-    open_session_req.configuration = {"impala.doas.user": ""}
-    open_session_resp = self.hs2_client.OpenSession(open_session_req)
-    TestHS2.check_response(open_session_resp, TCLIService.TStatusCode.ERROR_STATUS, \
-        "Unable to delegate using empty doAs username.")
+    username = "user"
+    configuration = {"impala.doas.user": ""}
+    with ScopedSession(
+        self.hs2_client, username=username, configuration=configuration) as session:
+      TestHS2.check_response(session, TCLIService.TStatusCode.ERROR_STATUS,
+          "Unable to delegate using empty doAs username.")
 
   def test_close_session(self):
     """Test that an open session can be closed"""
@@ -707,21 +709,22 @@
 
     # Attempt to access query with different user should fail.
     evil_user = getuser() + "_evil_twin"
-    session_handle2 = self._open_extra_session(evil_user)
-    TestHS2.check_profile_access_denied(self.hs2_client.GetExecSummary(
-      ImpalaHiveServer2Service.TGetExecSummaryReq(execute_statement_resp.operationHandle,
-        session_handle2)), user=evil_user)
+    with ScopedSession(self.hs2_client, username=evil_user) as session:
+      session_handle2 = session.sessionHandle
+      TestHS2.check_profile_access_denied(self.hs2_client.GetExecSummary(
+        ImpalaHiveServer2Service.TGetExecSummaryReq(
+          execute_statement_resp.operationHandle, session_handle2)), user=evil_user)
 
-    # Now close the query and verify the exec summary is available.
-    close_operation_req = TCLIService.TCloseOperationReq()
-    close_operation_req.operationHandle = execute_statement_resp.operationHandle
-    TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
+      # Now close the query and verify the exec summary is available.
+      close_operation_req = TCLIService.TCloseOperationReq()
+      close_operation_req.operationHandle = execute_statement_resp.operationHandle
+      TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
 
-    # Attempt to access query with different user from log should fail.
-    TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
-      ImpalaHiveServer2Service.TGetRuntimeProfileReq(
-        execute_statement_resp.operationHandle, session_handle2)),
-      user=evil_user)
+      # Attempt to access query with different user from log should fail.
+      TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
+        ImpalaHiveServer2Service.TGetRuntimeProfileReq(
+          execute_statement_resp.operationHandle, session_handle2)),
+        user=evil_user)
 
     exec_summary_resp = self.hs2_client.GetExecSummary(exec_summary_req)
     TestHS2.check_response(exec_summary_resp)
@@ -765,21 +768,22 @@
 
     # Attempt to access query with different user should fail.
     evil_user = getuser() + "_evil_twin"
-    session_handle2 = self._open_extra_session(evil_user)
-    TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
-      ImpalaHiveServer2Service.TGetRuntimeProfileReq(
-        execute_statement_resp.operationHandle, session_handle2)),
-        user=evil_user)
+    with ScopedSession(self.hs2_client, username=evil_user) as session:
+      session_handle2 = session.sessionHandle
+      TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
+        ImpalaHiveServer2Service.TGetRuntimeProfileReq(
+          execute_statement_resp.operationHandle, session_handle2)),
+          user=evil_user)
 
-    close_operation_req = TCLIService.TCloseOperationReq()
-    close_operation_req.operationHandle = execute_statement_resp.operationHandle
-    TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
+      close_operation_req = TCLIService.TCloseOperationReq()
+      close_operation_req.operationHandle = execute_statement_resp.operationHandle
+      TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
 
-    # Attempt to access query with different user from log should fail.
-    TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
-      ImpalaHiveServer2Service.TGetRuntimeProfileReq(
-        execute_statement_resp.operationHandle, session_handle2)),
-        user=evil_user)
+      # Attempt to access query with different user from log should fail.
+      TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
+        ImpalaHiveServer2Service.TGetRuntimeProfileReq(
+          execute_statement_resp.operationHandle, session_handle2)),
+          user=evil_user)
 
     get_profile_resp = self.hs2_client.GetRuntimeProfile(get_profile_req)
     TestHS2.check_response(get_profile_resp)
@@ -867,20 +871,13 @@
     TestHS2.check_response(fetch_results_resp)
     return fetch_results_resp
 
-  def _open_extra_session(self, user_name):
-    """Open an extra session with the provided username that will be automatically
-    closed at the end of the test. Returns the session handle."""
-    resp = self.hs2_client.OpenSession(TCLIService.TOpenSessionReq(username=user_name))
-    TestHS2.check_response(resp)
-    return resp.sessionHandle
-
   def test_close_connection(self):
     """Tests that an hs2 session remains valid even after the connection is dropped."""
     open_session_req = TCLIService.TOpenSessionReq()
     open_session_resp = self.hs2_client.OpenSession(open_session_req)
     TestHS2.check_response(open_session_resp)
     self.session_handle = open_session_resp.sessionHandle
-    # Ren a query, which should succeed.
+    # Run a query, which should succeed.
     self.execute_statement("select 1")
 
     # Reset the connection.
diff --git a/tests/hs2/test_json_endpoints.py b/tests/hs2/test_json_endpoints.py
index 67342dc..102ccca 100644
--- a/tests/hs2/test_json_endpoints.py
+++ b/tests/hs2/test_json_endpoints.py
@@ -98,3 +98,10 @@
     assert queries_json["num_in_flight_queries"] == 0
     assert queries_json["num_executing_queries"] == 0
     assert queries_json["num_waiting_queries"] == 0
+
+    # Close the session so that subsequent tests that rely on an empty session list don't
+    # fail.
+    close_session_req = TCLIService.TCloseSessionReq()
+    close_session_req.sessionHandle = open_session_resp.sessionHandle
+    close_session_resp = self.hs2_client.CloseSession(close_session_req)
+    TestJsonEndpoints.check_response(close_session_resp)
diff --git a/tests/query_test/test_cast_with_format.py b/tests/query_test/test_cast_with_format.py
index 3dee880..5770e0f 100644
--- a/tests/query_test/test_cast_with_format.py
+++ b/tests/query_test/test_cast_with_format.py
@@ -16,7 +16,7 @@
 # under the License.
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.test_dimensions import create_beeswax_hs2_dimension
+from tests.common.test_dimensions import create_client_protocol_dimension
 
 
 class TestCastWithFormat(ImpalaTestSuite):
@@ -32,7 +32,7 @@
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format == 'parquet')
 
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
 
   def test_basic_inputs_from_table(self, vector):
     self.run_test_case('QueryTest/cast_format_from_table', vector)
diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py
index 6473065..de9ab2c 100644
--- a/tests/query_test/test_chars.py
+++ b/tests/query_test/test_chars.py
@@ -19,7 +19,7 @@
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (create_exec_option_dimension,
-    create_beeswax_hs2_dimension, hs2_parquet_constraint, hs2_text_constraint)
+    create_client_protocol_dimension, hs2_parquet_constraint, hs2_text_constraint)
 from tests.util.filesystem_utils import get_fs_path
 
 class TestStringQueries(ImpalaTestSuite):
@@ -37,14 +37,14 @@
         v.get_value('table_format').compression_codec in ['none'])
     # Run these queries through both beeswax and HS2 to get coverage of CHAR/VARCHAR
     # returned via both protocols.
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_constraint(hs2_text_constraint)
 
   def test_chars(self, vector):
     self.run_test_case('QueryTest/chars', vector)
 
   def test_chars_tmp_tables(self, vector, unique_database):
-    if vector.get_value('protocol') == 'hs2':
+    if vector.get_value('protocol') in ['hs2', 'hs2-http']:
       pytest.skip("HS2 does not return row counts for inserts")
     # Tests that create temporary tables and require a unique database.
     self.run_test_case('QueryTest/chars-tmp-tables', vector, unique_database)
@@ -68,7 +68,7 @@
         v.get_value('table_format').compression_codec in ['none']))
     # Run these queries through both beeswax and HS2 to get coverage of CHAR/VARCHAR
     # returned via both protocols.
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_constraint(hs2_parquet_constraint)
 
   def test_char_format(self, vector):
diff --git a/tests/query_test/test_date_queries.py b/tests/query_test/test_date_queries.py
index 99788b3..ae4c169 100644
--- a/tests/query_test/test_date_queries.py
+++ b/tests/query_test/test_date_queries.py
@@ -22,7 +22,7 @@
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfLocal
 from tests.common.test_dimensions import (create_exec_option_dimension_from_dict,
-    create_beeswax_hs2_dimension, hs2_parquet_constraint)
+    create_client_protocol_dimension, hs2_parquet_constraint)
 from tests.shell.util import ImpalaShell
 
 
@@ -47,7 +47,7 @@
 
     # Run these queries through both beeswax and HS2 to get coverage of date returned
     # via both protocols.
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_constraint(hs2_parquet_constraint)
 
   def test_queries(self, vector):
diff --git a/tests/query_test/test_decimal_queries.py b/tests/query_test/test_decimal_queries.py
index a156321..9659062 100644
--- a/tests/query_test/test_decimal_queries.py
+++ b/tests/query_test/test_decimal_queries.py
@@ -21,7 +21,7 @@
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (create_exec_option_dimension_from_dict,
-    create_beeswax_hs2_dimension, hs2_parquet_constraint)
+    create_client_protocol_dimension, hs2_parquet_constraint)
 from tests.common.test_vector import ImpalaTestDimension
 
 class TestDecimalQueries(ImpalaTestSuite):
@@ -48,7 +48,7 @@
 
     # Run these queries through both beeswax and HS2 to get coverage of decimals returned
     # via both protocols.
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_constraint(hs2_parquet_constraint)
 
   def test_queries(self, vector):
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 8e2ad17..8c5edda 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -25,7 +25,7 @@
 from tests.common.skip import SkipIfEC, SkipIfCatalogV2
 from tests.common.test_dimensions import (
     create_uncompressed_text_dimension, extend_exec_option_dimension,
-    create_beeswax_hs2_dimension, hs2_parquet_constraint)
+    create_client_protocol_dimension, hs2_parquet_constraint)
 from tests.common.test_vector import ImpalaTestVector
 
 class TestQueries(ImpalaTestSuite):
@@ -38,7 +38,7 @@
     # Run these queries through both beeswax and HS2 to get coverage of both protocols.
     # Don't run all combinations of table format and protocol - the dimensions should
     # be orthogonal.
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_constraint(hs2_parquet_constraint)
 
     # Adding a test dimension here to test the small query opt in exhaustive.
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index 3090cf7..9505bb4 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -31,7 +31,7 @@
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
 from tests.common.skip import SkipIf
-from tests.common.test_dimensions import create_beeswax_hs2_hs2http_dimension
+from tests.common.test_dimensions import create_client_protocol_dimension
 from time import sleep, time
 from util import (get_impalad_host_port, assert_var_substitution, run_impala_shell_cmd,
                   ImpalaShell, IMPALA_SHELL_EXECUTABLE)
@@ -120,7 +120,7 @@
   @classmethod
   def add_test_dimensions(cls):
     # Run with both beeswax and HS2 to ensure that behaviour is the same.
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_hs2http_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
 
   def test_no_args(self, vector):
     args = ['-q', DEFAULT_QUERY]
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 65b85a7..d95180e 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -18,6 +18,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import logging
 import os
 import pexpect
 import pytest
@@ -37,7 +38,7 @@
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal
-from tests.common.test_dimensions import create_beeswax_hs2_hs2http_dimension
+from tests.common.test_dimensions import create_client_protocol_dimension
 from util import (assert_var_substitution, ImpalaShell, get_impalad_port, get_shell_cmd,
                   get_open_sessions_metric)
 
@@ -47,6 +48,7 @@
 # Examples: hostname:21000, hostname:21050, hostname:28000
 PROMPT_REGEX = r'\[[^:]+:2(1|8)0[0-9][0-9]\]'
 
+LOG = logging.getLogger('test_shell_interactive')
 
 @pytest.fixture
 def tmp_history_file(request):
@@ -78,7 +80,7 @@
   @classmethod
   def add_test_dimensions(cls):
     # Run with both beeswax and HS2 to ensure that behaviour is the same.
-    cls.ImpalaTestMatrix.add_dimension(create_beeswax_hs2_hs2http_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
 
   def _expect_with_cmd(self, proc, cmd, vector, expectations=(), db="default"):
     """Executes a command on the expect process instance and verifies a set of
@@ -89,10 +91,15 @@
     for e in expectations:
       assert e in proc.before
 
-  def _wait_for_num_open_sessions(self, vector, impala_service, num, err):
-    """Helper method to wait for the number of open sessions to reach 'num'."""
+  def _wait_for_num_open_sessions(self, vector, impala_service, expected, err):
+    """Helper method to wait for the number of open sessions to reach 'expected'."""
     metric_name = get_open_sessions_metric(vector)
-    assert impala_service.wait_for_metric_value(metric_name, num) == num, err
+    try:
+      actual = impala_service.wait_for_metric_value(metric_name, expected)
+    except AssertionError:
+      LOG.exception("Error: " % err)
+      raise
+    assert actual == expected, err
 
   def test_local_shell_options(self, vector):
     """Test that setting the local shell options works"""
@@ -266,45 +273,48 @@
 
     Verifies that a connect command by the user is honoured.
     """
-    # Disconnect existing clients so there are no open sessions.
-    self.client.close()
-    self.hs2_client.close()
+    try:
+      # Disconnect existing clients so there are no open sessions.
+      self.close_impala_clients()
 
-    hostname = socket.getfqdn()
-    initial_impala_service = ImpaladService(hostname)
-    target_impala_service = ImpaladService(hostname, webserver_port=25001,
-        beeswax_port=21001, be_port=22001, hs2_port=21051, hs2_http_port=28001)
-    protocol = vector.get_value("protocol").lower()
-    if protocol == "hs2":
-      target_port = 21051
-    elif protocol == "hs2-http":
-      target_port = 28001
-    else:
-      assert protocol == "beeswax"
-      target_port = 21001
-    # This test is running serially, so there shouldn't be any open sessions, but wait
-    # here in case a session from a previous test hasn't been fully closed yet.
-    self._wait_for_num_open_sessions(vector, initial_impala_service, 0,
-        "first impalad should not have any remaining open sessions.")
-    self._wait_for_num_open_sessions(vector, target_impala_service, 0,
-        "second impalad should not have any remaining open sessions.")
-    # Connect to the first impalad
-    p = ImpalaShell(vector)
+      hostname = socket.getfqdn()
+      initial_impala_service = ImpaladService(hostname)
+      target_impala_service = ImpaladService(hostname, webserver_port=25001,
+          beeswax_port=21001, be_port=22001, hs2_port=21051, hs2_http_port=28001)
+      protocol = vector.get_value("protocol").lower()
+      if protocol == "hs2":
+        target_port = 21051
+      elif protocol == "hs2-http":
+        target_port = 28001
+      else:
+        assert protocol == "beeswax"
+        target_port = 21001
+      # This test is running serially, so there shouldn't be any open sessions, but wait
+      # here in case a session from a previous test hasn't been fully closed yet.
+      self._wait_for_num_open_sessions(vector, initial_impala_service, 0,
+          "first impalad should not have any remaining open sessions.")
+      self._wait_for_num_open_sessions(vector, target_impala_service, 0,
+          "second impalad should not have any remaining open sessions.")
+      # Connect to the first impalad
+      p = ImpalaShell(vector)
 
-    # Make sure we're connected <hostname>:<port>
-    self._wait_for_num_open_sessions(vector, initial_impala_service, 1,
-        "Not connected to %s:%d" % (hostname, get_impalad_port(vector)))
-    p.send_cmd("connect %s:%d" % (hostname, target_port))
+      # Make sure we're connected <hostname>:<port>
+      self._wait_for_num_open_sessions(vector, initial_impala_service, 1,
+          "Not connected to %s:%d" % (hostname, get_impalad_port(vector)))
+      p.send_cmd("connect %s:%d" % (hostname, target_port))
 
-    # The number of sessions on the target impalad should have been incremented.
-    self._wait_for_num_open_sessions(vector,
-        target_impala_service, 1, "Not connected to %s:%d" % (hostname, target_port))
-    assert "[%s:%d] default>" % (hostname, target_port) in p.get_result().stdout
+      # The number of sessions on the target impalad should have been incremented.
+      self._wait_for_num_open_sessions(vector,
+          target_impala_service, 1, "Not connected to %s:%d" % (hostname, target_port))
+      assert "[%s:%d] default>" % (hostname, target_port) in p.get_result().stdout
 
-    # The number of sessions on the initial impalad should have been decremented.
-    self._wait_for_num_open_sessions(vector, initial_impala_service, 0,
-        "Connection to %s:%d should have been closed" % (
-          hostname, get_impalad_port(vector)))
+      # The number of sessions on the initial impalad should have been decremented.
+      self._wait_for_num_open_sessions(vector, initial_impala_service, 0,
+          "Connection to %s:%d should have been closed" % (
+            hostname, get_impalad_port(vector)))
+
+    finally:
+      self.create_impala_clients()
 
   @pytest.mark.execute_serially
   def test_ddl_queries_are_closed(self, vector):
@@ -315,6 +325,8 @@
     webpage to confirm that they've been closed.
     TODO: Add every statement type.
     """
+    # Disconnect existing clients so there are no open sessions.
+    self.close_impala_clients()
 
     TMP_DB = 'inflight_test_db'
     TMP_TBL = 'tmp_tbl'
@@ -322,6 +334,8 @@
     NUM_QUERIES = 'impala-server.num-queries'
 
     impalad = ImpaladService(socket.getfqdn())
+    self._wait_for_num_open_sessions(vector, impalad, 0,
+        "Open sessions found after closing all clients.")
     p = ImpalaShell(vector)
     try:
       start_num_queries = impalad.get_metric_value(NUM_QUERIES)
@@ -344,6 +358,7 @@
       run_impala_shell_interactive(vector, "drop table if exists %s.%s;" % (
           TMP_DB, TMP_TBL))
       run_impala_shell_interactive(vector, "drop database if exists foo;")
+      self.create_impala_clients()
 
   def test_multiline_queries_in_history(self, vector, tmp_history_file):
     """Test to ensure that multiline queries with comments are preserved in history
diff --git a/tests/util/run_impyla_http_query.py b/tests/util/run_impyla_http_query.py
new file mode 100755
index 0000000..c3edcca
--- /dev/null
+++ b/tests/util/run_impyla_http_query.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env impala-python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script is a thin wrapper to execute a query using Impyla against an HTTP endpoint.
+# It can be used by other tests (e.g. LdapImpylaHttpTest.java) that start a cluster with
+# an LDAP server to validate Impyla's functionality.
+
+import argparse
+import logging
+
+from impala import dbapi as impyla
+
+logging.basicConfig(level=logging.INFO)
+
+
+def run_query(query, args):
+  """Runs a query using Impyla. Args must contain various options that are forwarded to
+  Impyla's connect() method."""
+  auth_mechanism = 'NOSASL'
+  if args.user:
+    auth_mechanism = 'LDAP'
+  conn = impyla.connect(host=args.host, port=args.port, user=args.user,
+                        password=args.password, auth_mechanism=auth_mechanism,
+                        use_http_transport=True, http_path=args.http_path)
+  cursor = conn.cursor()
+  cursor.execute(query)
+  result = cursor.fetchall()
+  # Print the result so that the caller can validate it.
+  print(str(result))
+
+
+def main():
+  parser = argparse.ArgumentParser()
+  parser.add_argument("--host", default="localhost")
+  parser.add_argument("--port", type=int, default=28000)
+  parser.add_argument("--http_path", default="")
+  parser.add_argument("--user")
+  parser.add_argument("--password")
+  parser.add_argument("--query", default="select 42")
+  args = parser.parse_args()
+  run_query(args.query, args)
+
+
+if __name__ == "__main__":
+  main()