package impala;
import "common.proto";
import "planner.proto";
import "kudu/rpc/rpc_header.proto";
// Parquet-specific statistics reported for a DML statement. Embedded in DmlStatsPB as
// 'parquet_stats'.
message ParquetDmlStatsPB {
  // Maps each column to its on-disk size in bytes.
  map<string, int64> per_column_size = 1;
}
// Kudu-specific statistics reported for a DML statement. Embedded in DmlStatsPB as
// 'kudu_stats'.
message KuduDmlStatsPB {
  // The number of reported per-row errors, i.e. this many rows were not modified.
  // Note that this aggregate is less useful than a breakdown of the number of errors by
  // error type, e.g. number of rows with duplicate key conflicts, number of rows
  // violating nullability constraints, etc., but it isn't yet possible to differentiate
  // all error types in the KuduTableSink.
  optional int64 num_row_errors = 1;
}
// ReportExecStatus

// Per-partition DML stats.
// TODO: this should include the table stats that we update the metastore with.
message DmlStatsPB {
  // Number of bytes written.
  optional int64 bytes_written = 1;

  // Statistics specific to Parquet output; see ParquetDmlStatsPB.
  optional ParquetDmlStatsPB parquet_stats = 2;

  // Statistics specific to Kudu output; see KuduDmlStatsPB.
  optional KuduDmlStatsPB kudu_stats = 3;
}
// Per-file statistics and metadata resulting from DML statements.
// NOTE(review): the 'Pb' suffix differs from the 'PB' suffix used by every other message
// in this file. The name is kept because renaming would change the generated code.
// NOTE(review): the 'required' labels below are kept for compatibility; relaxing them
// needs all readers and writers to be updated together.
message DmlFileStatusPb {
  // Path of the file at its final location (see 'staging_path' below).
  required string final_path = 1;

  // Row count for this file. NOTE(review): presumably the number of rows written to the
  // file - confirm against the writer.
  required int64 num_rows = 2;

  // Size of the file. NOTE(review): presumably in bytes - confirm against the writer.
  required int64 size = 3;

  // Temporary path where the file was written by the executor.
  // The coordinator needs to move the file from staging_path to final_path.
  // If not set, the file was already written to its final path (e.g. in
  // transactional tables).
  // TODO: this could be merged with final_path by storing only the suffix
  // of the path and appending it to a staging/final prefix from DmlPartitionStatusPB.
  optional string staging_path = 4;

  // Flat buffer encoded Iceberg data file (see FbIcebergDataFile). Contains metadata and
  // stats for the Iceberg file.
  optional bytes iceberg_data_file_fb = 5;
}
// Per-partition statistics and metadata resulting from DML statements.
message DmlPartitionStatusPB {
  // The id of the partition written to (may be -1 if the partition is created by this
  // query). See THdfsTable.partitions.
  optional int64 id = 1;

  // The number of rows modified in this partition.
  optional int64 num_modified_rows = 2;

  // The number of rows deleted in this partition.
  optional int64 num_deleted_rows = 3;

  // Detailed statistics gathered by table writers for this partition.
  optional DmlStatsPB stats = 4;

  // Fully qualified URI to the base directory for this partition.
  optional string partition_base_dir = 5;

  // The latest observed Kudu timestamp reported by the local KuduSession.
  // This value is an unsigned int64.
  // NOTE(review): the field is declared as a signed int64, so consumers presumably
  // reinterpret the bits as unsigned - confirm at the read/write sites.
  optional int64 kudu_latest_observed_ts = 6;

  // List of files created during the DML statement in this partition.
  repeated DmlFileStatusPb created_files = 7;

  // List of delete files created during the DML statement in this partition.
  repeated DmlFileStatusPb created_delete_files = 8;

  // Fully qualified URI to the staging directory for this partition.
  optional string staging_dir_to_clean_up = 9;
}
// The results of a DML statement, sent to the coordinator as part of
// ReportExecStatusRequestPB.
message DmlExecStatusPB {
  // Per-partition details, used in finalization and reporting.
  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
  // root's key in an unpartitioned table being ROOT_PARTITION_KEY.
  // The target table name is recorded in the corresponding TQueryExecRequest.
  map<string, DmlPartitionStatusPB> per_partition_status = 1;

  // For Iceberg modify statements, the data files referenced by position delete
  // records.
  repeated string data_files_referenced_by_position_deletes = 2;
}
// Error message exchange format. Entries are keyed by error code in the map that holds
// them (see StatefulStatusPB.error_log).
message ErrorLogEntryPB {
  // Number of error messages reported for the error code this entry is keyed by.
  optional int32 count = 1;

  // Sample messages for that error code.
  repeated string messages = 2;
}
// Represents the states that a fragment instance goes through during its execution. The
// current state gets sent back to the coordinator and will be presented to users through
// the debug webpages. The states are listed in the order in which they are transitioned.
// Not all states are necessarily transitioned through when there are errors.
// NOTE(review): the enumerators and the closing brace of this enum are not visible in
// this view of the file; restore them from the authoritative source before compiling.
enum FInstanceExecStatePB {
// Represents any part of the status report that isn't idempotent. If the executor thinks
// the report failed, we'll retransmit these parts, and this allows us to keep them
// associated with their original sequence number so that if the coordinator actually did
// receive the original report it won't reapply them.
message StatefulStatusPB {
  // Sequence number prevents out-of-order or duplicated updates from being applied.
  // 'report_seq_no' will be <= the 'report_seq_no' in the FragmentInstanceExecStatusPB
  // that contains this StatefulStatusPB.
  optional int64 report_seq_no = 1;

  // Map of TErrorCode to ErrorLogEntryPB. Holds new errors that have not yet been
  // reported to the coordinator by this fragment instance. Not idempotent.
  map<int32, ErrorLogEntryPB> error_log = 2;

  // Metadata associated with a failed fragment instance. Only set for failed fragment
  // instances.
  optional AuxErrorInfoPB aux_error_info = 3;
}
// Per-node stats required for the exec summary. Describes either a PlanNode or a
// DataSink, identified by whichever of the two id fields is set.
message ExecSummaryDataPB {
  // Plan node ID, set if this is for a PlanNode.
  optional int32 plan_node_id = 1;

  // Data sink ID, set if this is for a DataSink.
  optional int32 data_sink_id = 2;

  // Rows returned from this node, if this is a PlanNode.
  optional int64 rows_returned = 3;

  // Peak memory usage in bytes of this PlanNode or DataSink.
  optional int64 peak_mem_usage = 4;

  // Local time in nanoseconds spent in this plan node.
  optional int64 local_time_ns = 5;
}
// RPC error metadata that can be associated with an AuxErrorInfoPB object. Created if an
// RPC to another node failed.
// NOTE(review): both fields are 'required'; the labels are kept for compatibility.
message RPCErrorInfoPB {
  // The address of the RPC's target node.
  required NetworkAddressPB dest_node = 1;

  // The posix error code of the failed RPC.
  required int32 posix_error_code = 2;
}
// Error metadata that can be associated with a failed fragment instance. Used to store
// extra info about errors encountered during fragment execution. This information is
// used by the Coordinator to blacklist potentially unhealthy nodes.
message AuxErrorInfoPB {
  // Set if the fragment instance failed because an RPC to another node failed. Only set
  // if the RPC failed due to a network error.
  optional RPCErrorInfoPB rpc_error_info = 1;
}
// Execution status of a single fragment instance. Carried in
// ReportExecStatusRequestPB.instance_exec_status.
message FragmentInstanceExecStatusPB {
  // Sequence number prevents out-of-order or duplicated updates from being applied.
  optional int64 report_seq_no = 1;

  // The ID of the fragment instance that this report describes.
  optional UniqueIdPB fragment_instance_id = 2;

  // If true, fragment finished executing.
  optional bool done = 3;

  // The current state of this fragment instance's execution.
  optional FInstanceExecStatePB current_state = 4;

  // Cumulative structural changes made by the table sink of this fragment
  // instance. This is sent only when 'done' above is true. Not idempotent.
  optional DmlExecStatusPB dml_exec_status = 5;

  // The non-idempotent parts of the report, and any prior reports that are not known to
  // have been received by the coordinator.
  repeated StatefulStatusPB stateful_report = 6;

  // Per-node stats required for the exec summary.
  repeated ExecSummaryDataPB exec_summary_data = 7;
}
// Aggregated status for all instances of one fragment on a backend. Carried in
// ReportExecStatusRequestPB.fragment_exec_status.
message FragmentExecStatusPB {
  // Ordinal number of fragment in the query.
  optional int32 fragment_idx = 1;

  // The index of the first instance included in this state report. The serialized
  // profile (in a corresponding sidecar) includes the number of instances and the
  // profiles for those instances. The instance indexes for a backend are sequential.
  optional int32 min_per_fragment_instance_idx = 2;
}
// Request of the ControlService.ReportExecStatus RPC: a backend's status report for one
// query, sent to the coordinator.
message ReportExecStatusRequestPB {
  // The query id which this report is for.
  optional UniqueIdPB query_id = 1;

  // Same as TExecQueryFInstancesParams.coord_state_idx.
  optional int32 coord_state_idx = 2;

  // Status entries for individual fragment instances; see FragmentInstanceExecStatusPB.
  repeated FragmentInstanceExecStatusPB instance_exec_status = 3;

  // Sidecar index of the cumulative profiles in instance_exec_status and
  // fragment_exec_status.
  optional int32 thrift_profiles_sidecar_idx = 4;

  // Cumulative status for this backend.
  // See QueryState::overall_status for details.
  optional StatusPB overall_status = 5;

  // The fragment instance id of the first failed fragment instance. This corresponds to
  // the fragment which sets 'overall_status' above. Not set if 'overall_status' is a
  // general error (e.g. failure to start fragment instances).
  optional UniqueIdPB fragment_instance_id = 6;

  // Peak memory usage for this query on this backend in bytes.
  optional int64 peak_mem_consumption = 7;

  // User CPU utilization for the query on this backend in ns.
  optional int64 cpu_user_ns = 8;

  // System CPU utilization for the query on this backend in ns.
  optional int64 cpu_sys_ns = 9;

  // Sum of BytesRead counters on this backend.
  optional int64 bytes_read = 10;

  // Total scan ranges completed on this backend.
  optional int64 scan_ranges_complete = 11;

  // Total bytes sent by instances that did not contain a scan node.
  optional int64 exchange_bytes_sent = 12;

  // Total bytes sent by instances that contained a scan node.
  optional int64 scan_bytes_sent = 13;

  // Aggregated status for all instances of a fragment on a backend. Used when
  // TQueryCtx.gen_aggregated_profile is true.
  repeated FragmentExecStatusPB fragment_exec_status = 14;

  // Sequence number prevents out-of-order or duplicated updates from being applied.
  // Incremented for each backend status report. 'fragment_exec_status' should be
  // disregarded if the sequence number of this report is less than or equal to
  // the previous report received.
  optional int64 backend_report_seq_no = 15;

  // For each join node, sum of RowsReturned counters on this backend.
  map<int32, int64> per_join_rows_produced = 16;

  // If true, the executor failed to execute query fragments due to a fatal local disk
  // IO error, e.g. the local storage devices used for spilling are corrupted.
  // Defaults to false.
  optional bool local_disk_faulty = 17 [default = false];
}
// Response of the ControlService.ReportExecStatus RPC.
message ReportExecStatusResponsePB {
  // Status returned to the reporting backend.
  optional StatusPB status = 1;
}
// Request of the ControlService.CancelQueryFInstances RPC.
message CancelQueryFInstancesRequestPB {
  // The query id of the query being cancelled.
  optional UniqueIdPB query_id = 1;
}
// Response of the ControlService.CancelQueryFInstances RPC.
message CancelQueryFInstancesResponsePB {
  // Status of the cancellation request.
  optional StatusPB status = 1;
}
// Request of the ControlService.RemoteShutdown RPC.
message RemoteShutdownParamsPB {
  // Deadline for the shutdown. After this deadline expires (starting at the time when
  // this remote shutdown command is received), the Impala daemon exits immediately
  // regardless of whether queries are still executing.
  // NOTE(review): presumably expressed in seconds, per the '_s' suffix - confirm.
  optional int64 deadline_s = 1;
}
// The current status of a shutdown operation.
message ShutdownStatusPB {
  // Milliseconds remaining in startup grace period. 0 if the period has expired.
  optional int64 grace_remaining_ms = 1;

  // Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
  optional int64 deadline_remaining_ms = 2;

  // Number of fragment instances still executing.
  optional int64 finstances_executing = 3;

  // Number of client requests still registered with the Impala server that is being
  // shut down.
  optional int64 client_requests_registered = 4;

  // Number of queries still executing on backend.
  optional int64 backend_queries_executing = 5;
}
// Response of the ControlService.RemoteShutdown RPC.
message RemoteShutdownResultPB {
  // Success or failure of the operation.
  optional StatusPB status = 1;

  // If status is OK, additional info about the shutdown status.
  optional ShutdownStatusPB shutdown_status = 2;
}
// Specification of one output destination of a plan fragment.
message PlanFragmentDestinationPB {
  // The globally unique fragment instance id.
  optional UniqueIdPB fragment_instance_id = 1;

  // Hostname + port of the KRPC backend service on the destination.
  optional NetworkAddressPB address = 2;

  // IP address + port of the KRPC backend service on the destination.
  optional NetworkAddressPB krpc_backend = 3;
}
// Context to collect information that is shared among all instances of a particular plan
// fragment. Corresponds to a TPlanFragment with the same idx in the
// TExecPlanFragmentInfo.
message PlanFragmentCtxPB {
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;

  // Output destinations, one per output partition. The partitioning of the output is
  // specified by TPlanFragment.output_sink.output_partition in the corresponding
  // TPlanFragment. The number of output partitions is destinations.size().
  repeated PlanFragmentDestinationPB destinations = 2;
}
// A scan range plus the parameters needed to execute that scan.
message ScanRangeParamsPB {
  // The scan range to execute.
  optional ScanRangePB scan_range = 1;

  // Defaults to -1 when unset.
  // NOTE(review): presumably -1 means the volume is unknown - confirm with the producer.
  optional int32 volume_id = 2 [default = -1];

  // Defaults to false when unset.
  optional bool try_hdfs_cache = 3 [default = false];

  // No explicit default is declared.
  // NOTE(review): presumably true when the range is not local to the executing host -
  // confirm with the producer.
  optional bool is_remote = 4;
}
// List of ScanRangeParamsPB. This wrapper is needed so that per_node_scan_ranges in
// PlanFragmentInstanceCtxPB can be a map, since protobuf doesn't support repeated map
// values.
message ScanRangesPB {
  // The wrapped scan ranges.
  repeated ScanRangeParamsPB scan_ranges = 1;
}
// Information about the input fragment instance of a join node.
message JoinBuildInputPB {
  // The join node id that will consume this join build.
  optional int32 join_node_id = 1;

  // Fragment instance id of the input fragment instance.
  optional UniqueIdPB input_finstance_id = 2;
}
// Protobuf portion of the execution parameters of a single fragment instance. Every
// fragment instance will also have a corresponding TPlanFragmentInstanceCtx with the
// same fragment_idx.
message PlanFragmentInstanceCtxPB {
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;

  // Map from plan node id to initial scan ranges for each scan node in
  // TPlanFragment.plan_tree.
  map<int32, ScanRangesPB> per_node_scan_ranges = 2;

  // List of input join build finstances for joins in this finstance.
  repeated JoinBuildInputPB join_build_inputs = 3;
}
// List of host addresses. Used as the value type of
// FilepathToHostsMapPB.filepath_to_hosts, whose key is the file path.
message FilepathToHostsListPB {
  // The host addresses for the file.
  repeated NetworkAddressPB hosts = 1;

  // True if the key in the map for this entry is a relative path, false if it is an
  // absolute path.
  // NOTE(review): declared 'required'; the label is kept for compatibility.
  required bool is_relative = 2;
}
// File path to a list of host addresses mapping.
message FilepathToHostsMapPB {
  // Keyed by file path; FilepathToHostsListPB.is_relative tells whether that key is a
  // relative or an absolute path.
  map<string, FilepathToHostsListPB> filepath_to_hosts = 1;
}
// ExecQueryFInstances

// Request of the ControlService.ExecQueryFInstances RPC.
message ExecQueryFInstancesRequestPB {
  // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs
  // to the coordinator.
  optional int32 coord_state_idx = 1;

  // Sidecar index of the TQueryCtx.
  optional int32 query_ctx_sidecar_idx = 2;

  // Sidecar index of the TExecPlanFragmentInfo.
  optional int32 plan_fragment_info_sidecar_idx = 3;

  // The minimum query-wide memory reservation (in bytes) required for the backend
  // executing the instances in fragment_instance_ctxs. This is the peak minimum
  // reservation that may be required by the concurrently-executing operators at any
  // point in query execution. It may be less than the initial reservation total claims
  // (below) if execution of some operators never overlaps, which allows reuse of
  // reservations.
  optional int64 min_mem_reservation_bytes = 4;

  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 5;

  // The backend memory limit (in bytes) as set by the admission controller. Used by the
  // query mem tracker to enforce the memory limit.
  optional int64 per_backend_mem_limit = 6;

  // General execution parameters for different fragments. Corresponds to 'fragments' in
  // the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentCtxPB fragment_ctxs = 7;

  // Execution parameters for specific fragment instances. Corresponds to
  // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;

  // Mapping to store which data file is read on which host, grouped by scan node ID.
  map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 9;
}
// Response of the ControlService.ExecQueryFInstances RPC.
message ExecQueryFInstancesResponsePB {
  // Success or failure of the operation.
  optional StatusPB status = 1;
}
// RPC service used to start, monitor and cancel query fragment instances on backends,
// and to request a backend shutdown.
service ControlService {
  // Override the default authorization method.
  option (kudu.rpc.default_authz_method) = "Authorize";

  // Called by coord to start asynchronous execution of a query's fragment instances in
  // backend. Returns as soon as all incoming data streams have been set up.
  rpc ExecQueryFInstances(ExecQueryFInstancesRequestPB)
      returns (ExecQueryFInstancesResponsePB);

  // Update the coordinator with the query status of the backend.
  rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);

  // Called by coordinator to cancel execution of a single query's fragment instances,
  // which the coordinator initiated with a prior call to ExecQueryFInstances.
  // Cancellation is asynchronous (in the sense that this call may return before the
  // fragment instance has completely stopped executing).
  rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
      returns (CancelQueryFInstancesResponsePB);

  // Called to initiate shutdown of this backend.
  rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
}