IMPALA-6894: Use an internal representation of query states in ClientRequestState

Re-factors the state machine of ClientRequestState so that it uses an
internal state represetation rather than using the one defined by
TOperationState. The possible states are defined in
ClientRequestState::ExecState and the possible state transitions are
outlined in client-request-state.h and enforced in
ClientRequestState::UpdateNonErrorExecState. The states defined in
ClientRequestState::ExecState are the same states currently used in
TOperationState. This patch simply makes it easy to define new states
in the future.

The value of ClientRequestState::ExecState is exposed to clients via the
entry "Impala Query State" in the runtime profile. It is meant to be the
Impala specific version of "Query State" (which is the Beeswax state).
This allows Impala to expose its internal state without breaking existing
clients that might rely on the value of "Query State".

Additional Bug Fixes:
* Previously, UpdateNonErrorOperationState would ignore attempts to make
illegal state transitions, now the method uses DCHECKs to ensure only
valid state transitions are attempted; this required fixing a possible race
condition where a query could transition from RUNNING to PENDING
* The ClientRequestState state is now tracked using an AtomicEnum, which
fixes a few possible race conditions where the state was being read
without holding the ClientRequestState::lock_

Testing:
* Ran core tests
* Added test to make sure "Impala Query State" is populated

Change-Id: I1ce70bd2e964b309ebfc9d6ff6d900485db4d630
Reviewed-on: http://gerrit.cloudera.org:8080/14744
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 8619060..023bddc 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -136,6 +136,8 @@
   summary_profile_->AddInfoString("End Time", "");
   summary_profile_->AddInfoString("Query Type", "N/A");
   summary_profile_->AddInfoString("Query State", PrintThriftEnum(BeeswaxQueryState()));
+  summary_profile_->AddInfoString(
+      "Impala Query State", ExecStateToString(exec_state_.Load()));
   summary_profile_->AddInfoString("Query Status", "OK");
   summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true));
   summary_profile_->AddInfoString("User", effective_user());
@@ -267,17 +269,16 @@
     }
     case TStmtType::ADMIN_FN:
       DCHECK(exec_request_.admin_request.type == TAdminRequestType::SHUTDOWN);
-      return ExecShutdownRequest();
+      RETURN_IF_ERROR(ExecShutdownRequest());
+      break;
     default:
       stringstream errmsg;
       errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
       return Status(errmsg.str());
   }
 
-  if (async_exec_thread_.get() != nullptr) {
-    UpdateNonErrorOperationState(TOperationState::PENDING_STATE);
-  } else {
-    UpdateNonErrorOperationState(TOperationState::RUNNING_STATE);
+  if (async_exec_thread_.get() == nullptr) {
+    UpdateNonErrorExecState(ExecState::RUNNING);
   }
   return Status::OK();
 }
@@ -499,6 +500,9 @@
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
   }
+  // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
+  // the query should be in the PENDING state before the Exec RPC returns.
+  UpdateNonErrorExecState(ExecState::PENDING);
   RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
       &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_, true));
   return Status::OK();
@@ -559,7 +563,7 @@
   }
 
   profile_->AddChild(coord_->query_profile());
-  UpdateNonErrorOperationState(TOperationState::RUNNING_STATE);
+  UpdateNonErrorExecState(ExecState::RUNNING);
 }
 
 Status ClientRequestState::ExecDdlRequest() {
@@ -774,6 +778,7 @@
       &metadata_op_result));
   result_metadata_ = metadata_op_result.schema;
   request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
+  UpdateNonErrorExecState(ExecState::RUNNING);
   return Status::OK();
 }
 
@@ -825,11 +830,11 @@
     if (stmt_type() == TStmtType::DDL) {
       DCHECK(catalog_op_type() != TCatalogOpType::DDL || request_result_set_ != nullptr);
     }
-    UpdateNonErrorOperationState(TOperationState::FINISHED_STATE);
+    UpdateNonErrorExecState(ExecState::FINISHED);
   }
-  // UpdateQueryStatus() or UpdateNonErrorOperationState() have updated operation_state_.
-  DCHECK(operation_state_ == TOperationState::FINISHED_STATE ||
-      operation_state_ == TOperationState::ERROR_STATE);
+  // UpdateQueryStatus() or UpdateNonErrorExecState() have updated exec_state_.
+  ExecState exec_state = exec_state_.Load();
+  DCHECK(exec_state == ExecState::FINISHED || exec_state == ExecState::ERROR);
   // Notify all the threads blocked on Wait() to finish and then log the query events,
   // if any.
   {
@@ -917,31 +922,47 @@
   return Status::OK();
 }
 
-void ClientRequestState::UpdateNonErrorOperationState(TOperationState::type new_state) {
+void ClientRequestState::UpdateNonErrorExecState(ExecState new_state) {
   lock_guard<mutex> l(lock_);
+  ExecState old_state = exec_state_.Load();
+  static string error_msg = "Illegal state transition: $0 -> $1";
   switch (new_state) {
-    case TOperationState::PENDING_STATE:
-      if (operation_state_ == TOperationState::INITIALIZED_STATE) {
-        UpdateOperationState(new_state);
+    case ExecState::PENDING:
+      DCHECK(old_state == ExecState::INITIALIZED) << Substitute(
+          error_msg, ExecStateToString(old_state), ExecStateToString(new_state));
+      UpdateExecState(new_state);
+      break;
+    case ExecState::RUNNING:
+      // It is possible for FinishExecQueryOrDmlRequest() to attempt a transition to
+      // running, even after the query has been cancelled with an error status (and is
+      // thus in the ERROR ExecState). In this case, just ignore the transition attempt.
+      if (old_state != ExecState::ERROR) {
+        // DDL statements and metadata ops don't use the PENDING state, so a query can
+        // transition directly from the INITIALIZED to RUNNING state.
+        DCHECK(old_state == ExecState::INITIALIZED || old_state == ExecState::PENDING)
+            << Substitute(
+                error_msg, ExecStateToString(old_state), ExecStateToString(new_state));
+        UpdateExecState(new_state);
       }
       break;
-    case TOperationState::RUNNING_STATE:
-    case TOperationState::FINISHED_STATE:
-      if (operation_state_ == TOperationState::INITIALIZED_STATE
-          || operation_state_ == TOperationState::PENDING_STATE
-          || operation_state_ == TOperationState::RUNNING_STATE) {
-        UpdateOperationState(new_state);
-      }
+    case ExecState::FINISHED:
+      // A query can transition from PENDING to FINISHED if it is cancelled by the
+      // client.
+      DCHECK(old_state == ExecState::PENDING || old_state == ExecState::RUNNING)
+          << Substitute(
+              error_msg, ExecStateToString(old_state), ExecStateToString(new_state));
+      UpdateExecState(new_state);
       break;
     default:
-      DCHECK(false) << "A non-error state expected but got: " << new_state;
+      DCHECK(false) << "A non-error state expected but got: "
+                    << ExecStateToString(new_state);
   }
 }
 
 Status ClientRequestState::UpdateQueryStatus(const Status& status) {
   // Preserve the first non-ok status
   if (!status.ok() && query_status_.ok()) {
-    UpdateOperationState(TOperationState::ERROR_STATE);
+    UpdateExecState(ExecState::ERROR);
     query_status_ = status;
     summary_profile_->AddInfoStringRedacted("Query Status", query_status_.GetDetail());
   }
@@ -951,9 +972,9 @@
 
 Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
     QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) {
-  // Wait() guarantees that we've transitioned at least to FINISHED_STATE (and any
+  // Wait() guarantees that we've transitioned at least to FINISHED state (and any
   // state beyond that should have a non-OK query_status_ set).
-  DCHECK(operation_state_ == TOperationState::FINISHED_STATE);
+  DCHECK(exec_state_.Load() == ExecState::FINISHED);
 
   if (eos_) return Status::OK();
 
@@ -1089,12 +1110,12 @@
   {
     lock_guard<mutex> lock(lock_);
     // If the query has reached a terminal state, no need to update the state.
-    bool already_done = eos_ || operation_state_ == TOperationState::ERROR_STATE;
+    bool already_done = eos_ || exec_state_.Load() == ExecState::ERROR;
     if (!already_done && cause != NULL) {
       DCHECK(!cause->ok());
       discard_result(UpdateQueryStatus(*cause));
       query_events_->MarkEvent("Cancelled");
-      DCHECK_EQ(operation_state_, TOperationState::ERROR_STATE);
+      DCHECK(exec_state_.Load() == ExecState::ERROR);
     }
 
     admit_outcome_.Set(AdmissionOutcome::CANCELLED);
@@ -1335,21 +1356,36 @@
   result_cache_.reset();
 }
 
-void ClientRequestState::UpdateOperationState(
-    TOperationState::type operation_state) {
-  operation_state_ = operation_state;
+void ClientRequestState::UpdateExecState(ExecState exec_state) {
+  exec_state_.Store(exec_state);
   summary_profile_->AddInfoString("Query State", PrintThriftEnum(BeeswaxQueryState()));
+  summary_profile_->AddInfoString("Impala Query State", ExecStateToString(exec_state));
+}
+
+apache::hive::service::cli::thrift::TOperationState::type
+ClientRequestState::TOperationState() const {
+  switch (exec_state_.Load()) {
+    case ExecState::INITIALIZED: return TOperationState::INITIALIZED_STATE;
+    case ExecState::PENDING: return TOperationState::PENDING_STATE;
+    case ExecState::RUNNING: return TOperationState::RUNNING_STATE;
+    case ExecState::FINISHED: return TOperationState::FINISHED_STATE;
+    case ExecState::ERROR: return TOperationState::ERROR_STATE;
+    default: {
+      DCHECK(false) << "Add explicit translation for all used ExecState values";
+      return TOperationState::ERROR_STATE;
+    }
+  }
 }
 
 beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const {
-  switch (operation_state_) {
-    case TOperationState::INITIALIZED_STATE: return beeswax::QueryState::CREATED;
-    case TOperationState::PENDING_STATE: return beeswax::QueryState::COMPILED;
-    case TOperationState::RUNNING_STATE: return beeswax::QueryState::RUNNING;
-    case TOperationState::FINISHED_STATE: return beeswax::QueryState::FINISHED;
-    case TOperationState::ERROR_STATE: return beeswax::QueryState::EXCEPTION;
+  switch (exec_state_.Load()) {
+    case ExecState::INITIALIZED: return beeswax::QueryState::CREATED;
+    case ExecState::PENDING: return beeswax::QueryState::COMPILED;
+    case ExecState::RUNNING: return beeswax::QueryState::RUNNING;
+    case ExecState::FINISHED: return beeswax::QueryState::FINISHED;
+    case ExecState::ERROR: return beeswax::QueryState::EXCEPTION;
     default: {
-      DCHECK(false) << "Add explicit translation for all used TOperationState values";
+      DCHECK(false) << "Add explicit translation for all used ExecState values";
       return beeswax::QueryState::EXCEPTION;
     }
   }
@@ -1616,4 +1652,13 @@
   return Status::OK();
 }
 
+string ClientRequestState::ExecStateToString(ExecState state) const {
+  static const unordered_map<ClientRequestState::ExecState, const char*>
+      exec_state_strings{{ClientRequestState::ExecState::INITIALIZED, "INITIALIZED"},
+          {ClientRequestState::ExecState::PENDING, "PENDING"},
+          {ClientRequestState::ExecState::RUNNING, "RUNNING"},
+          {ClientRequestState::ExecState::FINISHED, "FINISHED"},
+          {ClientRequestState::ExecState::ERROR, "ERROR"}};
+  return exec_state_strings.at(state);
+}
 }
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 57f7a03..3584e67 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -58,6 +58,15 @@
 /// synchronize access explicitly via lock(). See the ImpalaServer class comment for
 /// the required lock acquisition order.
 ///
+/// State Machine:
+/// ExecState represents all possible states and UpdateNonErrorExecState /
+/// UpdateQueryStatus defines all possible state transitions. ExecState starts out in the
+/// INITIALIZED state and eventually transition to either the FINISHED or ERROR state. Any
+/// state can transition to the ERROR state (if an error is hit or a timeout occurs). The
+/// possible state transitions differ for a query depending on its type (e.g. depending on
+/// the TStmtType). Successful QUERY / DML queries transition from INITIALIZED to PENDING
+/// to RUNNING to FINISHED whereas DDL queries skip the PENDING phase.
+///
 /// TODO: Compute stats is the only stmt that requires child queries. Once the
 /// CatalogService performs background stats gathering the concept of child queries
 /// will likely become obsolete. Remove all child-query related code from this class.
@@ -68,6 +77,10 @@
 
   ~ClientRequestState();
 
+  enum class ExecState {
+    INITIALIZED, PENDING, RUNNING, FINISHED, ERROR
+  };
+
   /// Sets the profile that is produced by the frontend. The frontend creates the
   /// profile during planning and returns it to the backend via TExecRequest,
   /// which then sets the frontend profile.
@@ -134,11 +147,10 @@
   Status RestartFetch() WARN_UNUSED_RESULT;
 
   /// Update operation state if the requested state isn't already obsolete. This is
-  /// only for non-error states (PENDING_STATE, RUNNING_STATE and FINISHED_STATE) - if the
-  /// query encounters an error the query status needs to be set with information about
-  /// the error so UpdateQueryStatus() must be used instead. Takes lock_.
-  void UpdateNonErrorOperationState(
-      apache::hive::service::cli::thrift::TOperationState::type operation_state);
+  /// only for non-error states (PENDING, RUNNING and FINISHED) - if the query encounters
+  /// an error the query status needs to be set with information about the error so
+  /// UpdateQueryStatus() must be used instead. Takes lock_.
+  void UpdateNonErrorExecState(ExecState exec_state);
 
   /// Update the query status and the "Query Status" summary profile string.
   /// If current status is already != ok, no update is made (we preserve the first error)
@@ -152,8 +164,8 @@
 
   /// Cancels the child queries and the coordinator with the given cause.
   /// If cause is NULL, it assume this was deliberately cancelled by the user while in
-  /// FINISHED operation state. Otherwise, sets state to ERROR_STATE (TODO: IMPALA-1262:
-  /// use CANCELED_STATE). Does nothing if the query has reached EOS or already cancelled.
+  /// FINISHED state. Otherwise, sets state to ERROR (TODO: IMPALA-1262: use CANCELLED).
+  /// Does nothing if the query has reached EOS or already cancelled.
   ///
   /// Only returns an error if 'check_inflight' is true and the query is not yet
   /// in-flight. Otherwise, proceed and return Status::OK() even if the query isn't
@@ -242,11 +254,11 @@
   }
   boost::mutex* lock() { return &lock_; }
   boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
-  apache::hive::service::cli::thrift::TOperationState::type operation_state() const {
-    return operation_state_;
-  }
-  // Translate operation_state() to a beeswax::QueryState. TODO: remove calls to this
-  // and replace with uses of operation_state() directly.
+  /// ExecState is stored using an AtomicEnum, so reads do not require holding lock_.
+  ExecState exec_state() const { return exec_state_.Load(); }
+  /// Translate exec_state_ to a TOperationState.
+  apache::hive::service::cli::thrift::TOperationState::type TOperationState() const;
+  /// Translate exec_state_ to a beeswax::QueryState.
   beeswax::QueryState::type BeeswaxQueryState() const;
   const Status& query_status() const { return query_status_; }
   void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
@@ -459,11 +471,10 @@
   bool is_cancelled_ = false; // if true, Cancel() was called.
   bool eos_ = false;  // if true, there are no more rows to return
 
-  /// We enforce the invariant that query_status_ is not OK iff operation_state_ is
-  /// ERROR_STATE, given that lock_ is held. operation_state_ should only be updated
-  /// using UpdateOperationState(), to ensure that the query profile is also updated.
-  apache::hive::service::cli::thrift::TOperationState::type operation_state_ =
-      apache::hive::service::cli::thrift::TOperationState::INITIALIZED_STATE;
+  /// We enforce the invariant that query_status_ is not OK iff exec_state_ is ERROR,
+  /// given that lock_ is held. exec_state_ should only be updated using
+  /// UpdateExecState(), to ensure that the query profile is also updated.
+  AtomicEnum<ExecState> exec_state_{ExecState::INITIALIZED};
 
   /// The current status of the query tracked by this ClientRequestState. Updated by
   /// UpdateQueryStatus(Status).
@@ -594,10 +605,9 @@
   void ClearResultCache();
 
   /// Update the operation state and the "Query State" summary profile string.
-  /// Does not take lock_, but requires it: caller must ensure lock_
-  /// is taken before calling UpdateOperationState.
-  void UpdateOperationState(
-      apache::hive::service::cli::thrift::TOperationState::type operation_state);
+  /// Does not take lock_, but requires it: caller must ensure lock_ is taken before
+  /// calling UpdateExecState.
+  void UpdateExecState(ExecState exec_state);
 
   /// Gets the query options, their levels and the values for this client request
   /// and populates the result set with them. It covers the subset of options for
@@ -628,6 +638,9 @@
   /// Logs audit and column lineage events. Expects that Wait() has already finished.
   /// Grabs lock_ for polling the query_status(). Hence do not call it under lock_.
   void LogQueryEvents();
+
+  /// Converts the given ExecState to a string representation.
+  std::string ExecStateToString(ExecState state) const;
 };
 
 }
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 4304ce3..3c90cc2 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -324,10 +324,10 @@
   stringstream error_log_ss;
 
   {
-    // Take the lock to ensure that if the client sees a query_state == EXCEPTION, it is
+    // Take the lock to ensure that if the client sees a exec_state == ERROR, it is
     // guaranteed to see the error query_status.
     lock_guard<mutex> l(*request_state->lock());
-    DCHECK_EQ(request_state->BeeswaxQueryState() == beeswax::QueryState::EXCEPTION,
+    DCHECK_EQ(request_state->exec_state() == ClientRequestState::ExecState::ERROR,
         !request_state->query_status().ok());
     // If the query status is !ok, include the status error message at the top of the log.
     if (!request_state->query_status().ok()) {
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index d219eb4..f8e5dc0 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -162,7 +162,7 @@
     return;
   }
 
-  request_state->UpdateNonErrorOperationState(TOperationState::FINISHED_STATE);
+  request_state->UpdateNonErrorExecState(ClientRequestState::ExecState::FINISHED);
 
   Status inflight_status = SetQueryInflight(session, request_state);
   if (!inflight_status.ok()) {
@@ -723,7 +723,7 @@
 
   {
     lock_guard<mutex> l(*request_state->lock());
-    TOperationState::type operation_state = request_state->operation_state();
+    TOperationState::type operation_state = request_state->TOperationState();
     return_val.__set_operationState(operation_state);
     if (operation_state == TOperationState::ERROR_STATE) {
       DCHECK(!request_state->query_status().ok());
@@ -938,7 +938,7 @@
     // guaranteed to see the error query_status.
     lock_guard<mutex> l(*request_state->lock());
     Status query_status = request_state->query_status();
-    DCHECK_EQ(request_state->operation_state() == TOperationState::ERROR_STATE,
+    DCHECK_EQ(request_state->exec_state() == ClientRequestState::ExecState::ERROR,
         !query_status.ok());
     // If the query status is !ok, include the status error message at the top of the log.
     if (!query_status.ok()) ss << query_status.GetDetail();
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 5ce4280..c3bac80 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -847,7 +847,7 @@
       // state lock to be acquired, since it could potentially be an expensive
       // call, if the table Catalog metadata loading is in progress. Instead
       // update the caller that the plan information is unavailable.
-      if (request_state->operation_state() == TOperationState::INITIALIZED_STATE) {
+      if (request_state->exec_state() == ClientRequestState::ExecState::INITIALIZED) {
         document->AddMember(
             "plan_metadata_unavailable", "true", document->GetAllocator());
         return;
@@ -996,9 +996,9 @@
   server_->client_request_state_map_.DoFuncForAllEntries([&running_queries](
       const std::shared_ptr<ClientRequestState>& request_state) {
     // Make sure only queries past admission control are added.
-    auto query_state = request_state->operation_state();
-    if (query_state != TOperationState::INITIALIZED_STATE
-        && query_state != TOperationState::PENDING_STATE
+    auto query_state = request_state->exec_state();
+    if (query_state != ClientRequestState::ExecState::INITIALIZED
+        && query_state != ClientRequestState::ExecState::PENDING
         && request_state->schedule() != nullptr)
       running_queries[request_state->request_pool()].push_back(
           {request_state->query_id(), request_state->schedule()->per_backend_mem_limit(),
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 76219be..11ea058 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -621,8 +621,8 @@
   {
     shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
     if (request_state.get() != nullptr) {
-      // For queries in INITIALIZED_STATE, the profile information isn't populated yet.
-      if (request_state->operation_state() == TOperationState::INITIALIZED_STATE) {
+      // For queries in INITIALIZED state, the profile information isn't populated yet.
+      if (request_state->exec_state() == ClientRequestState::ExecState::INITIALIZED) {
         return Status::Expected("Query plan is not ready.");
       }
       lock_guard<mutex> l(*request_state->lock());
@@ -691,7 +691,7 @@
       lock_guard<mutex> l(*request_state->lock());
       RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
           request_state->user_has_profile_access()));
-      if (request_state->operation_state() == TOperationState::PENDING_STATE) {
+      if (request_state->exec_state() == ClientRequestState::ExecState::PENDING) {
         const string* admission_result = request_state->summary_profile()->GetInfoString(
             AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
         if (admission_result != nullptr) {
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 79c3be5..8f5066e 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -242,7 +242,6 @@
     return client
 
   @classmethod
-
   def get_impalad_cluster_size(cls):
     return len(cls.__get_cluster_host_ports('beeswax'))
 
@@ -1125,19 +1124,23 @@
     raise Exception("Table {0}.{1} didn't show up after {2}s", db_name, table_name,
                     timeout_s)
 
-  def assert_eventually(self, timeout_s, period_s, condition):
+  def assert_eventually(self, timeout_s, period_s, condition, error_msg=None):
     """Assert that the condition (a function with no parameters) returns True within the
     given timeout. The condition is executed every period_s seconds. The check assumes
     that once the condition returns True, it continues to return True. Throws a Timeout
-    if the condition does not return true within timeout_s seconds."""
+    if the condition does not return true within timeout_s seconds. 'error_msg' is an
+    optional function that must return a string. If set, the result of the function will
+    be included in the Timeout error message."""
     count = 0
     start_time = time.time()
     while not condition() and time.time() - start_time < timeout_s:
       time.sleep(period_s)
       count += 1
     if not condition():
-      raise Timeout("Check failed to return True after {0} tries and {1} seconds".format(
-          count, timeout_s))
+      error_msg_str = " error message: " + error_msg() if error_msg else ""
+      raise Timeout(
+        "Check failed to return True after {0} tries and {1} seconds{2}".format(
+          count, timeout_s, error_msg_str))
 
   def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
     """
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 9087852..101f086 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -17,17 +17,17 @@
 
 from collections import defaultdict
 from datetime import datetime
-from tests.common.impala_cluster import ImpalaCluster
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon,
                                SkipIfLocal, SkipIfNotHdfsMinicluster)
 from tests.util.filesystem_utils import IS_EC
-from time import sleep, time
+from time import sleep
 from RuntimeProfile.ttypes import TRuntimeProfileFormat
-import logging
 import pytest
 import re
 
+
 class TestObservability(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -158,27 +158,6 @@
     assert result.exec_summary[0]['peak_mem'] >= 0
     assert result.exec_summary[0]['est_peak_mem'] >= 0
 
-  def test_query_states(self):
-    """Tests that the query profile shows expected query states."""
-    query = "select count(*) from functional.alltypes"
-    handle = self.execute_query_async(query,
-        {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
-    # If ExecuteStatement() has completed and the query is paused in the admission control
-    # phase, then the query must be in COMPILED state.
-    profile = self.client.get_runtime_profile(handle)
-    assert "Query State: COMPILED" in profile
-    # After completion of the admission control phase, the query must have at least
-    # reached RUNNING state.
-    self.client.wait_for_admission_control(handle)
-    profile = self.client.get_runtime_profile(handle)
-    assert "Query State: RUNNING" in profile or \
-      "Query State: FINISHED" in profile, profile
-
-    results = self.client.fetch(query, handle)
-    profile = self.client.get_runtime_profile(handle)
-    # After fetching the results, the query must be in state FINISHED.
-    assert "Query State: FINISHED" in profile, profile
-
   def test_query_options(self):
     """Test that the query profile shows expected non-default query options, both set
     explicitly through client and those set by planner"""
@@ -747,3 +726,64 @@
     result = self.execute_query(query)
     assert result.success
     self.__verify_hashtable_stats_profile(result.runtime_profile)
+
+
+class TestQueryStates(ImpalaTestSuite):
+  """Test that the 'Query State' and 'Impala Query State' are set correctly in the
+  runtime profile."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  def test_query_states(self):
+    """Tests that the query profile shows expected query states."""
+    query = "select count(*) from functional.alltypes where bool_col = sleep(10)"
+    handle = self.execute_query_async(query,
+        {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+    # If ExecuteStatement() has completed and the query is paused in the admission control
+    # phase, then the query must be in COMPILED state.
+    profile = self.client.get_runtime_profile(handle)
+    assert self.__is_line_in_profile("Query State: COMPILED", profile)
+    assert self.__is_line_in_profile("Impala Query State: PENDING", profile)
+    # After completion of the admission control phase, the query must have at least
+    # reached RUNNING state.
+    self.client.wait_for_admission_control(handle)
+    profile = self.client.get_runtime_profile(handle)
+    assert self.__is_line_in_profile("Query State: RUNNING", profile), profile
+    assert self.__is_line_in_profile("Impala Query State: RUNNING", profile), profile
+
+    self.client.fetch(query, handle)
+    profile = self.client.get_runtime_profile(handle)
+    # After fetching the results, the query must be in state FINISHED.
+    assert self.__is_line_in_profile("Query State: FINISHED", profile), profile
+    assert self.__is_line_in_profile("Impala Query State: FINISHED", profile), profile
+
+  def test_error_query_state(self):
+    """Tests that the query profile shows the proper error state."""
+    query = "select * from functional.alltypes limit 10"
+    handle = self.execute_query_async(query, {"abort_on_error": "1",
+                                              "debug_action": "0:GETNEXT:FAIL"})
+
+    def assert_finished():
+      profile = self.client.get_runtime_profile(handle)
+      return self.__is_line_in_profile("Query State: FINISHED", profile) and \
+             self.__is_line_in_profile("Impala Query State: FINISHED", profile)
+
+    self.assert_eventually(30, 1, assert_finished,
+      lambda: self.client.get_runtime_profile(handle))
+
+    try:
+      self.client.fetch(query, handle)
+      assert False
+    except ImpalaBeeswaxException:
+      pass
+
+    profile = self.client.get_runtime_profile(handle)
+    assert self.__is_line_in_profile("Query State: EXCEPTION", profile), profile
+    assert self.__is_line_in_profile("Impala Query State: ERROR", profile), profile
+
+  def __is_line_in_profile(self, line, profile):
+    """Returns true if the given 'line' is in the given 'profile'. A single line of the
+    profile must exactly match the given 'line' (excluding whitespaces)."""
+    return re.search("^\s*{0}\s*$".format(line), profile, re.M)