IMPALA-10154: Fix data race on coord_backend_id in TSAN build

This issue was introduced by the patch for IMPALA-5746.
QueryState::exec_rpc_params_.coord_backend_id is set in function
QuestState::Init(), but it could be accessed by QueryExecMgr object in
QueryExecMgr::CancelQueriesForFailedCoordinators() before or during
QueryState::Init() is called, hence cause data race.
To fix it, move coord_backend_id from class ExecQueryFInstancesRequestPB
to class TQueryCtx. QueryState::query_ctx_ is a constant variable and is
set in QueryState c'tor so that QueryState::query_ctx_.coord_backend_id
is valid and will not be changed once the QuestState object is created.

Testing:
 - Passed tests/custom_cluster/test_process_failures.py.
 - Passed the core tests for normal build.
 - Passed the core tests against a TSAN build.

Change-Id: I1c4b51e741a28b80bf3485adff8c97aabe0a3f67
Reviewed-on: http://gerrit.cloudera.org:8080/16437
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index ee6c039..5250927 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -110,7 +110,6 @@
     const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
     TExecPlanFragmentInfo* fragment_info) {
   request->set_coord_state_idx(state_idx_);
-  *request->mutable_coord_backend_id() = ExecEnv::GetInstance()->backend_id();
   request->set_min_mem_reservation_bytes(
       backend_exec_params_.min_mem_reservation_bytes());
   request->set_initial_mem_reservation_total_claims(
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 6874ca4..d12f47b 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -229,7 +229,7 @@
   ExecEnv::GetInstance()->query_exec_mgr()->qs_map_.DoFuncForAllEntries(
       [&](QueryState* qs) {
         if (qs != nullptr && !qs->IsCancelled()) {
-          if (current_membership.find(qs->coord_backend_id())
+          if (current_membership.find(qs->GetCoordinatorBackendId())
               == current_membership.end()) {
             // decremented by ReleaseQueryState()
             AcquireQueryStateLocked(qs);
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 8e071f9..5bffa6e 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -213,7 +213,6 @@
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
-  *exec_rpc_params_.mutable_coord_backend_id() = exec_rpc_params->coord_backend_id();
   exec_rpc_params_.mutable_fragment_ctxs()->Swap(
       const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentCtxPB>*>(
           &exec_rpc_params->fragment_ctxs()));
@@ -248,6 +247,12 @@
   return Status::OK();
 }
 
+UniqueIdPB QueryState::GetCoordinatorBackendId() const {
+  UniqueIdPB backend_id_pb;
+  TUniqueIdToUniqueIdPB(query_ctx_.coord_backend_id, &backend_id_pb);
+  return backend_id_pb;
+}
+
 int64_t QueryState::GetMaxReservation() {
   int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD);
   int64_t max_reservation;
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 5f45eff..a72534a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -152,12 +152,10 @@
   }
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
   RuntimeProfile* host_profile() const { return host_profile_; }
+  UniqueIdPB GetCoordinatorBackendId() const;
 
   /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
-  const UniqueIdPB& coord_backend_id() const {
-    return exec_rpc_params_.coord_backend_id();
-  }
 
   /// The following getters are only valid after Init() and should be called only from
   /// the backend execution (ie. not the coordinator side, since they require holding
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index ddd2a03..9b75002 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -148,6 +148,8 @@
   query_ctx.request_pool = "test-pool";
   query_ctx.coord_address = exec_env_->configured_backend_address_;
   query_ctx.coord_krpc_address = exec_env_->krpc_address_;
+  query_ctx.coord_backend_id.hi = 0;
+  query_ctx.coord_backend_id.lo = 0;
   TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options;
   int64_t mem_limit =
       query_options_to_use->__isset.mem_limit && query_options_to_use->mem_limit > 0 ?
@@ -161,9 +163,7 @@
   // param
   ExecQueryFInstancesRequestPB rpc_params;
   // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
-  UniqueIdPB dummy_backend_id;
   rpc_params.set_coord_state_idx(0);
-  *rpc_params.mutable_coord_backend_id() = dummy_backend_id;
   rpc_params.add_fragment_ctxs();
   rpc_params.add_fragment_instance_ctxs();
   TExecPlanFragmentInfo fragment_info;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d939ef4..d1d2a48 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1068,6 +1068,9 @@
   query_ctx->__set_start_unix_millis(now_us / MICROS_PER_MILLI);
   query_ctx->__set_coord_address(backend_addr);
   query_ctx->__set_coord_krpc_address(krpc_addr);
+  TUniqueId backend_id;
+  UniqueIdPBToTUniqueId(ExecEnv::GetInstance()->backend_id(), &backend_id);
+  query_ctx->__set_coord_backend_id(backend_id);
   query_ctx->__set_local_time_zone(local_tz_name);
   query_ctx->__set_status_report_interval_ms(FLAGS_status_report_interval_ms);
   query_ctx->__set_status_report_max_retry_s(FLAGS_status_report_max_retry_s);
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 8d97243..52495b8 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -384,9 +384,6 @@
   // Execution parameters for specific fragment instances. Corresponds to
   // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
   repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
-
-  // The Backend ID of the coordinator.
-  optional UniqueIdPB coord_backend_id = 9;
 }
 
 message ExecQueryFInstancesResponsePB {
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 601a81b..3a9eefd 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -617,6 +617,9 @@
   // If mt_dop was overridden by admission control's max mt_dop setting, then this
   // is set to the original value. If mt_dop was not overridden, then this is not set.
   26: optional i32 overridden_mt_dop_value
+
+  // The initiating coordinator's backend_id.
+  27: optional Types.TUniqueId coord_backend_id
 }
 
 // Descriptor that indicates that a runtime filter is produced by a plan node.