IMPALA-9930 (part 1): Initial refactor for admission control service
This patch contains the following refactors that are needed for the
admission control service, in order to make the main patch easier to
review:
- Adds a new class AdmissionControlClient which will be used to
abstract the logic for submitting queries to either a local or
remote admission controller out from ClientRequestState/Coordinator.
Currently only local submission is supported.
- SubmitForAdmission now takes a BackendId representing the
coordinator instead of assuming that the local impalad will be the
coordinator.
- The CRS_BEFORE_ADMISSION debug action is moved into
SubmitForAdmission() so that it will be executed on whichever daemon
is performing admission control rather than always on the
coordinator (needed for TestAdmissionController.test_cancellation).
- ShardedQueryMap is extended to allow keys to be either TUniqueId or
UniqueIdPB and Add(), Get(), and Delete() convenience functions are
added.
- Some utils related to seralizing Thrift objects into sidecars are
added.
Testing:
- Passed a run of existing core tests.
Change-Id: I7974a979cf05ed569f31e1ab20694e29fd3e4508
Reviewed-on: http://gerrit.cloudera.org:8080/16411
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/rpc/sidecar-util.h b/be/src/rpc/sidecar-util.h
new file mode 100644
index 0000000..47b15f6
--- /dev/null
+++ b/be/src/rpc/sidecar-util.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/faststring.h"
+#include "rpc/thrift-util.h"
+#include "util/kudu-status-util.h"
+
+DECLARE_int64(rpc_max_message_size);
+
+namespace impala {
+
+class KrpcSerializer {
+ public:
+ KrpcSerializer() : serializer_(/* compact */ true) {}
+
+ /// Serialize obj and set it as a sidecar on 'rpc_controller', returning the idx in
+ /// 'sidecar_idx'. The memory for the sidecar is owned by this object and must remain
+ /// valid until the rpc has completed.
+ template <class T>
+ Status SerializeToSidecar(
+ const T* obj, kudu::rpc::RpcController* rpc_controller, int* sidecar_idx) {
+ uint8_t* serialized_buf = nullptr;
+ uint32_t serialized_len = 0;
+ RETURN_IF_ERROR(serializer_.SerializeToBuffer(obj, &serialized_len, &serialized_buf));
+ std::unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
+ kudu::rpc::RpcSidecar::FromSlice(kudu::Slice(serialized_buf, serialized_len));
+ KUDU_RETURN_IF_ERROR(
+ rpc_controller->AddOutboundSidecar(move(rpc_sidecar), sidecar_idx),
+ "Failed to add sidecar");
+ return Status::OK();
+ }
+
+ private:
+ ThriftSerializer serializer_;
+};
+
+// Retrieves the sidecar at 'sidecar_idx' from 'rpc_context' and deserializes it into
+// 'thrift_obj'. 'rpc' can be either an RpcContext or an RpcController.
+template <typename RPC, typename T>
+Status GetSidecar(int sidecar_idx, RPC* rpc, T* thrift_obj) {
+ kudu::Slice sidecar_slice;
+ KUDU_RETURN_IF_ERROR(
+ rpc->GetInboundSidecar(sidecar_idx, &sidecar_slice), "Failed to get sidecar");
+ uint32_t len = sidecar_slice.size();
+ RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, thrift_obj));
+ return Status::OK();
+}
+
+// Serializes 'obj' and sets is as a sidecar on 'rpc' using a faststring, which transfers
+// ownership of the sidecar memory to the RPC layer. This introduces another copy that
+// wouldn't be necessary if using a Slice sidecar, but it's convenient for situations
+// where it would be tricky to manually ensure that the sidecar's memory is released after
+// the RPC has completed. 'rpc' may be either an RpcContext or an RpcController.
+//
+// If successful, the sidecar idx is returned in 'sidecar_idx'.
+template <typename RPC, typename T>
+Status SetFaststringSidecar(const T& obj, RPC* rpc, int* sidecar_idx) {
+ ThriftSerializer serializer(/* compact */ true);
+ uint8_t* serialized_buf = nullptr;
+ uint32_t serialized_len = 0;
+ RETURN_IF_ERROR(serializer.SerializeToBuffer(&obj, &serialized_len, &serialized_buf));
+ if (serialized_len > FLAGS_rpc_max_message_size) {
+ return Status("Serialized sidecar exceeds --rpc_max_message_size.");
+ }
+ std::unique_ptr<kudu::faststring> sidecar_str = std::make_unique<kudu::faststring>();
+ sidecar_str->assign_copy(serialized_buf, serialized_len);
+ std::unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
+ kudu::rpc::RpcSidecar::FromFaststring(std::move(sidecar_str));
+ RETURN_IF_ERROR(FromKuduStatus(
+ rpc->AddOutboundSidecar(move(rpc_sidecar), sidecar_idx), "Failed to add sidecar"));
+ return Status::OK();
+}
+
+} // namespace impala
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index fdf21d4..1b969a1 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -30,6 +30,7 @@
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "rpc/rpc-mgr.inline.h"
+#include "rpc/sidecar-util.h"
#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator-filter-state.h"
@@ -268,38 +269,20 @@
exec_rpc_controller_.set_timeout(
MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms));
- // Serialize the sidecar and add it to the rpc controller. The serialized buffer is
- // owned by 'serializer' and is freed when it is destructed.
- ThriftSerializer serializer(true);
- uint8_t* serialized_buf = nullptr;
- uint32_t serialized_len = 0;
- Status serialize_status =
+ // Serialize the sidecar and add it to the rpc controller.
+ Status serialize_debug_status =
DebugAction(exec_params_.query_options(), "EXEC_SERIALIZE_FRAGMENT_INFO");
- if (LIKELY(serialize_status.ok())) {
- serialize_status =
- serializer.SerializeToBuffer(&fragment_info, &serialized_len, &serialized_buf);
- }
- if (UNLIKELY(!serialize_status.ok())) {
- SetExecError(serialize_status, exec_status_barrier);
- goto done;
- } else if (serialized_len > FLAGS_rpc_max_message_size) {
- SetExecError(
- Status::Expected("Serialized Exec() request exceeds --rpc_max_message_size."),
- exec_status_barrier);
+ if (UNLIKELY(!serialize_debug_status.ok())) {
+ SetExecError(serialize_debug_status, exec_status_barrier);
goto done;
}
- // TODO: eliminate the extra copy here by using a Slice
- unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
- sidecar_buf->assign_copy(serialized_buf, serialized_len);
- unique_ptr<RpcSidecar> rpc_sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
-
int sidecar_idx;
- kudu::Status sidecar_status =
- exec_rpc_controller_.AddOutboundSidecar(move(rpc_sidecar), &sidecar_idx);
- if (!sidecar_status.ok()) {
- SetExecError(
- FromKuduStatus(sidecar_status, "Failed to add sidecar"), exec_status_barrier);
+ // TODO: eliminate the extra copy here by using a Slice
+ Status sidecar_status =
+ SetFaststringSidecar(fragment_info, &exec_rpc_controller_, &sidecar_idx);
+ if (UNLIKELY(!sidecar_status.ok())) {
+ SetExecError(sidecar_status, exec_status_barrier);
goto done;
}
request.set_plan_fragment_info_sidecar_idx(sidecar_idx);
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7bf5ec7..8a90007 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -47,7 +47,7 @@
#include "runtime/query-driver.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/query-state.h"
-#include "scheduling/admission-controller.h"
+#include "scheduling/admission-control-client.h"
#include "scheduling/scheduler.h"
#include "service/client-request-state.h"
#include "util/bit-util.h"
@@ -1270,24 +1270,24 @@
// AdmissionController::ReleaseQuery.
backend_released_barrier_.Wait();
LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
- AdmissionController* admission_controller =
- ExecEnv::GetInstance()->admission_controller();
- DCHECK(admission_controller != nullptr);
- admission_controller->ReleaseQuery(exec_params_.query_id(),
+ AdmissionControlClient* admission_control_client =
+ parent_request_state_->admission_control_client();
+ DCHECK(admission_control_client != nullptr);
+ admission_control_client->ReleaseQuery(
ComputeQueryResourceUtilization().peak_per_host_mem_consumption);
query_events_->MarkEvent("Released admission control resources");
}
void Coordinator::ReleaseBackendAdmissionControlResources(
const vector<BackendState*>& backend_states) {
- AdmissionController* admission_controller =
- ExecEnv::GetInstance()->admission_controller();
- DCHECK(admission_controller != nullptr);
+ AdmissionControlClient* admission_control_client =
+ parent_request_state_->admission_control_client();
+ DCHECK(admission_control_client != nullptr);
vector<NetworkAddressPB> host_addrs;
for (auto backend_state : backend_states) {
host_addrs.push_back(backend_state->impalad_address());
}
- admission_controller->ReleaseQueryBackends(exec_params_.query_id(), host_addrs);
+ admission_control_client->ReleaseQueryBackends(host_addrs);
}
Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization() {
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index d178f43..604054c 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -367,27 +367,27 @@
return Status::OK();
}
-Status QueryDriver::Unregister(QueryDriverMap* query_driver_map) {
+Status QueryDriver::Unregister(ImpalaServer::QueryDriverMap* query_driver_map) {
DCHECK(finalized_.Load());
const TUniqueId* query_id = nullptr;
const TUniqueId* retry_query_id = nullptr;
{
// In order to preserve a consistent lock ordering, client_request_state_lock_ is
- // released before DeleteQueryDriver is called, as DeleteQueryDriver requires taking
+ // released before QueryDriverMap::Delete() is called, as Delete() requires taking
// a ScopedShardedMapRef (a sharded map lock). Methods in ImpalaServer (such as
// UnresponsiveBackendThread) require taking a ScopedShardedMapRef and then calling
// Get*ClientRequestState methods. So in order to define a consistent lock ordering
// (e.g. acquire ScopedShardedMapRef before client_request_state_lock_)
- // client_request_state_lock_ is released before calling DeleteQueryDriver.
+ // client_request_state_lock_ is released before calling Delete().
lock_guard<SpinLock> l(client_request_state_lock_);
query_id = &client_request_state_->query_id();
if (retried_client_request_state_ != nullptr) {
retry_query_id = &retried_client_request_state_->query_id();
}
}
- RETURN_IF_ERROR(query_driver_map->DeleteQueryDriver(*query_id));
+ RETURN_IF_ERROR(query_driver_map->Delete(*query_id));
if (retry_query_id != nullptr) {
- RETURN_IF_ERROR(query_driver_map->DeleteQueryDriver(*retry_query_id));
+ RETURN_IF_ERROR(query_driver_map->Delete(*retry_query_id));
}
return Status::OK();
}
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index e785572..bc6264a 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -182,7 +182,7 @@
Status Finalize(QueryHandle* query_handle, bool check_inflight, const Status* cause);
/// Delete this query from the given QueryDriverMap.
- Status Unregister(QueryDriverMap* query_driver_map) WARN_UNUSED_RESULT;
+ Status Unregister(ImpalaServer::QueryDriverMap* query_driver_map) WARN_UNUSED_RESULT;
/// True if Finalize() was called.
bool finalized() const { return finalized_.Load(); }
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 4f1f1d3..90f5357 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -29,11 +29,13 @@
add_library(Scheduling STATIC
${ADMISSION_CONTROL_SERVICE_PROTO_SRCS}
admission-controller.cc
+ admission-control-client.cc
executor-blacklist.cc
cluster-membership-mgr.cc
cluster-membership-test-util.cc
executor-group.cc
hash-ring.cc
+ local-admission-control-client.cc
request-pool-service.cc
scheduler-test-util.cc
schedule-state.cc
diff --git a/be/src/scheduling/admission-control-client.cc b/be/src/scheduling/admission-control-client.cc
new file mode 100644
index 0000000..e5f4c51
--- /dev/null
+++ b/be/src/scheduling/admission-control-client.cc
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/admission-control-client.h"
+
+#include "common/names.h"
+#include "scheduling/local-admission-control-client.h"
+
+namespace impala {
+
+void AdmissionControlClient::Create(
+ const TUniqueId& query_id, unique_ptr<AdmissionControlClient>* client) {
+ client->reset(new LocalAdmissionControlClient(query_id));
+}
+
+} // namespace impala
diff --git a/be/src/scheduling/admission-control-client.h b/be/src/scheduling/admission-control-client.h
new file mode 100644
index 0000000..6d6dbb6
--- /dev/null
+++ b/be/src/scheduling/admission-control-client.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+#include "gen-cpp/admission_control_service.pb.h"
+#include "gen-cpp/common.pb.h"
+#include "scheduling/admission-controller.h"
+
+namespace impala {
+
+// Base class used to abstract out the logic for submitting queries to an admission
+// controller running locally or to one running remotely.
+class AdmissionControlClient {
+ public:
+ // Creates a new AdmissionControlClient and returns it in 'client'.
+ static void Create(
+ const TUniqueId& query_id, std::unique_ptr<AdmissionControlClient>* client);
+
+ virtual ~AdmissionControlClient() {}
+
+ // Called to schedule and admit the query. Blocks until an admission decision is made.
+ virtual Status SubmitForAdmission(const AdmissionController::AdmissionRequest& request,
+ std::unique_ptr<QuerySchedulePB>* schedule_result) = 0;
+
+ // Called when the query has completed to release all of its resources.
+ virtual void ReleaseQuery(int64_t peak_mem_consumption) = 0;
+
+ // Called with a list of backends the query has completed on, to release the resources
+ // for the query on those backends.
+ virtual void ReleaseQueryBackends(const std::vector<NetworkAddressPB>& host_addr) = 0;
+
+ // Called to cancel admission for the query.
+ virtual void CancelAdmission() = 0;
+};
+
+} // namespace impala
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 4f58495..593ba8b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -210,7 +210,8 @@
"THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2.";
// $0 is the error message returned by the scheduler.
const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0";
-const string REASON_LOCAL_BACKEND_NOT_STARTED = "Local backend has not started up yet.";
+const string REASON_COORDINATOR_NOT_FOUND =
+ "Coordinator not registered with the statestore.";
const string REASON_NO_EXECUTOR_GROUPS =
"Waiting for executors to start. Only DDL queries and queries scheduled only on the "
"coordinator (either NUM_NODES set to 1 or when small query optimization is "
@@ -1114,6 +1115,7 @@
Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
unique_ptr<QuerySchedulePB>* schedule_result) {
+ DebugActionNoFail(request.query_options, "AC_BEFORE_ADMISSION");
DCHECK(schedule_result->get() == nullptr);
ClusterMembershipMgr::SnapshotPtr membership_snapshot =
@@ -1519,15 +1521,19 @@
std::vector<GroupScheduleState>* output_schedules = &queue_node->group_states;
output_schedules->clear();
- // If the first statestore update arrives before the local backend has finished starting
- // up, we might not have a local backend descriptor yet. We return no schedules, which
- // will result in the query being queued.
- if (membership_snapshot->local_be_desc == nullptr) {
- queue_node->not_admitted_reason = REASON_LOCAL_BACKEND_NOT_STARTED;
+ // Queries may arrive before we've gotten a statestore update containing the descriptor
+ // for their coordinator, in which case we queue the query until it arrives. It's also
+ // possible (though very unlikely) that the coordinator was removed from the cluster
+ // membership after submitting this query for admission. Currently in this case the
+ // query will remain queued until it times out, but we can consider detecting failed
+ // coordinators and cleaning up their queued queries.
+ auto it = membership_snapshot->current_backends.find(PrintId(request.coord_id));
+ if (it == membership_snapshot->current_backends.end()) {
+ queue_node->not_admitted_reason = REASON_COORDINATOR_NOT_FOUND;
LOG(WARNING) << queue_node->not_admitted_reason;
return Status::OK();
}
- const BackendDescriptorPB& local_be_desc = *membership_snapshot->local_be_desc;
+ const BackendDescriptorPB& coord_desc = it->second;
vector<const ExecutorGroup*> executor_groups =
GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request);
@@ -1568,7 +1574,7 @@
const string& group_name = executor_group->name();
VLOG(3) << "Scheduling for executor group: " << group_name << " with "
<< executor_group->NumExecutors() << " executors";
- const Scheduler::ExecutorConfig group_config = {*executor_group, local_be_desc};
+ const Scheduler::ExecutorConfig group_config = {*executor_group, coord_desc};
RETURN_IF_ERROR(
ExecEnv::GetInstance()->scheduler()->Schedule(group_config, group_state.get()));
DCHECK(!group_state->executor_group().empty());
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index a9c3805..8c20fcf 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -317,6 +317,7 @@
/// are owned by the ClientRequestState).
struct AdmissionRequest {
const UniqueIdPB& query_id;
+ const UniqueIdPB& coord_id;
const TQueryExecRequest& request;
const TQueryOptions& query_options;
RuntimeProfile* summary_profile;
diff --git a/be/src/scheduling/local-admission-control-client.cc b/be/src/scheduling/local-admission-control-client.cc
new file mode 100644
index 0000000..0f785da
--- /dev/null
+++ b/be/src/scheduling/local-admission-control-client.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/local-admission-control-client.h"
+
+#include "runtime/exec-env.h"
+#include "util/uid-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+LocalAdmissionControlClient::LocalAdmissionControlClient(const TUniqueId& query_id) {
+ TUniqueIdToUniqueIdPB(query_id, &query_id_);
+}
+
+Status LocalAdmissionControlClient::SubmitForAdmission(
+ const AdmissionController::AdmissionRequest& request,
+ std::unique_ptr<QuerySchedulePB>* schedule_result) {
+ return ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
+ request, &admit_outcome_, schedule_result);
+}
+
+void LocalAdmissionControlClient::ReleaseQuery(int64_t peak_mem_consumption) {
+ ExecEnv::GetInstance()->admission_controller()->ReleaseQuery(
+ query_id_, peak_mem_consumption);
+}
+
+void LocalAdmissionControlClient::ReleaseQueryBackends(
+ const vector<NetworkAddressPB>& host_addrs) {
+ ExecEnv::GetInstance()->admission_controller()->ReleaseQueryBackends(
+ query_id_, host_addrs);
+}
+
+void LocalAdmissionControlClient::CancelAdmission() {
+ admit_outcome_.Set(AdmissionOutcome::CANCELLED);
+}
+
+} // namespace impala
diff --git a/be/src/scheduling/local-admission-control-client.h b/be/src/scheduling/local-admission-control-client.h
new file mode 100644
index 0000000..8f3da3c
--- /dev/null
+++ b/be/src/scheduling/local-admission-control-client.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+#include "gen-cpp/admission_control_service.pb.h"
+#include "gen-cpp/common.pb.h"
+#include "scheduling/admission-control-client.h"
+#include "scheduling/admission-controller.h"
+
+namespace impala {
+
+/// Implementation of AdmissionControlClient used to submit queries for admission to an
+/// AdmissionController running locally on the coordinator.
+class LocalAdmissionControlClient : public AdmissionControlClient {
+ public:
+ LocalAdmissionControlClient(const TUniqueId& query_id);
+
+ virtual Status SubmitForAdmission(const AdmissionController::AdmissionRequest& request,
+ std::unique_ptr<QuerySchedulePB>* schedule_result) override;
+ virtual void ReleaseQuery(int64_t peak_mem_consumption) override;
+ virtual void ReleaseQueryBackends(
+ const std::vector<NetworkAddressPB>& host_addr) override;
+ virtual void CancelAdmission() override;
+
+ private:
+ // The id of the query being considered for admission.
+ UniqueIdPB query_id_;
+
+ /// Promise used by the admission controller. AdmissionController:SubmitForAdmission()
+ /// will block on this promise until the query is either rejected, admitted, times out,
+ /// or is cancelled. Can be set to CANCELLED by CancelAdmission() in order to cancel,
+ /// but otherwise is set by AdmissionController with the admission decision.
+ Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER> admit_outcome_;
+};
+
+} // namespace impala
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index ec79bd5..c28dda3 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -90,11 +90,11 @@
const ExecutorConfig& executor_config, const NetworkAddressPB& host) {
const BackendDescriptorPB* desc = executor_config.group.LookUpBackendDesc(host);
if (desc == nullptr) {
- // Local host may not be in executor_config's executor group if it's a dedicated
+ // Coordinator host may not be in executor_config's executor group if it's a dedicated
// coordinator, or if it is configured to be in a different executor group.
- const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
- DCHECK(host == local_be_desc.address());
- desc = &local_be_desc;
+ const BackendDescriptorPB& coord_desc = executor_config.coord_desc;
+ DCHECK(host == coord_desc.address());
+ desc = &coord_desc;
}
return *desc;
}
@@ -286,10 +286,10 @@
if (fragment_state->is_coord_fragment || executor_config.group.NumExecutors() == 0) {
// The coordinator fragment must be scheduled on the coordinator. Otherwise if
// no executors are available, we need to schedule on the coordinator.
- const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
- host = local_be_desc.address();
- DCHECK(local_be_desc.has_krpc_address());
- krpc_host = local_be_desc.krpc_address();
+ const BackendDescriptorPB& coord_desc = executor_config.coord_desc;
+ host = coord_desc.address();
+ DCHECK(coord_desc.has_krpc_address());
+ krpc_host = coord_desc.krpc_address();
} else if (fragment_state->exchange_input_fragments.size() > 0) {
// Interior unpartitioned fragments can be scheduled on an arbitrary executor.
// Pick a random instance from the first input fragment.
@@ -410,8 +410,7 @@
DCHECK(has_union || scan_node_ids.size() == 1) << "This method may need revisiting "
<< "for plans with no union and multiple scans per fragment";
vector<NetworkAddressPB> scan_hosts;
- GetScanHosts(
- executor_config.local_be_desc, scan_node_ids, *fragment_state, &scan_hosts);
+ GetScanHosts(executor_config.coord_desc, scan_node_ids, *fragment_state, &scan_hosts);
for (const NetworkAddressPB& host_addr : scan_hosts) {
// Ensure that the num instances is at least as many as input fragments. We don't
// want to increment if there were already some instances from the input fragment,
@@ -625,8 +624,8 @@
// TODO: Either get this from the ExecutorConfig or modify the AssignmentCtx interface
// to handle this case.
ExecutorGroup coord_only_executor_group("coordinator-only-group");
- const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
- coord_only_executor_group.AddExecutor(local_be_desc);
+ const BackendDescriptorPB& coord_desc = executor_config.coord_desc;
+ coord_only_executor_group.AddExecutor(coord_desc);
VLOG_ROW << "Exec at coord is " << (exec_at_coord ? "true" : "false");
AssignmentCtx assignment_ctx(exec_at_coord ? coord_only_executor_group : executor_group,
total_assignments_, total_local_assignments_, rng);
@@ -642,9 +641,9 @@
// Select executor for the current scan range.
if (exec_at_coord) {
DCHECK(assignment_ctx.executor_group().LookUpExecutorIp(
- local_be_desc.address().hostname(), nullptr));
- assignment_ctx.RecordScanRangeAssignment(local_be_desc, node_id, host_list,
- scan_range_locations, assignment);
+ coord_desc.address().hostname(), nullptr));
+ assignment_ctx.RecordScanRangeAssignment(
+ coord_desc, node_id, host_list, scan_range_locations, assignment);
} else {
// Collect executor candidates with smallest memory distance.
vector<IpAddr> executor_candidates;
@@ -786,7 +785,7 @@
return FindNodes(plan, SCAN_NODE_TYPES);
}
-void Scheduler::GetScanHosts(const BackendDescriptorPB& local_be_desc,
+void Scheduler::GetScanHosts(const BackendDescriptorPB& coord_desc,
const vector<TPlanNodeId>& scan_ids, const FragmentScheduleState& fragment_state,
vector<NetworkAddressPB>* scan_hosts) {
for (const TPlanNodeId& scan_id : scan_ids) {
@@ -804,7 +803,7 @@
// TODO: we'll need to revisit this strategy once we can partition joins
// (in which case this fragment might be executing a right outer join
// with a large build table)
- scan_hosts->push_back(local_be_desc.address());
+ scan_hosts->push_back(coord_desc.address());
}
}
}
@@ -890,7 +889,7 @@
// This also ensures an entry always exists for the coordinator backend.
int64_t coord_min_reservation = 0;
- const NetworkAddressPB& coord_addr = executor_config.local_be_desc.address();
+ const NetworkAddressPB& coord_addr = executor_config.coord_desc.address();
BackendScheduleState& coord_be_state =
state->GetOrCreateBackendScheduleState(coord_addr);
coord_be_state.exec_params->set_is_coord_backend(true);
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index ccf8948..a9093cb 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -65,7 +65,7 @@
/// Current snapshot of executors to be used for scheduling a scan.
struct ExecutorConfig {
const ExecutorGroup& group;
- const BackendDescriptorPB& local_be_desc;
+ const BackendDescriptorPB& coord_desc;
};
/// Populates given query schedule and assigns fragments to hosts based on scan
@@ -415,7 +415,7 @@
/// Add all hosts that the scans identified by 'scan_ids' are executed on to
/// 'scan_hosts'.
- void GetScanHosts(const BackendDescriptorPB& local_be_desc,
+ void GetScanHosts(const BackendDescriptorPB& coord_desc,
const std::vector<TPlanNodeId>& scan_ids,
const FragmentScheduleState& fragment_state,
std::vector<NetworkAddressPB>* scan_hosts);
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index e140b61..8f030ad 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -41,7 +41,6 @@
impala-internal-service.cc
impalad-main.cc
impala-server.cc
- query-driver-map.cc
query-options.cc
query-result-set.cc
)
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 84c250b..931f865 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -39,7 +39,7 @@
#include "runtime/query-driver.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
-#include "scheduling/admission-controller.h"
+#include "scheduling/admission-control-client.h"
#include "scheduling/scheduler.h"
#include "service/frontend.h"
#include "service/impala-server.h"
@@ -161,6 +161,8 @@
TNetworkAddressToString(ExecEnv::GetInstance()->GetThriftBackendAddress()));
summary_profile_->AddChild(frontend_profile_);
+
+ AdmissionControlClient::Create(query_ctx.query_id, &admission_control_client_);
}
ClientRequestState::~ClientRequestState() {
@@ -539,17 +541,14 @@
}
void ClientRequestState::FinishExecQueryOrDmlRequest() {
- DebugActionNoFail(exec_request_->query_options, "CRS_BEFORE_ADMISSION");
-
- DCHECK(ExecEnv::GetInstance()->admission_controller() != nullptr);
DCHECK(exec_request_->__isset.query_exec_request);
UniqueIdPB query_id_pb;
TUniqueIdToUniqueIdPB(query_id(), &query_id_pb);
- Status admit_status =
- ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
- {query_id_pb, exec_request_->query_exec_request, exec_request_->query_options,
- summary_profile_, query_events_, blacklisted_executor_addresses_},
- &admit_outcome_, &schedule_);
+ Status admit_status = admission_control_client_->SubmitForAdmission(
+ {query_id_pb, ExecEnv::GetInstance()->backend_id(),
+ exec_request_->query_exec_request, exec_request_->query_options,
+ summary_profile_, query_events_, blacklisted_executor_addresses_},
+ &schedule_);
{
lock_guard<mutex> l(lock_);
if (!UpdateQueryStatus(admit_status).ok()) return;
@@ -1189,7 +1188,7 @@
|| retry_state() == RetryState::RETRYING);
}
- admit_outcome_.Set(AdmissionOutcome::CANCELLED);
+ admission_control_client_->CancelAdmission();
is_cancelled_ = true;
} // Release lock_ before doing cancellation work.
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 9cbbd2b..0c7bb6a 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -34,6 +34,7 @@
namespace impala {
+class AdmissionControlClient;
class ClientRequestStateCleaner;
class Coordinator;
class Expr;
@@ -44,7 +45,6 @@
class Thread;
class TRuntimeProfileTree;
class TupleRow;
-enum class AdmissionOutcome;
class QuerySchedulePB;
/// Execution state of the client-facing side of a query. This captures everything
@@ -390,6 +390,10 @@
/// Returns the QueryDriver that owns this ClientRequestState.
QueryDriver* parent_driver() const { return parent_driver_; }
+ AdmissionControlClient* admission_control_client() const {
+ return admission_control_client_.get();
+ }
+
/// Returns true if results cacheing is enabled, false otherwise.
bool IsResultCacheingEnabled() const { return result_cache_max_size_ >= 0; }
@@ -454,12 +458,6 @@
/// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
- /// Promise used by the admission controller. AdmissionController:AdmitQuery() will
- /// block on this promise until the query is either rejected, admitted, times out, or is
- /// cancelled. Can be set to CANCELLED by the ClientRequestState in order to cancel, but
- /// otherwise is set by AdmissionController with the admission decision.
- Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER> admit_outcome_;
-
/// Protects all following fields. Acquirers should be careful not to hold it for too
/// long, e.g. during RPCs because this lock is required to make progress on various
/// ImpalaServer requests. If held for too long it can block progress of client
@@ -650,6 +648,10 @@
/// constructor. It always outlives the ClientRequestState.
QueryDriver* parent_driver_;
+ /// Manages interactions with the AdmissionController, which may be either local or
+ /// remote.
+ std::unique_ptr<AdmissionControlClient> admission_control_client_;
+
/// Executes a local catalog operation (an operation that does not need to execute
/// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op) WARN_UNUSED_RESULT;
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 33a3f84..34c3800 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -24,11 +24,12 @@
#include "kudu/rpc/rpc_controller.h"
#include "rpc/rpc-mgr.h"
#include "rpc/rpc-mgr.inline.h"
+#include "rpc/sidecar-util.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
-#include "runtime/query-exec-mgr.h"
#include "runtime/query-driver.h"
+#include "runtime/query-exec-mgr.h"
#include "runtime/query-state.h"
#include "service/client-request-state.h"
#include "service/impala-server.h"
@@ -114,18 +115,6 @@
return Status::OK();
}
-// Retrieves the sidecar at 'sidecar_idx' from 'rpc_context' and deserializes it into
-// 'thrift_obj'.
-template <typename T>
-static Status GetSidecar(int sidecar_idx, RpcContext* rpc_context, T* thrift_obj) {
- kudu::Slice sidecar_slice;
- KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(sidecar_idx, &sidecar_slice),
- "Failed to get sidecar");
- uint32_t len = sidecar_slice.size();
- RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, thrift_obj));
- return Status::OK();
-}
-
void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request,
ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index b8ce8ac..3c75851 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1215,8 +1215,7 @@
return Status::Expected("Session has been closed, ignoring query.");
}
DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
- RETURN_IF_ERROR(
- query_driver_map_.AddQueryDriver(query_id, query_handle->query_driver()));
+ RETURN_IF_ERROR(query_driver_map_.Add(query_id, query_handle->query_driver()));
// Metric is decremented in UnregisterQuery().
ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L);
VLOG_QUERY << "Registered query query_id=" << PrintId(query_id) << " session_id="
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8408c90..52e0a85 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -37,12 +37,12 @@
#include "kudu/util/random.h"
#include "rpc/thrift-server.h"
#include "runtime/types.h"
-#include "service/query-driver-map.h"
#include "service/query-options.h"
#include "statestore/statestore-subscriber.h"
#include "util/condition-variable.h"
#include "util/container-util.h"
#include "util/runtime-profile.h"
+#include "util/sharded-query-map-util.h"
#include "util/simple-logger.h"
#include "util/thread-pool.h"
#include "util/time.h"
@@ -1244,6 +1244,7 @@
/// 3. Additional cleanup work is done by CloseClientRequestState(), and an entry
/// is added to 'query_log_' for this query.
/// 4. The QueryDriver is removed from this map.
+ typedef ShardedQueryMap<std::shared_ptr<QueryDriver>> QueryDriverMap;
QueryDriverMap query_driver_map_;
/// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
diff --git a/be/src/service/query-driver-map.cc b/be/src/service/query-driver-map.cc
deleted file mode 100644
index 14bf89f..0000000
--- a/be/src/service/query-driver-map.cc
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "service/query-driver-map.h"
-
-#include "gutil/strings/substitute.h"
-#include "util/container-util.h"
-#include "util/debug-util.h"
-#include "util/uid-util.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-Status QueryDriverMap::AddQueryDriver(
- const TUniqueId& query_id, std::shared_ptr<QueryDriver> query_driver) {
- ScopedShardedMapRef<std::shared_ptr<QueryDriver>> map_ref(query_id, this);
- DCHECK(map_ref.get() != nullptr);
-
- auto entry = map_ref->find(query_id);
- if (entry != map_ref->end()) {
- // There shouldn't be an active query with that same id.
- // (query_id is globally unique)
- return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
- strings::Substitute("query id $0 already exists", PrintId(query_id))));
- }
- map_ref->insert(make_pair(query_id, query_driver));
- return Status::OK();
-}
-
-Status QueryDriverMap::DeleteQueryDriver(const TUniqueId& query_id) {
- ScopedShardedMapRef<std::shared_ptr<QueryDriver>> map_ref(query_id, this);
- DCHECK(map_ref.get() != nullptr);
- auto entry = map_ref->find(query_id);
- if (entry == map_ref->end()) {
- Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
- VLOG(1) << err.GetDetail();
- return err;
- }
- map_ref->erase(entry);
- return Status::OK();
-}
-
-} // namespace impala
diff --git a/be/src/service/query-driver-map.h b/be/src/service/query-driver-map.h
deleted file mode 100644
index 572170a..0000000
--- a/be/src/service/query-driver-map.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/status.h"
-#include "util/sharded-query-map-util.h"
-#include "util/unique-id-hash.h"
-
-namespace impala {
-
-class QueryDriver;
-
-/// A ShardedQueryMap for QueryDrivers. Maps a query_id to its corresponding
-/// QueryDriver. Provides helper methods to easily add and delete
-/// QueryDrivers from a ShardedQueryMap. The QueryDrivers are non-const, so
-/// users of this class can synchronize access to the QueryDrivers by creating a
-/// ScopedShardedMapRef.
-class QueryDriverMap : public ShardedQueryMap<std::shared_ptr<QueryDriver>> {
- public:
- /// Adds the given (query_id, query_driver) pair to the map. Returns an error Status
- /// if the query id already exists in the map.
- Status AddQueryDriver(
- const TUniqueId& query_id, std::shared_ptr<QueryDriver> request_state);
-
- /// Deletes the specified (query_id, query_driver) pair from the map. Returns an error
- /// Status if the query_id cannot be found in the map.
- Status DeleteQueryDriver(const TUniqueId& query_id);
-};
-}
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index b56d1a3..f04d1c1 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -84,6 +84,7 @@
process-state-info.cc
redactor.cc
runtime-profile.cc
+ sharded-query-map-util.cc
simple-logger.cc
string-parser.cc
string-util.cc
diff --git a/be/src/util/sharded-query-map-util.cc b/be/src/util/sharded-query-map-util.cc
new file mode 100644
index 0000000..5d5317e
--- /dev/null
+++ b/be/src/util/sharded-query-map-util.cc
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/sharded-query-map-util.h"
+
+#include "runtime/query-driver.h"
+#include "util/debug-util.h"
+
+namespace impala {
+
+template <typename K, typename V>
+Status GenericShardedQueryMap<K, V>::Add(const K& query_id, const V& obj) {
+ GenericScopedShardedMapRef<K, V> map_ref(query_id, this);
+ DCHECK(map_ref.get() != nullptr);
+
+ auto entry = map_ref->find(query_id);
+ if (entry != map_ref->end()) {
+ // There shouldn't be an active query with that same id.
+ // (query_id is globally unique)
+ return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
+ strings::Substitute("query id $0 already exists", PrintId(query_id))));
+ }
+ map_ref->insert(make_pair(query_id, obj));
+ return Status::OK();
+}
+
+template <typename K, typename V>
+Status GenericShardedQueryMap<K, V>::Get(const K& query_id, V* obj) {
+ GenericScopedShardedMapRef<K, V> map_ref(query_id, this);
+ DCHECK(map_ref.get() != nullptr);
+
+ auto entry = map_ref->find(query_id);
+ if (entry == map_ref->end()) {
+ Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+ VLOG(1) << err.GetDetail();
+ return err;
+ }
+ *obj = entry->second;
+ return Status::OK();
+}
+
+template <typename K, typename V>
+Status GenericShardedQueryMap<K, V>::Delete(const K& query_id) {
+ GenericScopedShardedMapRef<K, V> map_ref(query_id, this);
+ DCHECK(map_ref.get() != nullptr);
+ auto entry = map_ref->find(query_id);
+ if (entry == map_ref->end()) {
+ Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+ VLOG(1) << err.GetDetail();
+ return err;
+ }
+ map_ref->erase(entry);
+ return Status::OK();
+}
+
+// Needed by ImpalaServer
+template Status GenericShardedQueryMap<TUniqueId, std::shared_ptr<QueryDriver>>::Add(
+ TUniqueId const&, const std::shared_ptr<QueryDriver>&);
+template Status GenericShardedQueryMap<TUniqueId, std::shared_ptr<QueryDriver>>::Get(
+ TUniqueId const&, std::shared_ptr<QueryDriver>*);
+template Status GenericShardedQueryMap<TUniqueId, std::shared_ptr<QueryDriver>>::Delete(
+ TUniqueId const&);
+
+} // namespace impala
diff --git a/be/src/util/sharded-query-map-util.h b/be/src/util/sharded-query-map-util.h
index 5b8ae14..4e2836f 100644
--- a/be/src/util/sharded-query-map-util.h
+++ b/be/src/util/sharded-query-map-util.h
@@ -20,6 +20,7 @@
#include <mutex>
#include <unordered_map>
+#include "common/status.h"
#include "gen-cpp/Types_types.h"
#include "util/aligned-new.h"
#include "util/spinlock.h"
@@ -27,25 +28,25 @@
namespace impala {
-/// This is a template that can be used for any map that maps from a query ID (TUniqueId)
-/// to some object, and that needs to be sharded. It provides a SpinLock per shard to
-/// synchronize access to each shard of the map. The underlying shard is locked and
-/// accessed by instantiating a ScopedShardedMapRef.
+/// This is a template that can be used for any map that maps from a query ID (TUniqueId
+/// or UniqueIdPB) to some object, and that needs to be sharded. It provides a SpinLock
+/// per shard to synchronize access to each shard of the map. The underlying shard is
+/// locked and accessed by instantiating a GenericScopedShardedMapRef.
//
/// Usage pattern:
//
/// typedef ShardedQueryMap<QueryState*> QueryStateMap;
/// QueryStateMap qs_map_;
//
-template<typename T>
-class ShardedQueryMap {
+template <typename K, typename V>
+class GenericShardedQueryMap {
public:
// This function takes a lambda which should take a parameter of object 'T' and
// runs the lambda for all the entries in the map. The lambda should have a return
// type of 'void'..
// TODO: If necessary, refactor the lambda signature to allow returning Status objects.
- void DoFuncForAllEntries(const std::function<void(const T&)>& call) {
+ void DoFuncForAllEntries(const std::function<void(const V&)>& call) {
for (int i = 0; i < NUM_QUERY_BUCKETS; ++i) {
std::lock_guard<SpinLock> l(shards_[i].map_lock_);
for (const auto& map_value_ref: shards_[i].map_) {
@@ -54,9 +55,19 @@
}
}
+ // Adds ('key', 'value') to the map, returning an error if 'key' already exists.
+ Status Add(const K& key, const V& value);
+
+ // Returns the value associated with 'key' in 'value', returning an error if 'key'
+ // doesn't exist.
+ Status Get(const K& key, V* value);
+
+ // Removes 'key' from the map, returning an error if 'key' doesn't exist.
+ Status Delete(const K& key);
+
private:
- template <typename T2>
- friend class ScopedShardedMapRef;
+ template <typename K2, typename V2>
+ friend class GenericScopedShardedMapRef;
// Number of buckets to split the containers of query IDs into.
static constexpr uint32_t NUM_QUERY_BUCKETS = 4;
@@ -65,14 +76,20 @@
// we will always access a map and its corresponding lock together, it's better if
// they can be allocated on the same cache line.
struct MapShard : public CacheLineAligned {
- std::unordered_map<TUniqueId, T> map_;
+ std::unordered_map<K, V> map_;
SpinLock map_lock_;
};
struct MapShard shards_[NUM_QUERY_BUCKETS];
};
+template <typename T>
+class ShardedQueryMap : public GenericShardedQueryMap<TUniqueId, T> {};
+
+template <typename T>
+class ShardedQueryPBMap : public GenericShardedQueryMap<UniqueIdPB, T> {};
+
/// Use this class to obtain a locked reference to the underlying map shard
-/// of a ShardedQueryMap, corresponding to the 'query_id'.
+/// of a GenericShardedQueryMap, corresponding to the 'query_id'.
//
/// Pattern:
/// {
@@ -83,13 +100,13 @@
//
/// The caller should ensure that the lifetime of the ShardedQueryMap should be longer
/// than the lifetime of this scoped class.
-template <typename T>
-class ScopedShardedMapRef {
+template <typename K, typename V>
+class GenericScopedShardedMapRef {
public:
// Finds the appropriate map that could/should contain 'query_id' and locks it.
- ScopedShardedMapRef(
- const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map) {
+ GenericScopedShardedMapRef(
+ const K& query_id, class GenericShardedQueryMap<K, V>* sharded_map) {
DCHECK(sharded_map != nullptr);
int qs_map_bucket = QueryIdToBucket(query_id);
shard_ = &sharded_map->shards_[qs_map_bucket];
@@ -98,19 +115,19 @@
shard_->map_lock_.lock();
}
- ~ScopedShardedMapRef() {
+ ~GenericScopedShardedMapRef() {
shard_->map_lock_.DCheckLocked();
shard_->map_lock_.unlock();
}
// Returns the shard (map) for the 'query_id' passed to the constructor.
// Should never return nullptr.
- std::unordered_map<TUniqueId, T>* get() {
+ std::unordered_map<K, V>* get() {
shard_->map_lock_.DCheckLocked();
return &shard_->map_;
}
- std::unordered_map<TUniqueId, T>* operator->() {
+ std::unordered_map<K, V>* operator->() {
shard_->map_lock_.DCheckLocked();
return get();
}
@@ -119,14 +136,33 @@
// Return the correct bucket that a query ID would belong to.
inline int QueryIdToBucket(const TUniqueId& query_id) {
- int bucket =
- static_cast<int>(query_id.hi) % ShardedQueryMap<T>::NUM_QUERY_BUCKETS;
- DCHECK(bucket < ShardedQueryMap<T>::NUM_QUERY_BUCKETS && bucket >= 0);
+ int bucket = static_cast<int>(query_id.hi) % ShardedQueryMap<V>::NUM_QUERY_BUCKETS;
+ DCHECK(bucket < ShardedQueryMap<V>::NUM_QUERY_BUCKETS && bucket >= 0);
return bucket;
}
- typename ShardedQueryMap<T>::MapShard* shard_;
- DISALLOW_COPY_AND_ASSIGN(ScopedShardedMapRef);
+ inline int QueryIdToBucket(const UniqueIdPB& query_id) {
+ int bucket = static_cast<int>(query_id.hi()) % ShardedQueryMap<V>::NUM_QUERY_BUCKETS;
+ DCHECK(bucket < ShardedQueryMap<V>::NUM_QUERY_BUCKETS && bucket >= 0);
+ return bucket;
+ }
+
+ typename GenericShardedQueryMap<K, V>::MapShard* shard_;
+ DISALLOW_COPY_AND_ASSIGN(GenericScopedShardedMapRef);
+};
+
+template <typename T>
+class ScopedShardedMapRef : public GenericScopedShardedMapRef<TUniqueId, T> {
+ public:
+ ScopedShardedMapRef(const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map)
+ : GenericScopedShardedMapRef<TUniqueId, T>(query_id, sharded_map) {}
+};
+
+template <typename T>
+class ScopedShardedMapPBRef : public GenericScopedShardedMapRef<UniqueIdPB, T> {
+ public:
+ ScopedShardedMapPBRef(const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map)
+ : GenericScopedShardedMapRef<TUniqueId, T>(query_id, sharded_map) {}
};
} // namespace impala
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index f8a7249..09b74af 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -106,7 +106,7 @@
// The code executing the debug action may respond to different error messages by
// exercising different error paths.
// Examples:
- // - CRS_BEFORE_ADMISSION:SLEEP@1000
+ // - AC_BEFORE_ADMISSION:SLEEP@1000
// Causes a 1 second sleep before queries are submitted to the admission controller.
// - IMPALA_SERVICE_POOL:127.0.0.1:27002:TransmitData:FAIL@0.1@REJECT_TOO_BUSY
// Causes TransmitData rpcs to the third minicluster impalad to fail 10% of the time
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 39e690c..4edb18f 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -745,7 +745,7 @@
impalad = self.cluster.impalads[0]
client = impalad.service.create_beeswax_client()
try:
- client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
+ client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000")
client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
handle = client.execute_async("select 1")
sleep(1)
@@ -754,7 +754,7 @@
"Ready to be Rejected but already cancelled, query id=")
client.clear_configuration()
- client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
+ client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000")
handle = client.execute_async("select 2")
sleep(1)
client.close_query(handle)
@@ -1239,7 +1239,7 @@
profile = result.runtime_profile
reasons = self.__extract_init_queue_reasons([profile])
assert len(reasons) == 1
- assert "Local backend has not started up yet." in reasons[0]
+ assert "Coordinator not registered with the statestore." in reasons[0]
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 566e8ec..95ee8df 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -300,7 +300,7 @@
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
- 'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@18000'})
+ 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 80)
# Validate that the query was retried.
@@ -362,7 +362,7 @@
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
- 'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@18000'})
+ 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
# Wait until the query fails.
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 140)
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 90191ed..22bbbf9 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -374,7 +374,7 @@
# the shutdown grace period expires. This demonstrates that queries don't get
# cancelled if the cluster membership changes while they're waiting for admission.
before_shutdown_admission_handle = self.execute_query_async(QUERY,
- {'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@30000'})
+ {'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@30000'})
# Shut down and wait for the shutdown state to propagate through statestore.
result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2)
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 9b16384..fe5d2f8 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -196,7 +196,7 @@
"""Test that the exec summary is populated correctly in every query state"""
query = "select count(*) from functional.alltypes"
handle = self.execute_query_async(query,
- {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+ {"debug_action": "AC_BEFORE_ADMISSION:SLEEP@1000"})
# If ExecuteStatement() has completed and the query is paused in the admission control
# phase, then the coordinator has not started yet and exec_summary should be empty.
exec_summary = self.client.get_exec_summary(handle)
@@ -218,7 +218,7 @@
query state"""
query = "select count(*) from functional.alltypes"
handle = self.execute_query_async(query,
- {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+ {"debug_action": "AC_BEFORE_ADMISSION:SLEEP@1000"})
# If ExecuteStatement() has completed and the query is paused in the admission control
# phase, then the coordinator has not started yet and exec_summary should be empty.
@@ -743,7 +743,7 @@
"""Tests that the query profile shows expected query states."""
query = "select count(*) from functional.alltypes where bool_col = sleep(10)"
handle = self.execute_query_async(query,
- {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+ {"debug_action": "AC_BEFORE_ADMISSION:SLEEP@1000"})
# If ExecuteStatement() has completed and the query is paused in the admission control
# phase, then the query must be in COMPILED state.
profile = self.client.get_runtime_profile(handle)