IMPALA-10154: Fix data race on coord_backend_id in TSAN build
This issue was introduced by the patch for IMPALA-5746.
QueryState::exec_rpc_params_.coord_backend_id is set in function
QueryState::Init(), but it could be accessed by the QueryExecMgr object in
QueryExecMgr::CancelQueriesForFailedCoordinators() before or while
QueryState::Init() is called, hence causing a data race.
To fix it, move coord_backend_id from class ExecQueryFInstancesRequestPB
to class TQueryCtx. QueryState::query_ctx_ is a constant variable and is
set in the QueryState c'tor, so QueryState::query_ctx_.coord_backend_id
is valid and will not be changed once the QueryState object is created.
Testing:
- Passed tests/custom_cluster/test_process_failures.py.
- Passed the core tests for normal build.
- Passed the core tests against a TSAN build.
Change-Id: I1c4b51e741a28b80bf3485adff8c97aabe0a3f67
Reviewed-on: http://gerrit.cloudera.org:8080/16437
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index ee6c039..5250927 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -110,7 +110,6 @@
const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
TExecPlanFragmentInfo* fragment_info) {
request->set_coord_state_idx(state_idx_);
- *request->mutable_coord_backend_id() = ExecEnv::GetInstance()->backend_id();
request->set_min_mem_reservation_bytes(
backend_exec_params_.min_mem_reservation_bytes());
request->set_initial_mem_reservation_total_claims(
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 6874ca4..d12f47b 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -229,7 +229,7 @@
ExecEnv::GetInstance()->query_exec_mgr()->qs_map_.DoFuncForAllEntries(
[&](QueryState* qs) {
if (qs != nullptr && !qs->IsCancelled()) {
- if (current_membership.find(qs->coord_backend_id())
+ if (current_membership.find(qs->GetCoordinatorBackendId())
== current_membership.end()) {
// decremented by ReleaseQueryState()
AcquireQueryStateLocked(qs);
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 8e071f9..5bffa6e 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -213,7 +213,6 @@
// don't copy query_ctx, it's large and we already did that in the c'tor
exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
- *exec_rpc_params_.mutable_coord_backend_id() = exec_rpc_params->coord_backend_id();
exec_rpc_params_.mutable_fragment_ctxs()->Swap(
const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentCtxPB>*>(
&exec_rpc_params->fragment_ctxs()));
@@ -248,6 +247,12 @@
return Status::OK();
}
+UniqueIdPB QueryState::GetCoordinatorBackendId() const {
+ UniqueIdPB backend_id_pb;
+ TUniqueIdToUniqueIdPB(query_ctx_.coord_backend_id, &backend_id_pb);
+ return backend_id_pb;
+}
+
int64_t QueryState::GetMaxReservation() {
int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD);
int64_t max_reservation;
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 5f45eff..a72534a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -152,12 +152,10 @@
}
MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
RuntimeProfile* host_profile() const { return host_profile_; }
+ UniqueIdPB GetCoordinatorBackendId() const;
/// The following getters are only valid after Init().
ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
- const UniqueIdPB& coord_backend_id() const {
- return exec_rpc_params_.coord_backend_id();
- }
/// The following getters are only valid after Init() and should be called only from
/// the backend execution (ie. not the coordinator side, since they require holding
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index ddd2a03..9b75002 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -148,6 +148,8 @@
query_ctx.request_pool = "test-pool";
query_ctx.coord_address = exec_env_->configured_backend_address_;
query_ctx.coord_krpc_address = exec_env_->krpc_address_;
+ query_ctx.coord_backend_id.hi = 0;
+ query_ctx.coord_backend_id.lo = 0;
TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options;
int64_t mem_limit =
query_options_to_use->__isset.mem_limit && query_options_to_use->mem_limit > 0 ?
@@ -161,9 +163,7 @@
// param
ExecQueryFInstancesRequestPB rpc_params;
// create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
- UniqueIdPB dummy_backend_id;
rpc_params.set_coord_state_idx(0);
- *rpc_params.mutable_coord_backend_id() = dummy_backend_id;
rpc_params.add_fragment_ctxs();
rpc_params.add_fragment_instance_ctxs();
TExecPlanFragmentInfo fragment_info;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d939ef4..d1d2a48 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1068,6 +1068,9 @@
query_ctx->__set_start_unix_millis(now_us / MICROS_PER_MILLI);
query_ctx->__set_coord_address(backend_addr);
query_ctx->__set_coord_krpc_address(krpc_addr);
+ TUniqueId backend_id;
+ UniqueIdPBToTUniqueId(ExecEnv::GetInstance()->backend_id(), &backend_id);
+ query_ctx->__set_coord_backend_id(backend_id);
query_ctx->__set_local_time_zone(local_tz_name);
query_ctx->__set_status_report_interval_ms(FLAGS_status_report_interval_ms);
query_ctx->__set_status_report_max_retry_s(FLAGS_status_report_max_retry_s);
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 8d97243..52495b8 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -384,9 +384,6 @@
// Execution parameters for specific fragment instances. Corresponds to
// 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
-
- // The Backend ID of the coordinator.
- optional UniqueIdPB coord_backend_id = 9;
}
message ExecQueryFInstancesResponsePB {
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 601a81b..3a9eefd 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -617,6 +617,9 @@
// If mt_dop was overridden by admission control's max mt_dop setting, then this
// is set to the original value. If mt_dop was not overridden, then this is not set.
26: optional i32 overridden_mt_dop_value
+
+ // The initiating coordinator's backend_id.
+ 27: optional Types.TUniqueId coord_backend_id
}
// Descriptor that indicates that a runtime filter is produced by a plan node.