IMPALA-9930 (part 1): Initial refactor for admission control service

This patch contains the following refactors that are needed for the
admission control service, in order to make the main patch easier to
review:
- Adds a new class AdmissionControlClient which will be used to
  abstract the logic for submitting queries to either a local or
  remote admission controller out from ClientRequestState/Coordinator.
  Currently only local submission is supported.
- SubmitForAdmission now takes a BackendId representing the
  coordinator instead of assuming that the local impalad will be the
  coordinator.
- The CRS_BEFORE_ADMISSION debug action is moved into
  SubmitForAdmission() so that it will be executed on whichever daemon
  is performing admission control rather than always on the
  coordinator (needed for TestAdmissionController.test_cancellation).
- ShardedQueryMap is extended to allow keys to be either TUniqueId or
  UniqueIdPB and Add(), Get(), and Delete() convenience functions are
  added.
- Some utils related to seralizing Thrift objects into sidecars are
  added.

Testing:
- Passed a run of existing core tests.

Change-Id: I7974a979cf05ed569f31e1ab20694e29fd3e4508
Reviewed-on: http://gerrit.cloudera.org:8080/16411
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/rpc/sidecar-util.h b/be/src/rpc/sidecar-util.h
new file mode 100644
index 0000000..47b15f6
--- /dev/null
+++ b/be/src/rpc/sidecar-util.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/faststring.h"
+#include "rpc/thrift-util.h"
+#include "util/kudu-status-util.h"
+
+DECLARE_int64(rpc_max_message_size);
+
+namespace impala {
+
+class KrpcSerializer {
+ public:
+  KrpcSerializer() : serializer_(/* compact */ true) {}
+
+  /// Serialize obj and set it as a sidecar on 'rpc_controller', returning the idx in
+  /// 'sidecar_idx'. The memory for the sidecar is owned by this object and must remain
+  /// valid until the rpc has completed.
+  template <class T>
+  Status SerializeToSidecar(
+      const T* obj, kudu::rpc::RpcController* rpc_controller, int* sidecar_idx) {
+    uint8_t* serialized_buf = nullptr;
+    uint32_t serialized_len = 0;
+    RETURN_IF_ERROR(serializer_.SerializeToBuffer(obj, &serialized_len, &serialized_buf));
+    std::unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
+        kudu::rpc::RpcSidecar::FromSlice(kudu::Slice(serialized_buf, serialized_len));
+    KUDU_RETURN_IF_ERROR(
+        rpc_controller->AddOutboundSidecar(move(rpc_sidecar), sidecar_idx),
+        "Failed to add sidecar");
+    return Status::OK();
+  }
+
+ private:
+  ThriftSerializer serializer_;
+};
+
+// Retrieves the sidecar at 'sidecar_idx' from 'rpc_context' and deserializes it into
+// 'thrift_obj'. 'rpc' can be either an RpcContext or an RpcController.
+template <typename RPC, typename T>
+Status GetSidecar(int sidecar_idx, RPC* rpc, T* thrift_obj) {
+  kudu::Slice sidecar_slice;
+  KUDU_RETURN_IF_ERROR(
+      rpc->GetInboundSidecar(sidecar_idx, &sidecar_slice), "Failed to get sidecar");
+  uint32_t len = sidecar_slice.size();
+  RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, thrift_obj));
+  return Status::OK();
+}
+
+// Serializes 'obj' and sets is as a sidecar on 'rpc' using a faststring, which transfers
+// ownership of the sidecar memory to the RPC layer. This introduces another copy that
+// wouldn't be necessary if using a Slice sidecar, but it's convenient for situations
+// where it would be tricky to manually ensure that the sidecar's memory is released after
+// the RPC has completed. 'rpc' may be either an RpcContext or an RpcController.
+//
+// If successful, the sidecar idx is returned in 'sidecar_idx'.
+template <typename RPC, typename T>
+Status SetFaststringSidecar(const T& obj, RPC* rpc, int* sidecar_idx) {
+  ThriftSerializer serializer(/* compact */ true);
+  uint8_t* serialized_buf = nullptr;
+  uint32_t serialized_len = 0;
+  RETURN_IF_ERROR(serializer.SerializeToBuffer(&obj, &serialized_len, &serialized_buf));
+  if (serialized_len > FLAGS_rpc_max_message_size) {
+    return Status("Serialized sidecar exceeds --rpc_max_message_size.");
+  }
+  std::unique_ptr<kudu::faststring> sidecar_str = std::make_unique<kudu::faststring>();
+  sidecar_str->assign_copy(serialized_buf, serialized_len);
+  std::unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
+      kudu::rpc::RpcSidecar::FromFaststring(std::move(sidecar_str));
+  RETURN_IF_ERROR(FromKuduStatus(
+      rpc->AddOutboundSidecar(move(rpc_sidecar), sidecar_idx), "Failed to add sidecar"));
+  return Status::OK();
+}
+
+} // namespace impala
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index fdf21d4..1b969a1 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -30,6 +30,7 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.inline.h"
+#include "rpc/sidecar-util.h"
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator-filter-state.h"
@@ -268,38 +269,20 @@
     exec_rpc_controller_.set_timeout(
         MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms));
 
-    // Serialize the sidecar and add it to the rpc controller. The serialized buffer is
-    // owned by 'serializer' and is freed when it is destructed.
-    ThriftSerializer serializer(true);
-    uint8_t* serialized_buf = nullptr;
-    uint32_t serialized_len = 0;
-    Status serialize_status =
+    // Serialize the sidecar and add it to the rpc controller.
+    Status serialize_debug_status =
         DebugAction(exec_params_.query_options(), "EXEC_SERIALIZE_FRAGMENT_INFO");
-    if (LIKELY(serialize_status.ok())) {
-      serialize_status =
-          serializer.SerializeToBuffer(&fragment_info, &serialized_len, &serialized_buf);
-    }
-    if (UNLIKELY(!serialize_status.ok())) {
-      SetExecError(serialize_status, exec_status_barrier);
-      goto done;
-    } else if (serialized_len > FLAGS_rpc_max_message_size) {
-      SetExecError(
-          Status::Expected("Serialized Exec() request exceeds --rpc_max_message_size."),
-          exec_status_barrier);
+    if (UNLIKELY(!serialize_debug_status.ok())) {
+      SetExecError(serialize_debug_status, exec_status_barrier);
       goto done;
     }
 
-    // TODO: eliminate the extra copy here by using a Slice
-    unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
-    sidecar_buf->assign_copy(serialized_buf, serialized_len);
-    unique_ptr<RpcSidecar> rpc_sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
-
     int sidecar_idx;
-    kudu::Status sidecar_status =
-        exec_rpc_controller_.AddOutboundSidecar(move(rpc_sidecar), &sidecar_idx);
-    if (!sidecar_status.ok()) {
-      SetExecError(
-          FromKuduStatus(sidecar_status, "Failed to add sidecar"), exec_status_barrier);
+    // TODO: eliminate the extra copy here by using a Slice
+    Status sidecar_status =
+        SetFaststringSidecar(fragment_info, &exec_rpc_controller_, &sidecar_idx);
+    if (UNLIKELY(!sidecar_status.ok())) {
+      SetExecError(sidecar_status, exec_status_barrier);
       goto done;
     }
     request.set_plan_fragment_info_sidecar_idx(sidecar_idx);
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7bf5ec7..8a90007 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -47,7 +47,7 @@
 #include "runtime/query-driver.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/query-state.h"
-#include "scheduling/admission-controller.h"
+#include "scheduling/admission-control-client.h"
 #include "scheduling/scheduler.h"
 #include "service/client-request-state.h"
 #include "util/bit-util.h"
@@ -1270,24 +1270,24 @@
   // AdmissionController::ReleaseQuery.
   backend_released_barrier_.Wait();
   LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
-  AdmissionController* admission_controller =
-      ExecEnv::GetInstance()->admission_controller();
-  DCHECK(admission_controller != nullptr);
-  admission_controller->ReleaseQuery(exec_params_.query_id(),
+  AdmissionControlClient* admission_control_client =
+      parent_request_state_->admission_control_client();
+  DCHECK(admission_control_client != nullptr);
+  admission_control_client->ReleaseQuery(
       ComputeQueryResourceUtilization().peak_per_host_mem_consumption);
   query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::ReleaseBackendAdmissionControlResources(
     const vector<BackendState*>& backend_states) {
-  AdmissionController* admission_controller =
-      ExecEnv::GetInstance()->admission_controller();
-  DCHECK(admission_controller != nullptr);
+  AdmissionControlClient* admission_control_client =
+      parent_request_state_->admission_control_client();
+  DCHECK(admission_control_client != nullptr);
   vector<NetworkAddressPB> host_addrs;
   for (auto backend_state : backend_states) {
     host_addrs.push_back(backend_state->impalad_address());
   }
-  admission_controller->ReleaseQueryBackends(exec_params_.query_id(), host_addrs);
+  admission_control_client->ReleaseQueryBackends(host_addrs);
 }
 
 Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization() {
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index d178f43..604054c 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -367,27 +367,27 @@
   return Status::OK();
 }
 
-Status QueryDriver::Unregister(QueryDriverMap* query_driver_map) {
+Status QueryDriver::Unregister(ImpalaServer::QueryDriverMap* query_driver_map) {
   DCHECK(finalized_.Load());
   const TUniqueId* query_id = nullptr;
   const TUniqueId* retry_query_id = nullptr;
   {
     // In order to preserve a consistent lock ordering, client_request_state_lock_ is
-    // released before DeleteQueryDriver is called, as DeleteQueryDriver requires taking
+    // released before QueryDriverMap::Delete() is called, as Delete() requires taking
     // a ScopedShardedMapRef (a sharded map lock). Methods in ImpalaServer (such as
     // UnresponsiveBackendThread) require taking a ScopedShardedMapRef and then calling
     // Get*ClientRequestState methods. So in order to define a consistent lock ordering
     // (e.g. acquire ScopedShardedMapRef before client_request_state_lock_)
-    // client_request_state_lock_ is released before calling DeleteQueryDriver.
+    // client_request_state_lock_ is released before calling Delete().
     lock_guard<SpinLock> l(client_request_state_lock_);
     query_id = &client_request_state_->query_id();
     if (retried_client_request_state_ != nullptr) {
       retry_query_id = &retried_client_request_state_->query_id();
     }
   }
-  RETURN_IF_ERROR(query_driver_map->DeleteQueryDriver(*query_id));
+  RETURN_IF_ERROR(query_driver_map->Delete(*query_id));
   if (retry_query_id != nullptr) {
-    RETURN_IF_ERROR(query_driver_map->DeleteQueryDriver(*retry_query_id));
+    RETURN_IF_ERROR(query_driver_map->Delete(*retry_query_id));
   }
   return Status::OK();
 }
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index e785572..bc6264a 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -182,7 +182,7 @@
   Status Finalize(QueryHandle* query_handle, bool check_inflight, const Status* cause);
 
   /// Delete this query from the given QueryDriverMap.
-  Status Unregister(QueryDriverMap* query_driver_map) WARN_UNUSED_RESULT;
+  Status Unregister(ImpalaServer::QueryDriverMap* query_driver_map) WARN_UNUSED_RESULT;
 
   /// True if Finalize() was called.
   bool finalized() const { return finalized_.Load(); }
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 4f1f1d3..90f5357 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -29,11 +29,13 @@
 add_library(Scheduling STATIC
   ${ADMISSION_CONTROL_SERVICE_PROTO_SRCS}
   admission-controller.cc
+  admission-control-client.cc
   executor-blacklist.cc
   cluster-membership-mgr.cc
   cluster-membership-test-util.cc
   executor-group.cc
   hash-ring.cc
+  local-admission-control-client.cc
   request-pool-service.cc
   scheduler-test-util.cc
   schedule-state.cc
diff --git a/be/src/scheduling/admission-control-client.cc b/be/src/scheduling/admission-control-client.cc
new file mode 100644
index 0000000..e5f4c51
--- /dev/null
+++ b/be/src/scheduling/admission-control-client.cc
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/admission-control-client.h"
+
+#include "common/names.h"
+#include "scheduling/local-admission-control-client.h"
+
+namespace impala {
+
+void AdmissionControlClient::Create(
+    const TUniqueId& query_id, unique_ptr<AdmissionControlClient>* client) {
+  client->reset(new LocalAdmissionControlClient(query_id));
+}
+
+} // namespace impala
diff --git a/be/src/scheduling/admission-control-client.h b/be/src/scheduling/admission-control-client.h
new file mode 100644
index 0000000..6d6dbb6
--- /dev/null
+++ b/be/src/scheduling/admission-control-client.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+#include "gen-cpp/admission_control_service.pb.h"
+#include "gen-cpp/common.pb.h"
+#include "scheduling/admission-controller.h"
+
+namespace impala {
+
+// Base class used to abstract out the logic for submitting queries to an admission
+// controller running locally or to one running remotely.
+class AdmissionControlClient {
+ public:
+  // Creates a new AdmissionControlClient and returns it in 'client'.
+  static void Create(
+      const TUniqueId& query_id, std::unique_ptr<AdmissionControlClient>* client);
+
+  virtual ~AdmissionControlClient() {}
+
+  // Called to schedule and admit the query. Blocks until an admission decision is made.
+  virtual Status SubmitForAdmission(const AdmissionController::AdmissionRequest& request,
+      std::unique_ptr<QuerySchedulePB>* schedule_result) = 0;
+
+  // Called when the query has completed to release all of its resources.
+  virtual void ReleaseQuery(int64_t peak_mem_consumption) = 0;
+
+  // Called with a list of backends the query has completed on, to release the resources
+  // for the query on those backends.
+  virtual void ReleaseQueryBackends(const std::vector<NetworkAddressPB>& host_addr) = 0;
+
+  // Called to cancel admission for the query.
+  virtual void CancelAdmission() = 0;
+};
+
+} // namespace impala
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 4f58495..593ba8b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -210,7 +210,8 @@
     "THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2.";
 // $0 is the error message returned by the scheduler.
 const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0";
-const string REASON_LOCAL_BACKEND_NOT_STARTED = "Local backend has not started up yet.";
+const string REASON_COORDINATOR_NOT_FOUND =
+    "Coordinator not registered with the statestore.";
 const string REASON_NO_EXECUTOR_GROUPS =
     "Waiting for executors to start. Only DDL queries and queries scheduled only on the "
     "coordinator (either NUM_NODES set to 1 or when small query optimization is "
@@ -1114,6 +1115,7 @@
 Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
     unique_ptr<QuerySchedulePB>* schedule_result) {
+  DebugActionNoFail(request.query_options, "AC_BEFORE_ADMISSION");
   DCHECK(schedule_result->get() == nullptr);
 
   ClusterMembershipMgr::SnapshotPtr membership_snapshot =
@@ -1519,15 +1521,19 @@
   std::vector<GroupScheduleState>* output_schedules = &queue_node->group_states;
   output_schedules->clear();
 
-  // If the first statestore update arrives before the local backend has finished starting
-  // up, we might not have a local backend descriptor yet. We return no schedules, which
-  // will result in the query being queued.
-  if (membership_snapshot->local_be_desc == nullptr) {
-    queue_node->not_admitted_reason = REASON_LOCAL_BACKEND_NOT_STARTED;
+  // Queries may arrive before we've gotten a statestore update containing the descriptor
+  // for their coordinator, in which case we queue the query until it arrives. It's also
+  // possible (though very unlikely) that the coordinator was removed from the cluster
+  // membership after submitting this query for admission. Currently in this case the
+  // query will remain queued until it times out, but we can consider detecting failed
+  // coordinators and cleaning up their queued queries.
+  auto it = membership_snapshot->current_backends.find(PrintId(request.coord_id));
+  if (it == membership_snapshot->current_backends.end()) {
+    queue_node->not_admitted_reason = REASON_COORDINATOR_NOT_FOUND;
     LOG(WARNING) << queue_node->not_admitted_reason;
     return Status::OK();
   }
-  const BackendDescriptorPB& local_be_desc = *membership_snapshot->local_be_desc;
+  const BackendDescriptorPB& coord_desc = it->second;
 
   vector<const ExecutorGroup*> executor_groups =
       GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request);
@@ -1568,7 +1574,7 @@
     const string& group_name = executor_group->name();
     VLOG(3) << "Scheduling for executor group: " << group_name << " with "
             << executor_group->NumExecutors() << " executors";
-    const Scheduler::ExecutorConfig group_config = {*executor_group, local_be_desc};
+    const Scheduler::ExecutorConfig group_config = {*executor_group, coord_desc};
     RETURN_IF_ERROR(
         ExecEnv::GetInstance()->scheduler()->Schedule(group_config, group_state.get()));
     DCHECK(!group_state->executor_group().empty());
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index a9c3805..8c20fcf 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -317,6 +317,7 @@
   /// are owned by the ClientRequestState).
   struct AdmissionRequest {
     const UniqueIdPB& query_id;
+    const UniqueIdPB& coord_id;
     const TQueryExecRequest& request;
     const TQueryOptions& query_options;
     RuntimeProfile* summary_profile;
diff --git a/be/src/scheduling/local-admission-control-client.cc b/be/src/scheduling/local-admission-control-client.cc
new file mode 100644
index 0000000..0f785da
--- /dev/null
+++ b/be/src/scheduling/local-admission-control-client.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/local-admission-control-client.h"
+
+#include "runtime/exec-env.h"
+#include "util/uid-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+LocalAdmissionControlClient::LocalAdmissionControlClient(const TUniqueId& query_id) {
+  TUniqueIdToUniqueIdPB(query_id, &query_id_);
+}
+
+Status LocalAdmissionControlClient::SubmitForAdmission(
+    const AdmissionController::AdmissionRequest& request,
+    std::unique_ptr<QuerySchedulePB>* schedule_result) {
+  return ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
+      request, &admit_outcome_, schedule_result);
+}
+
+void LocalAdmissionControlClient::ReleaseQuery(int64_t peak_mem_consumption) {
+  ExecEnv::GetInstance()->admission_controller()->ReleaseQuery(
+      query_id_, peak_mem_consumption);
+}
+
+void LocalAdmissionControlClient::ReleaseQueryBackends(
+    const vector<NetworkAddressPB>& host_addrs) {
+  ExecEnv::GetInstance()->admission_controller()->ReleaseQueryBackends(
+      query_id_, host_addrs);
+}
+
+void LocalAdmissionControlClient::CancelAdmission() {
+  admit_outcome_.Set(AdmissionOutcome::CANCELLED);
+}
+
+} // namespace impala
diff --git a/be/src/scheduling/local-admission-control-client.h b/be/src/scheduling/local-admission-control-client.h
new file mode 100644
index 0000000..8f3da3c
--- /dev/null
+++ b/be/src/scheduling/local-admission-control-client.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+#include "gen-cpp/admission_control_service.pb.h"
+#include "gen-cpp/common.pb.h"
+#include "scheduling/admission-control-client.h"
+#include "scheduling/admission-controller.h"
+
+namespace impala {
+
+/// Implementation of AdmissionControlClient used to submit queries for admission to an
+/// AdmissionController running locally on the coordinator.
+class LocalAdmissionControlClient : public AdmissionControlClient {
+ public:
+  LocalAdmissionControlClient(const TUniqueId& query_id);
+
+  virtual Status SubmitForAdmission(const AdmissionController::AdmissionRequest& request,
+      std::unique_ptr<QuerySchedulePB>* schedule_result) override;
+  virtual void ReleaseQuery(int64_t peak_mem_consumption) override;
+  virtual void ReleaseQueryBackends(
+      const std::vector<NetworkAddressPB>& host_addr) override;
+  virtual void CancelAdmission() override;
+
+ private:
+  // The id of the query being considered for admission.
+  UniqueIdPB query_id_;
+
+  /// Promise used by the admission controller. AdmissionController:SubmitForAdmission()
+  /// will block on this promise until the query is either rejected, admitted, times out,
+  /// or is cancelled. Can be set to CANCELLED by CancelAdmission() in order to cancel,
+  /// but otherwise is set by AdmissionController with the admission decision.
+  Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER> admit_outcome_;
+};
+
+} // namespace impala
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index ec79bd5..c28dda3 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -90,11 +90,11 @@
     const ExecutorConfig& executor_config, const NetworkAddressPB& host) {
   const BackendDescriptorPB* desc = executor_config.group.LookUpBackendDesc(host);
   if (desc == nullptr) {
-    // Local host may not be in executor_config's executor group if it's a dedicated
+    // Coordinator host may not be in executor_config's executor group if it's a dedicated
     // coordinator, or if it is configured to be in a different executor group.
-    const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
-    DCHECK(host == local_be_desc.address());
-    desc = &local_be_desc;
+    const BackendDescriptorPB& coord_desc = executor_config.coord_desc;
+    DCHECK(host == coord_desc.address());
+    desc = &coord_desc;
   }
   return *desc;
 }
@@ -286,10 +286,10 @@
     if (fragment_state->is_coord_fragment || executor_config.group.NumExecutors() == 0) {
       // The coordinator fragment must be scheduled on the coordinator. Otherwise if
       // no executors are available, we need to schedule on the coordinator.
-      const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
-      host = local_be_desc.address();
-      DCHECK(local_be_desc.has_krpc_address());
-      krpc_host = local_be_desc.krpc_address();
+      const BackendDescriptorPB& coord_desc = executor_config.coord_desc;
+      host = coord_desc.address();
+      DCHECK(coord_desc.has_krpc_address());
+      krpc_host = coord_desc.krpc_address();
     } else if (fragment_state->exchange_input_fragments.size() > 0) {
       // Interior unpartitioned fragments can be scheduled on an arbitrary executor.
       // Pick a random instance from the first input fragment.
@@ -410,8 +410,7 @@
   DCHECK(has_union || scan_node_ids.size() == 1) << "This method may need revisiting "
       << "for plans with no union and multiple scans per fragment";
   vector<NetworkAddressPB> scan_hosts;
-  GetScanHosts(
-      executor_config.local_be_desc, scan_node_ids, *fragment_state, &scan_hosts);
+  GetScanHosts(executor_config.coord_desc, scan_node_ids, *fragment_state, &scan_hosts);
   for (const NetworkAddressPB& host_addr : scan_hosts) {
     // Ensure that the num instances is at least as many as input fragments. We don't
     // want to increment if there were already some instances from the input fragment,
@@ -625,8 +624,8 @@
   // TODO: Either get this from the ExecutorConfig or modify the AssignmentCtx interface
   // to handle this case.
   ExecutorGroup coord_only_executor_group("coordinator-only-group");
-  const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
-  coord_only_executor_group.AddExecutor(local_be_desc);
+  const BackendDescriptorPB& coord_desc = executor_config.coord_desc;
+  coord_only_executor_group.AddExecutor(coord_desc);
   VLOG_ROW << "Exec at coord is " << (exec_at_coord ? "true" : "false");
   AssignmentCtx assignment_ctx(exec_at_coord ? coord_only_executor_group : executor_group,
       total_assignments_, total_local_assignments_, rng);
@@ -642,9 +641,9 @@
     // Select executor for the current scan range.
     if (exec_at_coord) {
       DCHECK(assignment_ctx.executor_group().LookUpExecutorIp(
-          local_be_desc.address().hostname(), nullptr));
-      assignment_ctx.RecordScanRangeAssignment(local_be_desc, node_id, host_list,
-          scan_range_locations, assignment);
+          coord_desc.address().hostname(), nullptr));
+      assignment_ctx.RecordScanRangeAssignment(
+          coord_desc, node_id, host_list, scan_range_locations, assignment);
     } else {
       // Collect executor candidates with smallest memory distance.
       vector<IpAddr> executor_candidates;
@@ -786,7 +785,7 @@
   return FindNodes(plan, SCAN_NODE_TYPES);
 }
 
-void Scheduler::GetScanHosts(const BackendDescriptorPB& local_be_desc,
+void Scheduler::GetScanHosts(const BackendDescriptorPB& coord_desc,
     const vector<TPlanNodeId>& scan_ids, const FragmentScheduleState& fragment_state,
     vector<NetworkAddressPB>* scan_hosts) {
   for (const TPlanNodeId& scan_id : scan_ids) {
@@ -804,7 +803,7 @@
       // TODO: we'll need to revisit this strategy once we can partition joins
       // (in which case this fragment might be executing a right outer join
       // with a large build table)
-      scan_hosts->push_back(local_be_desc.address());
+      scan_hosts->push_back(coord_desc.address());
     }
   }
 }
@@ -890,7 +889,7 @@
 
   // This also ensures an entry always exists for the coordinator backend.
   int64_t coord_min_reservation = 0;
-  const NetworkAddressPB& coord_addr = executor_config.local_be_desc.address();
+  const NetworkAddressPB& coord_addr = executor_config.coord_desc.address();
   BackendScheduleState& coord_be_state =
       state->GetOrCreateBackendScheduleState(coord_addr);
   coord_be_state.exec_params->set_is_coord_backend(true);
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index ccf8948..a9093cb 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -65,7 +65,7 @@
   /// Current snapshot of executors to be used for scheduling a scan.
   struct ExecutorConfig {
     const ExecutorGroup& group;
-    const BackendDescriptorPB& local_be_desc;
+    const BackendDescriptorPB& coord_desc;
   };
 
   /// Populates given query schedule and assigns fragments to hosts based on scan
@@ -415,7 +415,7 @@
 
   /// Add all hosts that the scans identified by 'scan_ids' are executed on to
   /// 'scan_hosts'.
-  void GetScanHosts(const BackendDescriptorPB& local_be_desc,
+  void GetScanHosts(const BackendDescriptorPB& coord_desc,
       const std::vector<TPlanNodeId>& scan_ids,
       const FragmentScheduleState& fragment_state,
       std::vector<NetworkAddressPB>* scan_hosts);
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index e140b61..8f030ad 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -41,7 +41,6 @@
   impala-internal-service.cc
   impalad-main.cc
   impala-server.cc
-  query-driver-map.cc
   query-options.cc
   query-result-set.cc
 )
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 84c250b..931f865 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -39,7 +39,7 @@
 #include "runtime/query-driver.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
-#include "scheduling/admission-controller.h"
+#include "scheduling/admission-control-client.h"
 #include "scheduling/scheduler.h"
 #include "service/frontend.h"
 #include "service/impala-server.h"
@@ -161,6 +161,8 @@
       TNetworkAddressToString(ExecEnv::GetInstance()->GetThriftBackendAddress()));
 
   summary_profile_->AddChild(frontend_profile_);
+
+  AdmissionControlClient::Create(query_ctx.query_id, &admission_control_client_);
 }
 
 ClientRequestState::~ClientRequestState() {
@@ -539,17 +541,14 @@
 }
 
 void ClientRequestState::FinishExecQueryOrDmlRequest() {
-  DebugActionNoFail(exec_request_->query_options, "CRS_BEFORE_ADMISSION");
-
-  DCHECK(ExecEnv::GetInstance()->admission_controller() != nullptr);
   DCHECK(exec_request_->__isset.query_exec_request);
   UniqueIdPB query_id_pb;
   TUniqueIdToUniqueIdPB(query_id(), &query_id_pb);
-  Status admit_status =
-      ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
-          {query_id_pb, exec_request_->query_exec_request, exec_request_->query_options,
-              summary_profile_, query_events_, blacklisted_executor_addresses_},
-          &admit_outcome_, &schedule_);
+  Status admit_status = admission_control_client_->SubmitForAdmission(
+      {query_id_pb, ExecEnv::GetInstance()->backend_id(),
+          exec_request_->query_exec_request, exec_request_->query_options,
+          summary_profile_, query_events_, blacklisted_executor_addresses_},
+      &schedule_);
   {
     lock_guard<mutex> l(lock_);
     if (!UpdateQueryStatus(admit_status).ok()) return;
@@ -1189,7 +1188,7 @@
           || retry_state() == RetryState::RETRYING);
     }
 
-    admit_outcome_.Set(AdmissionOutcome::CANCELLED);
+    admission_control_client_->CancelAdmission();
     is_cancelled_ = true;
   } // Release lock_ before doing cancellation work.
 
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 9cbbd2b..0c7bb6a 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -34,6 +34,7 @@
 
 namespace impala {
 
+class AdmissionControlClient;
 class ClientRequestStateCleaner;
 class Coordinator;
 class Expr;
@@ -44,7 +45,6 @@
 class Thread;
 class TRuntimeProfileTree;
 class TupleRow;
-enum class AdmissionOutcome;
 class QuerySchedulePB;
 
 /// Execution state of the client-facing side of a query. This captures everything
@@ -390,6 +390,10 @@
   /// Returns the QueryDriver that owns this ClientRequestState.
   QueryDriver* parent_driver() const { return parent_driver_; }
 
+  AdmissionControlClient* admission_control_client() const {
+    return admission_control_client_.get();
+  }
+
   /// Returns true if results cacheing is enabled, false otherwise.
   bool IsResultCacheingEnabled() const { return result_cache_max_size_ >= 0; }
 
@@ -454,12 +458,6 @@
   /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
   const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
 
-  /// Promise used by the admission controller. AdmissionController:AdmitQuery() will
-  /// block on this promise until the query is either rejected, admitted, times out, or is
-  /// cancelled. Can be set to CANCELLED by the ClientRequestState in order to cancel, but
-  /// otherwise is set by AdmissionController with the admission decision.
-  Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER> admit_outcome_;
-
   /// Protects all following fields. Acquirers should be careful not to hold it for too
   /// long, e.g. during RPCs because this lock is required to make progress on various
   /// ImpalaServer requests. If held for too long it can block progress of client
@@ -650,6 +648,10 @@
   /// constructor. It always outlives the ClientRequestState.
   QueryDriver* parent_driver_;
 
+  /// Manages interactions with the AdmissionController, which may be either local or
+  /// remote.
+  std::unique_ptr<AdmissionControlClient> admission_control_client_;
+
   /// Executes a local catalog operation (an operation that does not need to execute
   /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
   Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op) WARN_UNUSED_RESULT;
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 33a3f84..34c3800 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -24,11 +24,12 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "rpc/rpc-mgr.h"
 #include "rpc/rpc-mgr.inline.h"
+#include "rpc/sidecar-util.h"
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/query-exec-mgr.h"
 #include "runtime/query-driver.h"
+#include "runtime/query-exec-mgr.h"
 #include "runtime/query-state.h"
 #include "service/client-request-state.h"
 #include "service/impala-server.h"
@@ -114,18 +115,6 @@
   return Status::OK();
 }
 
-// Retrieves the sidecar at 'sidecar_idx' from 'rpc_context' and deserializes it into
-// 'thrift_obj'.
-template <typename T>
-static Status GetSidecar(int sidecar_idx, RpcContext* rpc_context, T* thrift_obj) {
-  kudu::Slice sidecar_slice;
-  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(sidecar_idx, &sidecar_slice),
-      "Failed to get sidecar");
-  uint32_t len = sidecar_slice.size();
-  RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, thrift_obj));
-  return Status::OK();
-}
-
 void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request,
     ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
   DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index b8ce8ac..3c75851 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1215,8 +1215,7 @@
     return Status::Expected("Session has been closed, ignoring query.");
   }
   DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
-  RETURN_IF_ERROR(
-      query_driver_map_.AddQueryDriver(query_id, query_handle->query_driver()));
+  RETURN_IF_ERROR(query_driver_map_.Add(query_id, query_handle->query_driver()));
   // Metric is decremented in UnregisterQuery().
   ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L);
   VLOG_QUERY << "Registered query query_id=" << PrintId(query_id) << " session_id="
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8408c90..52e0a85 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -37,12 +37,12 @@
 #include "kudu/util/random.h"
 #include "rpc/thrift-server.h"
 #include "runtime/types.h"
-#include "service/query-driver-map.h"
 #include "service/query-options.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
 #include "util/container-util.h"
 #include "util/runtime-profile.h"
+#include "util/sharded-query-map-util.h"
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
@@ -1244,6 +1244,7 @@
   /// 3. Additional cleanup work is done by CloseClientRequestState(), and an entry
   ///    is added to 'query_log_' for this query.
   /// 4. The QueryDriver is removed from this map.
+  typedef ShardedQueryMap<std::shared_ptr<QueryDriver>> QueryDriverMap;
   QueryDriverMap query_driver_map_;
 
   /// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
diff --git a/be/src/service/query-driver-map.cc b/be/src/service/query-driver-map.cc
deleted file mode 100644
index 14bf89f..0000000
--- a/be/src/service/query-driver-map.cc
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "service/query-driver-map.h"
-
-#include "gutil/strings/substitute.h"
-#include "util/container-util.h"
-#include "util/debug-util.h"
-#include "util/uid-util.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-Status QueryDriverMap::AddQueryDriver(
-    const TUniqueId& query_id, std::shared_ptr<QueryDriver> query_driver) {
-  ScopedShardedMapRef<std::shared_ptr<QueryDriver>> map_ref(query_id, this);
-  DCHECK(map_ref.get() != nullptr);
-
-  auto entry = map_ref->find(query_id);
-  if (entry != map_ref->end()) {
-    // There shouldn't be an active query with that same id.
-    // (query_id is globally unique)
-    return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
-        strings::Substitute("query id $0 already exists", PrintId(query_id))));
-  }
-  map_ref->insert(make_pair(query_id, query_driver));
-  return Status::OK();
-}
-
-Status QueryDriverMap::DeleteQueryDriver(const TUniqueId& query_id) {
-  ScopedShardedMapRef<std::shared_ptr<QueryDriver>> map_ref(query_id, this);
-  DCHECK(map_ref.get() != nullptr);
-  auto entry = map_ref->find(query_id);
-  if (entry == map_ref->end()) {
-    Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
-    VLOG(1) << err.GetDetail();
-    return err;
-  }
-  map_ref->erase(entry);
-  return Status::OK();
-}
-
-} // namespace impala
diff --git a/be/src/service/query-driver-map.h b/be/src/service/query-driver-map.h
deleted file mode 100644
index 572170a..0000000
--- a/be/src/service/query-driver-map.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/status.h"
-#include "util/sharded-query-map-util.h"
-#include "util/unique-id-hash.h"
-
-namespace impala {
-
-class QueryDriver;
-
-/// A ShardedQueryMap for QueryDrivers. Maps a query_id to its corresponding
-/// QueryDriver. Provides helper methods to easily add and delete
-/// QueryDrivers from a ShardedQueryMap. The QueryDrivers are non-const, so
-/// users of this class can synchronize access to the QueryDrivers by creating a
-/// ScopedShardedMapRef.
-class QueryDriverMap : public ShardedQueryMap<std::shared_ptr<QueryDriver>> {
- public:
-  /// Adds the given (query_id, query_driver) pair to the map. Returns an error Status
-  /// if the query id already exists in the map.
-  Status AddQueryDriver(
-      const TUniqueId& query_id, std::shared_ptr<QueryDriver> request_state);
-
-  /// Deletes the specified (query_id, query_driver) pair from the map. Returns an error
-  /// Status if the query_id cannot be found in the map.
-  Status DeleteQueryDriver(const TUniqueId& query_id);
-};
-}
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index b56d1a3..f04d1c1 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -84,6 +84,7 @@
   process-state-info.cc
   redactor.cc
   runtime-profile.cc
+  sharded-query-map-util.cc
   simple-logger.cc
   string-parser.cc
   string-util.cc
diff --git a/be/src/util/sharded-query-map-util.cc b/be/src/util/sharded-query-map-util.cc
new file mode 100644
index 0000000..5d5317e
--- /dev/null
+++ b/be/src/util/sharded-query-map-util.cc
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/sharded-query-map-util.h"
+
+#include "runtime/query-driver.h"
+#include "util/debug-util.h"
+
+namespace impala {
+
+template <typename K, typename V>
+Status GenericShardedQueryMap<K, V>::Add(const K& query_id, const V& obj) {
+  GenericScopedShardedMapRef<K, V> map_ref(query_id, this);
+  DCHECK(map_ref.get() != nullptr);
+
+  auto entry = map_ref->find(query_id);
+  if (entry != map_ref->end()) {
+    // There shouldn't be an active query with that same id.
+    // (query_id is globally unique)
+    return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
+        strings::Substitute("query id $0 already exists", PrintId(query_id))));
+  }
+  map_ref->insert(make_pair(query_id, obj));
+  return Status::OK();
+}
+
+template <typename K, typename V>
+Status GenericShardedQueryMap<K, V>::Get(const K& query_id, V* obj) {
+  GenericScopedShardedMapRef<K, V> map_ref(query_id, this);
+  DCHECK(map_ref.get() != nullptr);
+
+  auto entry = map_ref->find(query_id);
+  if (entry == map_ref->end()) {
+    Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+    VLOG(1) << err.GetDetail();
+    return err;
+  }
+  *obj = entry->second;
+  return Status::OK();
+}
+
+template <typename K, typename V>
+Status GenericShardedQueryMap<K, V>::Delete(const K& query_id) {
+  GenericScopedShardedMapRef<K, V> map_ref(query_id, this);
+  DCHECK(map_ref.get() != nullptr);
+  auto entry = map_ref->find(query_id);
+  if (entry == map_ref->end()) {
+    Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+    VLOG(1) << err.GetDetail();
+    return err;
+  }
+  map_ref->erase(entry);
+  return Status::OK();
+}
+
+// Needed by ImpalaServer
+template Status GenericShardedQueryMap<TUniqueId, std::shared_ptr<QueryDriver>>::Add(
+    TUniqueId const&, const std::shared_ptr<QueryDriver>&);
+template Status GenericShardedQueryMap<TUniqueId, std::shared_ptr<QueryDriver>>::Get(
+    TUniqueId const&, std::shared_ptr<QueryDriver>*);
+template Status GenericShardedQueryMap<TUniqueId, std::shared_ptr<QueryDriver>>::Delete(
+    TUniqueId const&);
+
+} // namespace impala
diff --git a/be/src/util/sharded-query-map-util.h b/be/src/util/sharded-query-map-util.h
index 5b8ae14..4e2836f 100644
--- a/be/src/util/sharded-query-map-util.h
+++ b/be/src/util/sharded-query-map-util.h
@@ -20,6 +20,7 @@
 #include <mutex>
 #include <unordered_map>
 
+#include "common/status.h"
 #include "gen-cpp/Types_types.h"
 #include "util/aligned-new.h"
 #include "util/spinlock.h"
@@ -27,25 +28,25 @@
 
 namespace impala {
 
-/// This is a template that can be used for any map that maps from a query ID (TUniqueId)
-/// to some object, and that needs to be sharded. It provides a SpinLock per shard to
-/// synchronize access to each shard of the map. The underlying shard is locked and
-/// accessed by instantiating a ScopedShardedMapRef.
+/// This is a template that can be used for any map that maps from a query ID (TUniqueId
+/// or UniqueIdPB) to some object, and that needs to be sharded. It provides a SpinLock
+/// per shard to synchronize access to each shard of the map. The underlying shard is
+/// locked and accessed by instantiating a GenericScopedShardedMapRef.
 //
 /// Usage pattern:
 //
 ///   typedef ShardedQueryMap<QueryState*> QueryStateMap;
 ///   QueryStateMap qs_map_;
 //
-template<typename T>
-class ShardedQueryMap {
+template <typename K, typename V>
+class GenericShardedQueryMap {
  public:
 
   // This function takes a lambda which should take a parameter of object 'T' and
   // runs the lambda for all the entries in the map. The lambda should have a return
   // type of 'void'..
   // TODO: If necessary, refactor the lambda signature to allow returning Status objects.
-  void DoFuncForAllEntries(const std::function<void(const T&)>& call) {
+  void DoFuncForAllEntries(const std::function<void(const V&)>& call) {
     for (int i = 0; i < NUM_QUERY_BUCKETS; ++i) {
       std::lock_guard<SpinLock> l(shards_[i].map_lock_);
       for (const auto& map_value_ref: shards_[i].map_) {
@@ -54,9 +55,19 @@
     }
   }
 
+  // Adds ('key', 'value') to the map, returning an error if 'key' already exists.
+  Status Add(const K& key, const V& value);
+
+  // Returns the value associated with 'key' in 'value', returning an error if 'key'
+  // doesn't exist.
+  Status Get(const K& key, V* value);
+
+  // Removes 'key' from the map, returning an error if 'key' doesn't exist.
+  Status Delete(const K& key);
+
  private:
-  template <typename T2>
-  friend class ScopedShardedMapRef;
+  template <typename K2, typename V2>
+  friend class GenericScopedShardedMapRef;
 
   // Number of buckets to split the containers of query IDs into.
   static constexpr uint32_t NUM_QUERY_BUCKETS = 4;
@@ -65,14 +76,20 @@
   // we will always access a map and its corresponding lock together, it's better if
   // they can be allocated on the same cache line.
   struct MapShard : public CacheLineAligned {
-    std::unordered_map<TUniqueId, T> map_;
+    std::unordered_map<K, V> map_;
     SpinLock map_lock_;
   };
   struct MapShard shards_[NUM_QUERY_BUCKETS];
 };
 
+template <typename T>
+class ShardedQueryMap : public GenericShardedQueryMap<TUniqueId, T> {};
+
+template <typename T>
+class ShardedQueryPBMap : public GenericShardedQueryMap<UniqueIdPB, T> {};
+
 /// Use this class to obtain a locked reference to the underlying map shard
-/// of a ShardedQueryMap, corresponding to the 'query_id'.
+/// of a GenericShardedQueryMap, corresponding to the 'query_id'.
 //
 /// Pattern:
 /// {
@@ -83,13 +100,13 @@
 //
 /// The caller should ensure that the lifetime of the ShardedQueryMap should be longer
 /// than the lifetime of this scoped class.
-template <typename T>
-class ScopedShardedMapRef {
+template <typename K, typename V>
+class GenericScopedShardedMapRef {
  public:
 
   // Finds the appropriate map that could/should contain 'query_id' and locks it.
-  ScopedShardedMapRef(
-      const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map) {
+  GenericScopedShardedMapRef(
+      const K& query_id, class GenericShardedQueryMap<K, V>* sharded_map) {
     DCHECK(sharded_map != nullptr);
     int qs_map_bucket = QueryIdToBucket(query_id);
     shard_ = &sharded_map->shards_[qs_map_bucket];
@@ -98,19 +115,19 @@
     shard_->map_lock_.lock();
   }
 
-  ~ScopedShardedMapRef() {
+  ~GenericScopedShardedMapRef() {
     shard_->map_lock_.DCheckLocked();
     shard_->map_lock_.unlock();
   }
 
   // Returns the shard (map) for the 'query_id' passed to the constructor.
   // Should never return nullptr.
-  std::unordered_map<TUniqueId, T>* get() {
+  std::unordered_map<K, V>* get() {
     shard_->map_lock_.DCheckLocked();
     return &shard_->map_;
   }
 
-  std::unordered_map<TUniqueId, T>* operator->() {
+  std::unordered_map<K, V>* operator->() {
     shard_->map_lock_.DCheckLocked();
     return get();
   }
@@ -119,14 +136,33 @@
 
   // Return the correct bucket that a query ID would belong to.
   inline int QueryIdToBucket(const TUniqueId& query_id) {
-    int bucket =
-        static_cast<int>(query_id.hi) % ShardedQueryMap<T>::NUM_QUERY_BUCKETS;
-    DCHECK(bucket < ShardedQueryMap<T>::NUM_QUERY_BUCKETS && bucket >= 0);
+    int bucket = static_cast<int>(query_id.hi) % ShardedQueryMap<V>::NUM_QUERY_BUCKETS;
+    DCHECK(bucket < ShardedQueryMap<V>::NUM_QUERY_BUCKETS && bucket >= 0);
     return bucket;
   }
 
-  typename ShardedQueryMap<T>::MapShard* shard_;
-  DISALLOW_COPY_AND_ASSIGN(ScopedShardedMapRef);
+  inline int QueryIdToBucket(const UniqueIdPB& query_id) {
+    int bucket = static_cast<int>(query_id.hi()) % ShardedQueryMap<V>::NUM_QUERY_BUCKETS;
+    DCHECK(bucket < ShardedQueryMap<V>::NUM_QUERY_BUCKETS && bucket >= 0);
+    return bucket;
+  }
+
+  typename GenericShardedQueryMap<K, V>::MapShard* shard_;
+  DISALLOW_COPY_AND_ASSIGN(GenericScopedShardedMapRef);
+};
+
+template <typename T>
+class ScopedShardedMapRef : public GenericScopedShardedMapRef<TUniqueId, T> {
+ public:
+  ScopedShardedMapRef(const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map)
+    : GenericScopedShardedMapRef<TUniqueId, T>(query_id, sharded_map) {}
+};
+
+template <typename T>
+class ScopedShardedMapPBRef : public GenericScopedShardedMapRef<UniqueIdPB, T> {
+ public:
+  ScopedShardedMapPBRef(const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map)
+    : GenericScopedShardedMapRef<TUniqueId, T>(query_id, sharded_map) {}
 };
 
 } // namespace impala
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index f8a7249..09b74af 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -106,7 +106,7 @@
   //    The code executing the debug action may respond to different error messages by
   //    exercising different error paths.
   // Examples:
-  // - CRS_BEFORE_ADMISSION:SLEEP@1000
+  // - AC_BEFORE_ADMISSION:SLEEP@1000
   //   Causes a 1 second sleep before queries are submitted to the admission controller.
   // - IMPALA_SERVICE_POOL:127.0.0.1:27002:TransmitData:FAIL@0.1@REJECT_TOO_BUSY
   //   Causes TransmitData rpcs to the third minicluster impalad to fail 10% of the time
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 39e690c..4edb18f 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -745,7 +745,7 @@
     impalad = self.cluster.impalads[0]
     client = impalad.service.create_beeswax_client()
     try:
-      client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
+      client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000")
       client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
       handle = client.execute_async("select 1")
       sleep(1)
@@ -754,7 +754,7 @@
           "Ready to be Rejected but already cancelled, query id=")
       client.clear_configuration()
 
-      client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
+      client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000")
       handle = client.execute_async("select 2")
       sleep(1)
       client.close_query(handle)
@@ -1239,7 +1239,7 @@
     profile = result.runtime_profile
     reasons = self.__extract_init_queue_reasons([profile])
     assert len(reasons) == 1
-    assert "Local backend has not started up yet." in reasons[0]
+    assert "Coordinator not registered with the statestore." in reasons[0]
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 566e8ec..95ee8df 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -300,7 +300,7 @@
     query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
-                       'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@18000'})
+                       'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 80)
 
     # Validate that the query was retried.
@@ -362,7 +362,7 @@
     query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
-                       'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@18000'})
+                       'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
     # Wait until the query fails.
     self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 140)
 
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 90191ed..22bbbf9 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -374,7 +374,7 @@
     # the shutdown grace period expires. This demonstrates that queries don't get
     # cancelled if the cluster membership changes while they're waiting for admission.
     before_shutdown_admission_handle = self.execute_query_async(QUERY,
-        {'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@30000'})
+        {'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@30000'})
 
     # Shut down and wait for the shutdown state to propagate through statestore.
     result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2)
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 9b16384..fe5d2f8 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -196,7 +196,7 @@
     """Test that the exec summary is populated correctly in every query state"""
     query = "select count(*) from functional.alltypes"
     handle = self.execute_query_async(query,
-        {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+        {"debug_action": "AC_BEFORE_ADMISSION:SLEEP@1000"})
     # If ExecuteStatement() has completed and the query is paused in the admission control
     # phase, then the coordinator has not started yet and exec_summary should be empty.
     exec_summary = self.client.get_exec_summary(handle)
@@ -218,7 +218,7 @@
     query state"""
     query = "select count(*) from functional.alltypes"
     handle = self.execute_query_async(query,
-        {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+        {"debug_action": "AC_BEFORE_ADMISSION:SLEEP@1000"})
 
     # If ExecuteStatement() has completed and the query is paused in the admission control
     # phase, then the coordinator has not started yet and exec_summary should be empty.
@@ -743,7 +743,7 @@
     """Tests that the query profile shows expected query states."""
     query = "select count(*) from functional.alltypes where bool_col = sleep(10)"
     handle = self.execute_query_async(query,
-        {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+        {"debug_action": "AC_BEFORE_ADMISSION:SLEEP@1000"})
     # If ExecuteStatement() has completed and the query is paused in the admission control
     # phase, then the query must be in COMPILED state.
     profile = self.client.get_runtime_profile(handle)