// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package impala;
import "common.proto";
import "kudu/rpc/rpc_header.proto";
message ParquetDmlStatsPB {
// For each column, the on disk byte size
map<string, int64> per_column_size = 1;
message KuduDmlStatsPB {
// The number of reported per-row errors, i.e. this many rows were not modified.
// Note that this aggregate is less useful than a breakdown of the number of errors by
// error type, e.g. number of rows with duplicate key conflicts, number of rows
// violating nullability constraints, etc., but it isn't possible yet to differentiate
// all error types in the KuduTableSink yet.
optional int64 num_row_errors = 1;
// ReportExecStatus
// Per partition DML stats
// TODO: this should include the table stats that we update the metastore with.
message DmlStatsPB {
optional int64 bytes_written = 1;
optional ParquetDmlStatsPB parquet_stats = 2;
optional KuduDmlStatsPB kudu_stats = 3;
// Per-partition statistics and metadata resulting from DML statements.
message DmlPartitionStatusPB {
// The id of the partition written to (may be -1 if the partition is created by this
// query). See THdfsTable.partitions.
optional int64 id = 1;
// The number of rows modified in this partition
optional int64 num_modified_rows = 2;
// Detailed statistics gathered by table writers for this partition
optional DmlStatsPB stats = 3;
// Fully qualified URI to the base directory for this partition.
optional string partition_base_dir = 4;
// The latest observed Kudu timestamp reported by the local KuduSession.
// This value is an unsigned int64.
optional int64 kudu_latest_observed_ts = 5;
// The results of a DML statement, sent to the coordinator as part of
// ReportExecStatusRequestPB
message DmlExecStatusPB {
// A map from temporary absolute file path to final absolute destination. The
// coordinator performs these updates after the query completes.
map<string, string> files_to_move = 1;
// Per-partition details, used in finalization and reporting.
// The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
// root's key in an unpartitioned table being ROOT_PARTITION_KEY.
// The target table name is recorded in the corresponding TQueryExecRequest
map<string, DmlPartitionStatusPB> per_partition_status = 2;
// Error message exchange format
message ErrorLogEntryPB {
// Number of error messages reported using the above identifier
optional int32 count = 1;
// Sample messages from the above error code
repeated string messages = 2;
// Represents the states that a fragment instance goes through during its execution. The
// current state gets sent back to the coordinator and will be presented to users through
// the debug webpages. The states are listed in the order to which they are transitioned.
// Not all states are necessarily transitioned through when there are errors.
enum FInstanceExecStatePB {
// Represents any part of the status report that isn't idempotent. If the executor thinks
// the report failed, we'll retransmit these parts, and this allows us to keep them
// associated with their original sequence number so that if the coordinator actually did
// receive the original report it won't reapply them.
message StatefulStatusPB {
// Sequence number prevents out-of-order or duplicated updates from being applied.
// 'report_seq_no' will be <= the 'report_seq_no' in the FragmentInstanceExecStatusPB
// that contains this StatefulStatusPB.
optional int64 report_seq_no = 1;
// Map of TErrorCode to ErrorLogEntryPB; New errors that have not been reported to
// the coordinator by this fragment instance. Not idempotent.
map<int32, ErrorLogEntryPB> error_log = 2;
message FragmentInstanceExecStatusPB {
// Sequence number prevents out-of-order or duplicated updates from being applied.
optional int64 report_seq_no = 1;
// The ID of the fragment instance which this report contains
optional UniqueIdPB fragment_instance_id = 2;
// If true, fragment finished executing.
optional bool done = 3;
// The current state of this fragment instance's execution.
optional FInstanceExecStatePB current_state = 4;
// Cumulative structural changes made by the table sink of this fragment
// instance. This is sent only when 'done' above is true. Not idempotent.
optional DmlExecStatusPB dml_exec_status = 5;
// The non-idempotent parts of the report, and any prior reports that are not known to
// have been received by the coordinator.
repeated StatefulStatusPB stateful_report = 6;
message ReportExecStatusRequestPB {
// The query id which this report is for.
optional UniqueIdPB query_id = 1;
// same as TExecQueryFInstancesParams.coord_state_idx
optional int32 coord_state_idx = 2;
repeated FragmentInstanceExecStatusPB instance_exec_status = 3;
// Sidecar index of the cumulative profiles of all fragment instances
// in instance_exec_status.
optional int32 thrift_profiles_sidecar_idx = 4;
// Cumulative status for this backend.
// See QueryState::overall_status for details.
optional StatusPB overall_status = 5;
// The fragment instance id of the first failed fragment instance. This corresponds to
// the fragment which sets 'overall_status' above. Not set if 'overall_status' is a
// general error (e.g. failure to start fragment instances).
optional UniqueIdPB fragment_instance_id = 6;
message ReportExecStatusResponsePB {
optional StatusPB status = 1;
message CancelQueryFInstancesRequestPB {
// The query id of the query being cancelled.
optional UniqueIdPB query_id = 1;
message CancelQueryFInstancesResponsePB {
optional StatusPB status = 1;
message RemoteShutdownParamsPB {
// Deadline for the shutdown. After this deadline expires (starting at the time when
// this remote shutdown command is received), the Impala daemon exits immediately
// regardless of whether queries are still executing.
optional int64 deadline_s = 1;
// The current status of a shutdown operation.
message ShutdownStatusPB {
// Milliseconds remaining in startup grace period. 0 if the period has expired.
optional int64 grace_remaining_ms = 1;
// Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
optional int64 deadline_remaining_ms = 2;
// Number of fragment instances still executing.
optional int64 finstances_executing = 3;
// Number of client requests still registered with the Impala server that is being shut
// down.
optional int64 client_requests_registered = 4;
// Number of queries still executing on backend.
optional int64 backend_queries_executing = 5;
message RemoteShutdownResultPB {
// Success or failure of the operation.
optional StatusPB status = 1;
// If status is OK, additional info about the shutdown status.
optional ShutdownStatusPB shutdown_status = 2;
// ExecQueryFInstances
message ExecQueryFInstancesRequestPB {
// This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
// the coordinator.
optional int32 coord_state_idx = 1;
// Sidecar index of the TQueryCtx.
optional int32 query_ctx_sidecar_idx = 2;
// Sidecar index of the TExecPlanFragmentInfo.
optional int32 plan_fragment_info_sidecar_idx = 3;
// The minimum query-wide memory reservation (in bytes) required for the backend
// executing the instances in fragment_instance_ctxs. This is the peak minimum
// reservation that may be required by the concurrently-executing operators at any
// point in query execution. It may be less than the initial reservation total claims
// (below) if execution of some operators never overlaps, which allows reuse of
// reservations.
optional int64 min_mem_reservation_bytes = 4;
// Total of the initial buffer reservations that we expect to be claimed on this
// backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
// operators in all fragment instances that execute on this backend. This is used for
// an optimization in InitialReservation. Measured in bytes.
optional int64 initial_mem_reservation_total_claims = 5;
// The backend memory limit (in bytes) as set by the admission controller. Used by the
// query mem tracker to enforce the memory limit.
optional int64 per_backend_mem_limit = 6;
message ExecQueryFInstancesResponsePB {
// Success or failure of the operation.
optional StatusPB status = 1;
service ControlService {
// Override the default authorization method.
option (kudu.rpc.default_authz_method) = "Authorize";
// Called by coord to start asynchronous execution of a query's fragment instances in
// backend. Returns as soon as all incoming data streams have been set up.
rpc ExecQueryFInstances(ExecQueryFInstancesRequestPB)
returns (ExecQueryFInstancesResponsePB);
// Update the coordinator with the query status of the backend.
rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);
// Called by coordinator to cancel execution of a single query's fragment instances,
// which the coordinator initiated with a prior call to ExecQueryFInstances.
// Cancellation is asynchronous (in the sense that this call may return before the
// fragment instance has completely stopped executing).
rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
returns (CancelQueryFInstancesResponsePB);
// Called to initiate shutdown of this backend.
rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);