IMPALA-9181: Serialize TQueryCtx once per query

When issuing Exec() RPCs to backends, we currently serialize the
TQueryCtx once per backend. This is wasteful: the TQueryCtx is
identical for all backends, so it only needs to be serialized once.

Serializing the TQueryCtx can be expensive, as it contains both the
full text of the original query and the descriptor table, which can be
quite large. In a synthetic dataset I tested with, scanning a table
with 100k partitions led to a descriptor table of ~20MB.

This patch serializes the TQueryCtx in the coordinator and then passes
it to each BackendState when calling Exec().
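
In outline, the new flow looks like the following condensed sketch (all
names are taken from the diff below; the error handling around
SetExecError()/UpdateExecState() is elided):

  // Coordinator::StartBackendExec(): serialize the TQueryCtx exactly once.
  ThriftSerializer serializer(true);
  uint8_t* serialized_buf = nullptr;
  uint32_t serialized_len = 0;
  RETURN_IF_ERROR(
      serializer.SerializeToBuffer(&query_ctx(), &serialized_len, &serialized_buf));
  // The buffer must stay valid until all Exec() rpcs have completed.
  kudu::Slice query_ctx_slice(serialized_buf, serialized_len);

  // Coordinator::BackendState::Exec(): each backend attaches the shared
  // buffer as a zero-copy sidecar instead of re-serializing it.
  unique_ptr<RpcSidecar> query_ctx_sidecar = RpcSidecar::FromSlice(query_ctx_slice);
  int query_ctx_sidecar_idx;
  kudu::Status st =
      rpc_controller.AddOutboundSidecar(move(query_ctx_sidecar), &query_ctx_sidecar_idx);
  if (!st.ok()) { /* SetExecError() and bail out, as in the diff */ }
  request.set_query_ctx_sidecar_idx(query_ctx_sidecar_idx);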

Follow-up work might consider whether all of the info in the TQueryCtx
really needs to be distributed to all backends.

Testing:
- Passed full run of existing tests.
- Single node perf run showed no significant change.

Change-Id: I6a4dd302fd5602ec2775492a041ddd51e7d7a6c6
Reviewed-on: http://gerrit.cloudera.org:8080/14777
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 5ab055b..9c7e5d1 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -26,6 +26,7 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.inline.h"
 #include "runtime/backend-client.h"
@@ -105,7 +106,7 @@
 
 void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
-    TExecQueryFInstancesSidecar* sidecar) {
+    TExecPlanFragmentInfo* fragment_info) {
   request->set_coord_state_idx(state_idx_);
   request->set_min_mem_reservation_bytes(backend_exec_params_->min_mem_reservation_bytes);
   request->set_initial_mem_reservation_total_claims(
@@ -113,19 +114,20 @@
   request->set_per_backend_mem_limit(schedule_.per_backend_mem_limit());
 
   // set fragment_ctxs and fragment_instance_ctxs
-  sidecar->__isset.fragment_ctxs = true;
-  sidecar->__isset.fragment_instance_ctxs = true;
-  sidecar->fragment_instance_ctxs.resize(backend_exec_params_->instance_params.size());
+  fragment_info->__isset.fragment_ctxs = true;
+  fragment_info->__isset.fragment_instance_ctxs = true;
+  fragment_info->fragment_instance_ctxs.resize(
+      backend_exec_params_->instance_params.size());
   for (int i = 0; i < backend_exec_params_->instance_params.size(); ++i) {
-    TPlanFragmentInstanceCtx& instance_ctx = sidecar->fragment_instance_ctxs[i];
+    TPlanFragmentInstanceCtx& instance_ctx = fragment_info->fragment_instance_ctxs[i];
     const FInstanceExecParams& params = *backend_exec_params_->instance_params[i];
     int fragment_idx = params.fragment_exec_params.fragment.idx;
 
     // add a TPlanFragmentCtx, if we don't already have it
-    if (sidecar->fragment_ctxs.empty()
-        || sidecar->fragment_ctxs.back().fragment.idx != fragment_idx) {
-      sidecar->fragment_ctxs.emplace_back();
-      TPlanFragmentCtx& fragment_ctx = sidecar->fragment_ctxs.back();
+    if (fragment_info->fragment_ctxs.empty()
+        || fragment_info->fragment_ctxs.back().fragment.idx != fragment_idx) {
+      fragment_info->fragment_ctxs.emplace_back();
+      TPlanFragmentCtx& fragment_ctx = fragment_info->fragment_ctxs.back();
       fragment_ctx.__set_fragment(params.fragment_exec_params.fragment);
       fragment_ctx.__set_destinations(params.fragment_exec_params.destinations);
     }
@@ -163,10 +165,9 @@
   status_ = Status::Expected(err_msg);
 }
 
-void Coordinator::BackendState::Exec(
-    const DebugOptions& debug_options,
+void Coordinator::BackendState::Exec(const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table,
-    CountingBarrier* exec_complete_barrier) {
+    const kudu::Slice& serialized_query_ctx, CountingBarrier* exec_complete_barrier) {
   const auto trigger = MakeScopeExitTrigger([&]() {
     // Ensure that 'last_report_time_ms_' is set prior to the barrier being notified.
     last_report_time_ms_ = GenerateReportTimestamp();
@@ -188,9 +189,8 @@
   }
 
   ExecQueryFInstancesRequestPB request;
-  TExecQueryFInstancesSidecar sidecar;
-  sidecar.__set_query_ctx(query_ctx_);
-  SetRpcParams(debug_options, filter_routing_table, &request, &sidecar);
+  TExecPlanFragmentInfo fragment_info;
+  SetRpcParams(debug_options, filter_routing_table, &request, &fragment_info);
 
   RpcController rpc_controller;
   rpc_controller.set_timeout(
@@ -202,7 +202,7 @@
   uint8_t* serialized_buf = nullptr;
   uint32_t serialized_len = 0;
   Status serialize_status =
-      serializer.SerializeToBuffer(&sidecar, &serialized_len, &serialized_buf);
+      serializer.SerializeToBuffer(&fragment_info, &serialized_len, &serialized_buf);
   if (UNLIKELY(!serialize_status.ok())) {
     SetExecError(serialize_status);
     return;
@@ -212,6 +212,7 @@
     return;
   }
 
+  // TODO: eliminate the extra copy here by using a Slice
   unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
   sidecar_buf->assign_copy(serialized_buf, serialized_len);
   unique_ptr<RpcSidecar> rpc_sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
@@ -223,7 +224,19 @@
     SetExecError(FromKuduStatus(sidecar_status, "Failed to add sidecar"));
     return;
   }
-  request.set_sidecar_idx(sidecar_idx);
+  request.set_plan_fragment_info_sidecar_idx(sidecar_idx);
+
+  // Add the serialized TQueryCtx as a sidecar.
+  unique_ptr<RpcSidecar> query_ctx_sidecar = RpcSidecar::FromSlice(serialized_query_ctx);
+  int query_ctx_sidecar_idx;
+  kudu::Status query_ctx_sidecar_status =
+      rpc_controller.AddOutboundSidecar(move(query_ctx_sidecar), &query_ctx_sidecar_idx);
+  if (!query_ctx_sidecar_status.ok()) {
+    SetExecError(
+        FromKuduStatus(query_ctx_sidecar_status, "Failed to add TQueryCtx sidecar"));
+    return;
+  }
+  request.set_query_ctx_sidecar_idx(query_ctx_sidecar_idx);
 
   VLOG_FILE << "making rpc: ExecQueryFInstances"
       << " host=" << TNetworkAddressToString(impalad_address()) << " query_id="
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index ceb7f77..383e70c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -39,6 +39,10 @@
 #include "util/runtime-profile.h"
 #include "util/stopwatch.h"
 
+namespace kudu {
+class Slice;
+}
+
 namespace impala {
 
 class ProgressUpdater;
@@ -78,7 +82,7 @@
   /// on their node_id/instance_idx.
   void Exec(const DebugOptions& debug_options,
       const FilterRoutingTable& filter_routing_table,
-      CountingBarrier* rpc_complete_barrier);
+      const kudu::Slice& serialized_query_ctx, CountingBarrier* rpc_complete_barrier);
 
   /// Update overall execution status, including the instances' exec status/profiles
   /// and the error log, if this backend is not already done. Updates the fragment
@@ -321,11 +325,11 @@
   /// The query id of the Coordinator that owns this BackendState.
   const TUniqueId& query_id_;
 
-  /// Fill in 'request' and 'sidecar' based on state. Uses filter_routing_table to remove
-  /// filters that weren't selected during its construction.
+  /// Fill in 'request' and 'fragment_info' based on state. Uses 'filter_routing_table' to
+  /// remove filters that weren't selected during its construction.
   void SetRpcParams(const DebugOptions& debug_options,
       const FilterRoutingTable& filter_routing_table,
-      ExecQueryFInstancesRequestPB* request, TExecQueryFInstancesSidecar* sidecar);
+      ExecQueryFInstancesRequestPB* request, TExecPlanFragmentInfo* fragment_info);
 
   /// Expects that 'status' is an error. Sets 'status_' to a formatted version of its
   /// message.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 2ba678d..09efe80 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -136,7 +136,7 @@
   // runtime-related state changes past this point (examples: fragment instance
   // profiles, etc.)
 
-  StartBackendExec();
+  RETURN_IF_ERROR(StartBackendExec());
   RETURN_IF_ERROR(FinishBackendStartup());
 
   // set coord_instance_ and coord_sink_
@@ -358,7 +358,7 @@
   filter_routing_table_->is_complete = true;
 }
 
-void Coordinator::StartBackendExec() {
+Status Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
   backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
 
@@ -368,9 +368,21 @@
              << PrintId(query_id());
   query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends));
 
+  // Serialize the TQueryCtx once and pass it to each backend. The serialized buffer must
+  // stay valid until exec_rpcs_complete_barrier_ has been signalled.
+  ThriftSerializer serializer(true);
+  uint8_t* serialized_buf = nullptr;
+  uint32_t serialized_len = 0;
+  Status serialize_status =
+      serializer.SerializeToBuffer(&query_ctx(), &serialized_len, &serialized_buf);
+  if (UNLIKELY(!serialize_status.ok())) {
+    return UpdateExecState(serialize_status, nullptr, FLAGS_hostname);
+  }
+  kudu::Slice query_ctx_slice(serialized_buf, serialized_len);
+
   for (BackendState* backend_state: backend_states_) {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
-        [backend_state, this, &debug_options]() {
+        [backend_state, this, &debug_options, &query_ctx_slice]() {
           DebugActionNoFail(schedule_.query_options(), "COORD_BEFORE_EXEC_RPC");
           // Safe for Exec() to read 'filter_routing_table_' because it is complete
           // at this point and won't be destroyed while this function is executing,
@@ -378,8 +390,8 @@
           // signalled.
           DCHECK(filter_mode_ == TRuntimeFilterMode::OFF
               || filter_routing_table_->is_complete);
-          backend_state->Exec(
-              debug_options, *filter_routing_table_, &exec_rpcs_complete_barrier_);
+          backend_state->Exec(debug_options, *filter_routing_table_, query_ctx_slice,
+              &exec_rpcs_complete_barrier_);
         });
   }
   exec_rpcs_complete_barrier_.Wait();
@@ -389,6 +401,7 @@
   query_events_->MarkEvent(
       Substitute("All $0 execution backends ($1 fragment instances) started",
         num_backends, schedule_.GetNumFragmentInstances()));
+  return Status::OK();
 }
 
 Status Coordinator::FinishBackendStartup() {
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index c8db6d7..0185f99 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -485,7 +485,7 @@
 
   /// Helper for Exec(). Populates backend_states_, starts query execution at all
   /// backends in parallel, and blocks until startup completes.
-  void StartBackendExec();
+  Status StartBackendExec();
 
   /// Helper for Exec(). Checks for errors encountered when starting backend execution,
   /// using any non-OK status, if any, as the overall status. Returns the overall
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index d5da249..5c282d3 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -43,14 +43,14 @@
     "every log_mem_usage_interval'th fragment completion.");
 
 Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
-    const TExecQueryFInstancesSidecar& sidecar) {
-  TUniqueId query_id = sidecar.query_ctx.query_id;
+    const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
+  TUniqueId query_id = query_ctx.query_id;
   VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
-          << " coord=" << TNetworkAddressToString(sidecar.query_ctx.coord_address);
+          << " coord=" << TNetworkAddressToString(query_ctx.coord_address);
   bool dummy;
   QueryState* qs =
-      GetOrCreateQueryState(sidecar.query_ctx, request->per_backend_mem_limit(), &dummy);
-  Status status = qs->Init(request, sidecar);
+      GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
+  Status status = qs->Init(request, fragment_info);
   if (!status.ok()) {
     qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
     ReleaseQueryState(qs);
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 9ca86b6..68c2e4c 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -50,7 +50,7 @@
   /// After this function returns, it is legal to call QueryState::Cancel(), regardless of
   /// the return value of this function.
   Status StartQuery(const ExecQueryFInstancesRequestPB* request,
-      const TExecQueryFInstancesSidecar& sidecar);
+      const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info);
 
   /// Creates a QueryState for the given query with the provided parameters. Only valid
   /// to call if the QueryState does not already exist. The caller must call
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 196a74e..e5a804c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -135,7 +135,7 @@
 }
 
 Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
-    const TExecQueryFInstancesSidecar& sidecar) {
+    const TExecPlanFragmentInfo& fragment_info) {
   // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
   // Init() on failure. We need to do this before any returns because Init() always
   // returns a resource refcount to its caller.
@@ -197,17 +197,18 @@
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
-  TExecQueryFInstancesSidecar& non_const_params =
-      const_cast<TExecQueryFInstancesSidecar&>(sidecar);
-  exec_rpc_sidecar_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
-  exec_rpc_sidecar_.__isset.fragment_ctxs = true;
-  exec_rpc_sidecar_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
-  exec_rpc_sidecar_.__isset.fragment_instance_ctxs = true;
+  TExecPlanFragmentInfo& non_const_fragment_info =
+      const_cast<TExecPlanFragmentInfo&>(fragment_info);
+  fragment_info_.fragment_ctxs.swap(non_const_fragment_info.fragment_ctxs);
+  fragment_info_.__isset.fragment_ctxs = true;
+  fragment_info_.fragment_instance_ctxs.swap(
+      non_const_fragment_info.fragment_instance_ctxs);
+  fragment_info_.__isset.fragment_instance_ctxs = true;
 
   instances_prepared_barrier_.reset(
-      new CountingBarrier(exec_rpc_sidecar_.fragment_instance_ctxs.size()));
+      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
   instances_finished_barrier_.reset(
-      new CountingBarrier(exec_rpc_sidecar_.fragment_instance_ctxs.size()));
+      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
 
   // Claim the query-wide minimum reservation. Do this last so that we don't need
   // to handle releasing it if a later step fails.
@@ -302,8 +303,8 @@
     ReportExecStatusRequestPB* report, TRuntimeProfileForest* profiles_forest) {
   report->Clear();
   TUniqueIdToUniqueIdPB(query_id(), report->mutable_query_id());
-  DCHECK(exec_rpc_params().has_coord_state_idx());
-  report->set_coord_state_idx(exec_rpc_params().coord_state_idx());
+  DCHECK(exec_rpc_params_.has_coord_state_idx());
+  report->set_coord_state_idx(exec_rpc_params_.coord_state_idx());
   {
     std::unique_lock<SpinLock> l(status_lock_);
     overall_status_.ToProto(report->mutable_overall_status());
@@ -504,13 +505,13 @@
 
 bool QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
-          << " #instances=" << exec_rpc_sidecar_.fragment_instance_ctxs.size();
+          << " #instances=" << fragment_info_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
   DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
-  DCHECK_GT(exec_rpc_sidecar_.fragment_ctxs.size(), 0);
-  TPlanFragmentCtx* fragment_ctx = &exec_rpc_sidecar_.fragment_ctxs[0];
-  int num_unstarted_instances = exec_rpc_sidecar_.fragment_instance_ctxs.size();
+  DCHECK_GT(fragment_info_.fragment_ctxs.size(), 0);
+  TPlanFragmentCtx* fragment_ctx = &fragment_info_.fragment_ctxs[0];
+  int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size();
   int fragment_ctx_idx = 0;
 
   // set up desc tbl
@@ -523,12 +524,12 @@
 
   fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (const TPlanFragmentInstanceCtx& instance_ctx :
-      exec_rpc_sidecar_.fragment_instance_ctxs) {
+      fragment_info_.fragment_instance_ctxs) {
     // determine corresponding TPlanFragmentCtx
     if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
       ++fragment_ctx_idx;
-      DCHECK_LT(fragment_ctx_idx, exec_rpc_sidecar_.fragment_ctxs.size());
-      fragment_ctx = &exec_rpc_sidecar_.fragment_ctxs[fragment_ctx_idx];
+      DCHECK_LT(fragment_ctx_idx, fragment_info_.fragment_ctxs.size());
+      fragment_ctx = &fragment_info_.fragment_ctxs[fragment_ctx_idx];
       // we expect fragment and instance contexts to follow the same order
       DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
     }
@@ -646,7 +647,7 @@
              << " fragment_idx=" << fis->instance_ctx().fragment_idx
              << " per_fragment_instance_idx="
              << fis->instance_ctx().per_fragment_instance_idx
-             << " coord_state_idx=" << exec_rpc_params().coord_state_idx()
+             << " coord_state_idx=" << exec_rpc_params_.coord_state_idx()
              << " #in-flight="
              << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
   Status status = fis->Exec();
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 6e95395..86156f4 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -133,10 +133,6 @@
 
   /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
-  const ExecQueryFInstancesRequestPB& exec_rpc_params() const { return exec_rpc_params_; }
-  const TExecQueryFInstancesSidecar& exec_rpc_sidecar() const {
-    return exec_rpc_sidecar_;
-  }
 
   /// The following getters are only valid after Init() and should be called only from
   /// the backend execution (ie. not the coordinator side, since they require holding
@@ -173,7 +169,7 @@
   /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
   Status Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
-      const TExecQueryFInstancesSidecar& sidecar) WARN_UNUSED_RESULT;
+      const TExecPlanFragmentInfo& fragment_info) WARN_UNUSED_RESULT;
 
   /// Performs the runtime-intensive parts of initial setup and starts all fragment
   /// instances belonging to this query. Each instance receives its own execution
@@ -319,11 +315,9 @@
   /// Set in Init().
   std::unique_ptr<ControlServiceProxy> proxy_;
 
-  /// Set in Init(); exec_rpc_sidecar_.query_ctx is *not* set to avoid duplication
-  /// with query_ctx_.
-  /// TODO: find a way not to have to copy this
+  /// Set in Init(). TODO: find a way not to have to copy this
   ExecQueryFInstancesRequestPB exec_rpc_params_;
-  TExecQueryFInstancesSidecar exec_rpc_sidecar_;
+  TExecPlanFragmentInfo fragment_info_;
 
   /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
   ReservationTracker* buffer_reservation_ = nullptr;
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 6e95174..e1c20cb 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -157,15 +157,13 @@
   ExecQueryFInstancesRequestPB rpc_params;
   // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
   rpc_params.set_coord_state_idx(0);
-  TExecQueryFInstancesSidecar sidecar;
-  sidecar.__set_query_ctx(TQueryCtx());
-  sidecar.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
-  sidecar.__set_fragment_instance_ctxs(
+  TExecPlanFragmentInfo fragment_info;
+  fragment_info.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
+  fragment_info.__set_fragment_instance_ctxs(
       vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
-  RETURN_IF_ERROR(qs->Init(&rpc_params, sidecar));
-  FragmentInstanceState* fis = qs->obj_pool()->Add(
-      new FragmentInstanceState(qs, qs->exec_rpc_sidecar().fragment_ctxs[0],
-          qs->exec_rpc_sidecar().fragment_instance_ctxs[0]));
+  RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info));
+  FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(qs,
+      qs->fragment_info_.fragment_ctxs[0], qs->fragment_info_.fragment_instance_ctxs[0]));
   RuntimeState* rs = qs->obj_pool()->Add(
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 0ba51bc..3c6c4c6 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -113,15 +113,15 @@
   return Status::OK();
 }
 
-Status ControlService::GetExecQueryFInstancesSidecar(
-    const ExecQueryFInstancesRequestPB& request, RpcContext* rpc_context,
-    TExecQueryFInstancesSidecar* sidecar) {
+// Retrieves the sidecar at 'sidecar_idx' from 'rpc_context' and deserializes it into
+// 'thrift_obj'.
+template <typename T>
+static Status GetSidecar(int sidecar_idx, RpcContext* rpc_context, T* thrift_obj) {
   kudu::Slice sidecar_slice;
-  KUDU_RETURN_IF_ERROR(
-      rpc_context->GetInboundSidecar(request.sidecar_idx(), &sidecar_slice),
-      "Failed to get thrift profile sidecar");
+  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(sidecar_idx, &sidecar_slice),
+      "Failed to get sidecar");
   uint32_t len = sidecar_slice.size();
-  RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, sidecar));
+  RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, thrift_obj));
   return Status::OK();
 }
 
@@ -129,24 +129,35 @@
     ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
   DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
   DCHECK(request->has_coord_state_idx());
-  DCHECK(request->has_sidecar_idx());
-  TExecQueryFInstancesSidecar sidecar;
-  const Status& sidecar_status =
-      GetExecQueryFInstancesSidecar(*request, rpc_context, &sidecar);
-  if (!sidecar_status.ok()) {
-    RespondAndReleaseRpc(sidecar_status, response, rpc_context);
+  DCHECK(request->has_plan_fragment_info_sidecar_idx());
+  DCHECK(request->has_query_ctx_sidecar_idx());
+  // Deserialize the sidecars. The QueryState will make a copy of the TQueryCtx and
+  // TExecPlanFragmentInfo, so we can deallocate the deserialized values after
+  // StartQuery(). TODO: can we avoid this extra copy?
+  TExecPlanFragmentInfo fragment_info;
+  const Status& fragment_info_sidecar_status =
+      GetSidecar(request->plan_fragment_info_sidecar_idx(), rpc_context, &fragment_info);
+  if (!fragment_info_sidecar_status.ok()) {
+    RespondAndReleaseRpc(fragment_info_sidecar_status, response, rpc_context);
     return;
   }
-  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), sidecar.query_ctx.query_id);
+  TQueryCtx query_ctx;
+  const Status& query_ctx_sidecar_status =
+      GetSidecar(request->query_ctx_sidecar_idx(), rpc_context, &query_ctx);
+  if (!query_ctx_sidecar_status.ok()) {
+    RespondAndReleaseRpc(query_ctx_sidecar_status, response, rpc_context);
+    return;
+  }
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_ctx.query_id);
   VLOG_QUERY << "ExecQueryFInstances():"
-             << " query_id=" << PrintId(sidecar.query_ctx.query_id)
-             << " coord=" << TNetworkAddressToString(sidecar.query_ctx.coord_address)
-             << " #instances=" << sidecar.fragment_instance_ctxs.size();
-  Status resp_status =
-      ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(request, sidecar);
+             << " query_id=" << PrintId(query_ctx.query_id)
+             << " coord=" << TNetworkAddressToString(query_ctx.coord_address)
+             << " #instances=" << fragment_info.fragment_instance_ctxs.size();
+  Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
+      request, query_ctx, fragment_info);
   if (!resp_status.ok()) {
-    LOG(INFO) << "ExecQueryFInstances() failed: query_id="
-              << PrintId(sidecar.query_ctx.query_id) << ": " << resp_status.GetDetail();
+    LOG(INFO) << "ExecQueryFInstances() failed: query_id=" << PrintId(query_ctx.query_id)
+              << ": " << resp_status.GetDetail();
   }
   RespondAndReleaseRpc(resp_status, response, rpc_context);
 }
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 020cba7..1aa26b2 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -92,12 +92,6 @@
       const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
       TRuntimeProfileForest* thrift_profiles);
 
-  /// Helper for deserializing the ExecQueryFInstances sidecar attached in the inbound
-  /// call within 'rpc_context'. On success, returns the deserialized sidecar in
-  /// 'sidecar'. On failure, returns the error status;
-  static Status GetExecQueryFInstancesSidecar(const ExecQueryFInstancesRequestPB& request,
-      RpcContext* rpc_context, TExecQueryFInstancesSidecar* sidecar);
-
   /// Helper for serializing 'status' as part of 'response'. Also releases memory
   /// of the RPC payload previously accounted towards the internal memory tracker.
   template <typename ResponsePBType>
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index b598f39..50cad00 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -222,9 +222,11 @@
   // the coordinator.
   optional int32 coord_state_idx = 1;
 
-  // Sidecar index of the TExecQueryFInstancesSidecar, which contains the query and plan
-  // fragment contexts.
-  optional int32 sidecar_idx = 2;
+  // Sidecar index of the TQueryCtx.
+  optional int32 query_ctx_sidecar_idx = 2;
+
+  // Sidecar index of the TExecPlanFragmentInfo.
+  optional int32 plan_fragment_info_sidecar_idx = 3;
 
   // The minimum query-wide memory reservation (in bytes) required for the backend
   // executing the instances in fragment_instance_ctxs. This is the peak minimum
@@ -232,17 +234,17 @@
   // point in query execution. It may be less than the initial reservation total claims
   // (below) if execution of some operators never overlaps, which allows reuse of
   // reservations.
-  optional int64 min_mem_reservation_bytes = 3;
+  optional int64 min_mem_reservation_bytes = 4;
 
   // Total of the initial buffer reservations that we expect to be claimed on this
   // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes.
-  optional int64 initial_mem_reservation_total_claims = 4;
+  optional int64 initial_mem_reservation_total_claims = 5;
 
   // The backend memory limit (in bytes) as set by the admission controller. Used by the
   // query mem tracker to enforce the memory limit.
-  optional int64 per_backend_mem_limit = 5;
+  optional int64 per_backend_mem_limit = 6;
 }
 
 message ExecQueryFInstancesResponsePB {
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 3ba97c9..2285d4b 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -658,14 +658,15 @@
 
 // The following contains the per-rpc structs for the parameters and the result.
 
-// TODO: convert this fully to protobuf.
-struct TExecQueryFInstancesSidecar {
-  1: optional TQueryCtx query_ctx
-
-  2: optional list<TPlanFragmentCtx> fragment_ctxs
+// Contains info about plan fragment execution needed for the ExecQueryFInstances rpc.
+// Rather than fully converting this to protobuf, which would be a large change, for now we
+// serialize it ourselves and send it with ExecQueryFInstances as a sidecar.
+// TODO: investigate if it's worth converting this fully to protobuf
+struct TExecPlanFragmentInfo {
+  1: optional list<TPlanFragmentCtx> fragment_ctxs
 
   // the order corresponds to the order of fragments in fragment_ctxs
-  3: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
+  2: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
 }
 
 // Parameters for RequestPoolService.resolveRequestPool()