syntax = "proto2";
package impala;
import "common.proto";
import "control_service.proto";
import "statestore_service.proto";
// Execution parameters for a single fragment instance. Used to assemble the
// TPlanFragmentInstanceCtx/PlanFragmentInstanceCtxPB.
message FInstanceExecParamsPB {
  // Field numbers 3 and 4 are not assigned in this message. Reserve them so they cannot
  // be handed out again with a different meaning.
  // NOTE(review): presumably they belonged to removed fields - confirm against history.
  reserved 3, 4;

  // The fragment instance id.
  optional UniqueIdPB instance_id = 1;

  // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
  optional int32 fragment_idx = 2;

  // Map from plan node id to a list of scan ranges.
  map<int32, ScanRangesPB> per_node_scan_ranges = 5;

  // 0-based ordinal number of this particular instance. This is within its fragment, not
  // query-wide, so e.g. there will be one instance '0' for each fragment.
  optional int32 per_fragment_instance_idx = 6;

  // In its role as a data sender, a fragment instance is assigned a "sender id" to
  // uniquely identify it to a receiver. -1 = invalid.
  optional int32 sender_id = 7 [default = -1];

  // List of input join build finstances for joins in this finstance.
  repeated JoinBuildInputPB join_build_inputs = 8;

  // If this is a join build fragment, the number of fragment instances that consume the
  // join build. -1 = invalid.
  optional int32 num_join_build_outputs = 9 [default = -1];
}
// Execution parameters for a single backend. Used to construct the
// Coordinator::BackendStates.
message BackendExecParamsPB {
  // The id of this backend.
  optional UniqueIdPB backend_id = 1;

  // The hostname + port of the KRPC backend service on this backend.
  optional NetworkAddressPB address = 8;

  // The IP address + port of the KRPC backend service on this backend.
  optional NetworkAddressPB krpc_address = 9;

  // The fragment instance params assigned to this backend. All instances of a
  // particular fragment are contiguous in this list. This can be empty only for the
  // coordinator backend, that is, if 'is_coord_backend' is true.
  repeated FInstanceExecParamsPB instance_params = 2;

  // The minimum query-wide buffer reservation size (in bytes) required for this backend.
  // This is the peak minimum reservation that may be required by the
  // concurrently-executing operators at any point in query execution. It may be less
  // than the initial reservation total claims (below) if execution of some operators
  // never overlaps, which allows reuse of reservations.
  optional int64 min_mem_reservation_bytes = 3;

  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in instance_params. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 4;

  // Total thread reservation for fragment instances scheduled on this backend. This is
  // the peak number of required threads that may be required by the
  // concurrently-executing fragment instances at any point in query execution.
  optional int64 thread_reservation = 5;

  // Number of slots that this query should count for in admission control.
  // This is calculated as the maximum # of instances of any fragment on this backend.
  // I.e. 1 if mt_dop is not used and at most the mt_dop value if mt_dop is specified
  // (but less if the query is not actually running with mt_dop instances on this node).
  optional int32 slots_to_use = 6;

  // Indicates whether this backend is the coordinator.
  optional bool is_coord_backend = 7;
}
// Information about the backends selected as runtime filter pre-aggregators (which
// aggregate filter updates before the final aggregation in the coordinator) and about
// which fragment instances must send their filter updates to each of them. This is
// populated by Scheduler::ComputeRandomKrpcForAggregation only for fragments that have a
// partitioned join producing a bloom filter, and only if the number of backend executors
// (excluding the coordinator) is at least 2x the number of aggregators (which defaults
// to 2).
message RuntimeFilterAggregatorInfoPB {
  // Number of aggregators.
  required int32 num_aggregators = 1;

  // hostname:port of the designated aggregators.
  repeated NetworkAddressPB aggregator_krpc_addresses = 2;

  // ip:port of the designated aggregators.
  // NOTE(review): presumably index-aligned with 'aggregator_krpc_addresses' - confirm.
  repeated NetworkAddressPB aggregator_krpc_backends = 3;

  // Number of backend executors that report to each designated aggregator, plus 1 (the
  // aggregator itself).
  repeated int32 num_reporter_per_aggregator = 4;

  // Size must be equal to the size of FragmentExecParamsPB.instances().
  // NOTE(review): presumably entry i is the index of the aggregator that instance i
  // reports to - confirm against the scheduler.
  repeated int32 aggregator_idx_to_report = 5;
}
// Execution parameters shared between fragment instances.
message FragmentExecParamsPB {
  // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
  optional int32 fragment_idx = 1;

  // Output destinations of this fragment.
  repeated PlanFragmentDestinationPB destinations = 2;

  // Map from node id to the number of senders (node id expected to be for an
  // ExchangeNode).
  map<int32, int32> per_exch_num_senders = 3;

  // List of fragment instance ids for all instances of this fragment.
  repeated UniqueIdPB instances = 4;

  // Total number of backends this fragment is scheduled on. Note that this represents
  // the number of individual impalads, not the number of physical hosts.
  optional int32 num_hosts = 5;

  // Runtime filter pre-aggregation assignment for this fragment. Populated by
  // Scheduler::ComputeRandomKrpcForAggregation only for fragments with a partitioned
  // join that produces a bloom filter, and only when enough executors are available.
  optional RuntimeFilterAggregatorInfoPB filter_agg_info = 6;
}
// Contains the output from scheduling and admission control that is used by the
// coordinator to start query execution.
message QuerySchedulePB {
  // The id of the query this schedule was computed for.
  optional UniqueIdPB query_id = 1;

  // The per-fragment execution parameters for this schedule.
  repeated FragmentExecParamsPB fragment_exec_params = 2;

  // The per-backend execution parameters for this schedule.
  repeated BackendExecParamsPB backend_exec_params = 3;

  // Total number of scan ranges of this query.
  optional int64 num_scan_ranges = 4;

  // The memory limit per executor that will be imposed on the query.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 per_backend_mem_limit = 5;

  // The per executor memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. Can be zero if the query is only scheduled to run on the coordinator.
  optional int64 per_backend_mem_to_admit = 6;

  // The memory limit for the coordinator that will be imposed on the query. Used only if
  // the query has a coordinator fragment.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 coord_backend_mem_limit = 7;

  // The coordinator memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully.
  optional int64 coord_backend_mem_to_admit = 8;

  // The cluster-wide estimated memory usage of this query.
  optional int64 cluster_mem_est = 9;

  // Mapping to store which data file is read on which hosts, grouped by scan node ID.
  map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 10;
}
// Request for AdmissionControlService.AdmitQuery().
message AdmitQueryRequestPB {
  // The id of the query to schedule and admit.
  optional UniqueIdPB query_id = 1;

  // The BackendId of the coordinator for this query.
  optional UniqueIdPB coord_id = 2;

  // Idx of the TQueryExecRequest sidecar.
  optional int32 query_exec_request_sidecar_idx = 3;

  // List of backends this query should not be scheduled on.
  repeated NetworkAddressPB blacklisted_executor_addresses = 4;
}
// Response for AdmissionControlService.AdmitQuery().
message AdmitQueryResponsePB {
  // OK if the request was successfully handed off to the admission thread pool for
  // processing.
  optional StatusPB status = 1;
}
// Request for AdmissionControlService.GetQueryStatus().
message GetQueryStatusRequestPB {
  // The id of the query whose admission status is requested.
  optional UniqueIdPB query_id = 1;
}
// Response for AdmissionControlService.GetQueryStatus().
message GetQueryStatusResponsePB {
  // Error if the query was rejected or retrieving the status failed.
  optional StatusPB status = 1;

  // The results of scheduling and admission control. Will only be set if admission was
  // successful and the query has not yet been released.
  optional QuerySchedulePB query_schedule = 2;

  // Idx of the TRuntimeProfileTree sidecar.
  optional int32 summary_profile_sidecar_idx = 3;

  // Start time of the query queuing, in Unix milliseconds.
  optional int64 wait_start_time_ms = 4;

  // End time of the query queuing, in Unix milliseconds.
  optional int64 wait_end_time_ms = 5;
}
// Request for AdmissionControlService.ReleaseQuery().
message ReleaseQueryRequestPB {
  // Field number 2 is not assigned in this message. Reserve it so it cannot be handed
  // out again with a different meaning.
  // NOTE(review): presumably it belonged to a removed field - confirm against history.
  reserved 2;

  // The id of the query to release.
  optional UniqueIdPB query_id = 1;

  // Corresponds to the 'peak_mem_consumption' parameter of
  // AdmissionController::ReleaseQuery().
  optional int64 peak_mem_consumption = 3;
}
// Response for AdmissionControlService.ReleaseQuery().
message ReleaseQueryResponsePB {
  // Result status of the ReleaseQuery() call.
  optional StatusPB status = 1;
}
// Request for AdmissionControlService.ReleaseQueryBackends().
message ReleaseQueryBackendsRequestPB {
  // The id of the query whose backends are being released.
  optional UniqueIdPB query_id = 1;

  // List of backends that have completed. The resources for this query on these backends
  // will be released.
  repeated NetworkAddressPB host_addr = 2;
}
// Response for AdmissionControlService.ReleaseQueryBackends().
message ReleaseQueryBackendsResponsePB {
  // Result status of the ReleaseQueryBackends() call.
  optional StatusPB status = 1;
}
// Request for AdmissionControlService.CancelAdmission().
message CancelAdmissionRequestPB {
  // The id of the query whose scheduling should be cancelled.
  optional UniqueIdPB query_id = 1;
}
// Response for AdmissionControlService.CancelAdmission().
message CancelAdmissionResponsePB {
  // Result status of the CancelAdmission() call.
  optional StatusPB status = 1;
}
// Request for AdmissionControlService.AdmissionHeartbeat().
message AdmissionHeartbeatRequestPB {
  // The backend id for the coordinator sending this heartbeat.
  optional UniqueIdPB host_id = 1;

  // The version number of this heartbeat. Incremented every time a new heartbeat is sent.
  // NOTE(review): presumably what lets the admission service discard stale heartbeats.
  optional int64 version = 2;

  // A list of all queries registered at this coordinator.
  repeated UniqueIdPB query_ids = 3;
}
// Response for AdmissionControlService.AdmissionHeartbeat().
message AdmissionHeartbeatResponsePB {
  // Result status of the AdmissionHeartbeat() call.
  optional StatusPB status = 1;
}
// RPCs used by coordinators to schedule queries through the admission service and to
// release the resources those queries hold.
service AdmissionControlService {
  // Called by the coordinator to start scheduling. The actual work is done on a thread
  // pool, so this call returns immediately. Idempotent - if the query has already been
  // submitted previously, returns OK without doing anything. TODO: there are some
  // situations where we can return the admission result quickly, e.g. if the query is
  // rejected. We should evaluate the benefits of saving a call to GetQueryStatus() in
  // those situations.
  rpc AdmitQuery(AdmitQueryRequestPB) returns (AdmitQueryResponsePB);

  // Called by the coordinator after AdmitQuery() to monitor the admission status of the
  // query. The call will block for a configurable amount of time before returning. This
  // call is idempotent and will return the schedule on each call between successful
  // admission and the query getting released.
  rpc GetQueryStatus(GetQueryStatusRequestPB) returns (GetQueryStatusResponsePB);

  // Called by the coordinator when the query has completely finished; releases all
  // remaining resources.
  rpc ReleaseQuery(ReleaseQueryRequestPB) returns (ReleaseQueryResponsePB);

  // Called after individual backends have finished to release their resources while
  // other backends are running. Due to the use of Coordinator::BackendResourceState,
  // this will be called a max of log(# of backends) times per query. TODO: we can save
  // an rpc if we combine the release of the final batch of backends with the call to
  // ReleaseQuery.
  rpc ReleaseQueryBackends(ReleaseQueryBackendsRequestPB)
      returns (ReleaseQueryBackendsResponsePB);

  // Called by the coordinator to cancel scheduling of a query for which GetQueryStatus
  // has not yet returned a schedule.
  rpc CancelAdmission(CancelAdmissionRequestPB) returns (CancelAdmissionResponsePB);

  // Used to ensure that the admission service and coordinator have a consistent view of
  // what resources are being used even in the face of possible rpc failures.
  // Periodically called by each coordinator with a list of query ids for all queries at
  // that coordinator. If the admissiond has resources allocated to a query that is not
  // included in the list, it assumes the query has completed and releases its remaining
  // resources. Stale heartbeat messages are ignored.
  rpc AdmissionHeartbeat(AdmissionHeartbeatRequestPB)
      returns (AdmissionHeartbeatResponsePB);
}