| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| |
| syntax="proto2"; |
| |
| package impala; |
| |
| import "common.proto"; |
| import "planner.proto"; |
| |
| import "kudu/rpc/rpc_header.proto"; |
| |
// Statistics produced by the Parquet table writer, reported per partition as part of
// DmlStatsPB.
message ParquetDmlStatsPB {
  // Map from column name to that column's on-disk byte size.
  map<string, int64> per_column_size = 1;
}
| |
// Statistics produced by the Kudu table sink, reported per partition as part of
// DmlStatsPB.
message KuduDmlStatsPB {
  // The number of reported per-row errors, i.e. this many rows were not modified.
  // Note that this aggregate is less useful than a breakdown of the number of errors by
  // error type, e.g. number of rows with duplicate key conflicts, number of rows
  // violating nullability constraints, etc., but it isn't yet possible to differentiate
  // all error types in the KuduTableSink.
  optional int64 num_row_errors = 1;
}
| |
// ReportExecStatus: the messages below are used to report query execution status from a
// backend to the coordinator.
| |
// Per-partition DML statistics gathered by table writers.
// TODO: this should include the table stats that we update the metastore with.
message DmlStatsPB {
  // Total number of bytes written to the partition.
  optional int64 bytes_written = 1;

  // Per-column writer statistics; populated by the Parquet table writer.
  optional ParquetDmlStatsPB parquet_stats = 2;

  // Row-error statistics; populated by the Kudu table sink.
  optional KuduDmlStatsPB kudu_stats = 3;
}
| |
// Per-partition statistics and metadata resulting from DML statements. Used as the
// map value type of DmlExecStatusPB.per_partition_status.
message DmlPartitionStatusPB {
  // The id of the partition written to (may be -1 if the partition is created by this
  // query). See THdfsTable.partitions.
  optional int64 id = 1;

  // The number of rows modified in this partition.
  optional int64 num_modified_rows = 2;

  // Detailed statistics gathered by table writers for this partition.
  optional DmlStatsPB stats = 3;

  // Fully qualified URI to the base directory for this partition.
  optional string partition_base_dir = 4;

  // The latest observed Kudu timestamp reported by the local KuduSession.
  // This value is an unsigned int64 carried in a signed int64 field.
  optional int64 kudu_latest_observed_ts = 5;
}
| |
// The results of a DML statement, sent to the coordinator as part of
// FragmentInstanceExecStatusPB (within ReportExecStatusRequestPB).
message DmlExecStatusPB {
  // A map from temporary absolute file path to final absolute destination. The
  // coordinator performs these updates after the query completes.
  map<string, string> files_to_move = 1;

  // Per-partition details, used in finalization and reporting.
  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
  // root's key in an unpartitioned table being ROOT_PARTITION_KEY.
  // The target table name is recorded in the corresponding TQueryExecRequest.
  map<string, DmlPartitionStatusPB> per_partition_status = 2;
}
| |
// Error message exchange format. Used as the map value type of
// StatefulStatusPB.error_log, where the map key is the error code (TErrorCode).
message ErrorLogEntryPB {
  // Number of error messages reported with this error code.
  optional int32 count = 1;

  // Sample messages for this error code; may be fewer than 'count'.
  repeated string messages = 2;
}
| |
// Represents the states that a fragment instance goes through during its execution. The
// current state gets sent back to the coordinator and will be presented to users through
// the debug webpages. The states are listed in the order to which they are transitioned.
// Not all states are necessarily transitioned through when there are errors.
// NOTE: these values cross the wire in status reports; do not renumber or reuse them.
enum FInstanceExecStatePB {
  WAITING_FOR_EXEC = 0;
  WAITING_FOR_PREPARE = 1;
  WAITING_FOR_CODEGEN = 2;
  WAITING_FOR_OPEN = 3;
  WAITING_FOR_FIRST_BATCH = 4;
  FIRST_BATCH_PRODUCED = 5;
  PRODUCING_DATA = 6;
  LAST_BATCH_SENT = 7;
  FINISHED = 8;
}
| |
// Represents any part of the status report that isn't idempotent. If the executor thinks
// the report failed, we'll retransmit these parts, and this allows us to keep them
// associated with their original sequence number so that if the coordinator actually did
// receive the original report it won't reapply them.
message StatefulStatusPB {
  // Sequence number prevents out-of-order or duplicated updates from being applied.
  // 'report_seq_no' will be <= the 'report_seq_no' in the FragmentInstanceExecStatusPB
  // that contains this StatefulStatusPB.
  optional int64 report_seq_no = 1;

  // Map of TErrorCode to ErrorLogEntryPB; new errors that have not been reported to
  // the coordinator by this fragment instance. Not idempotent.
  map<int32, ErrorLogEntryPB> error_log = 2;

  // Metadata associated with a failed fragment instance. Only set for failed fragment
  // instances.
  optional AuxErrorInfoPB aux_error_info = 3;
}
| |
// Per-node stats required for the exec summary. Describes either a PlanNode or a
// DataSink; exactly one of 'plan_node_id'/'data_sink_id' is expected to be set
// (NOTE(review): inferred from the field comments below — confirm).
message ExecSummaryDataPB {
  // Plan node ID, set if this is for a PlanNode.
  optional int32 plan_node_id = 1;

  // Data sink ID, set if this is for a DataSink.
  optional int32 data_sink_id = 2;

  // Rows returned from this node, if this is a PlanNode.
  optional int64 rows_returned = 3;

  // Peak memory usage in bytes of this PlanNode or DataSink.
  optional int64 peak_mem_usage = 4;

  // Local time in nanoseconds spent in this plan node.
  optional int64 local_time_ns = 5;
}
| |
// RPC error metadata that can be associated with an AuxErrorInfoPB object. Created if a
// RPC to another node failed.
// NOTE(review): proto2 'required' fields are a wire-compat hazard (they can never be
// safely removed); kept as-is because relaxing them to 'optional' changes the
// validation contract for existing senders/receivers.
message RPCErrorInfoPB {
  // The address of the RPC's target node.
  required NetworkAddressPB dest_node = 1;

  // The posix error code of the failed RPC.
  required int32 posix_error_code = 2;
}
| |
// Error metadata that can be associated with a failed fragment instance. Used to store
// extra info about errors encountered during fragment execution. This information is
// used by the Coordinator to blacklist potentially unhealthy nodes.
message AuxErrorInfoPB {
  // Set if the fragment instance failed because a RPC to another node failed. Only set
  // if the RPC failed due to a network error.
  optional RPCErrorInfoPB rpc_error_info = 1;
}
| |
// Execution status of a single fragment instance, sent to the coordinator as part of
// ReportExecStatusRequestPB.
message FragmentInstanceExecStatusPB {
  // Sequence number prevents out-of-order or duplicated updates from being applied.
  optional int64 report_seq_no = 1;

  // The ID of the fragment instance which this report contains.
  optional UniqueIdPB fragment_instance_id = 2;

  // If true, fragment finished executing.
  optional bool done = 3;

  // The current state of this fragment instance's execution.
  optional FInstanceExecStatePB current_state = 4;

  // Cumulative structural changes made by the table sink of this fragment
  // instance. This is sent only when 'done' above is true. Not idempotent.
  optional DmlExecStatusPB dml_exec_status = 5;

  // The non-idempotent parts of the report, and any prior reports that are not known to
  // have been received by the coordinator. Each entry carries its own 'report_seq_no',
  // which is <= 'report_seq_no' above.
  repeated StatefulStatusPB stateful_report = 6;

  // Per-node stats required for the exec summary.
  repeated ExecSummaryDataPB exec_summary_data = 7;
}
| |
// Request for the ReportExecStatus() RPC: a backend's cumulative status report for one
// query, sent to the coordinator.
message ReportExecStatusRequestPB {
  // The query id which this report is for.
  optional UniqueIdPB query_id = 1;

  // Same as TExecQueryFInstancesParams.coord_state_idx.
  optional int32 coord_state_idx = 2;

  // Per-fragment-instance execution status for the instances running on this backend.
  repeated FragmentInstanceExecStatusPB instance_exec_status = 3;

  // Sidecar index of the cumulative profiles of all fragment instances
  // in instance_exec_status.
  optional int32 thrift_profiles_sidecar_idx = 4;

  // Cumulative status for this backend.
  // See QueryState::overall_status for details.
  optional StatusPB overall_status = 5;

  // The fragment instance id of the first failed fragment instance. This corresponds to
  // the fragment which sets 'overall_status' above. Not set if 'overall_status' is a
  // general error (e.g. failure to start fragment instances).
  optional UniqueIdPB fragment_instance_id = 6;

  // Peak memory usage for this query on this backend in bytes.
  optional int64 peak_mem_consumption = 7;

  // User CPU utilization for the query on this backend in ns.
  optional int64 cpu_user_ns = 8;

  // System CPU utilization for the query on this backend in ns.
  optional int64 cpu_sys_ns = 9;

  // Sum of BytesRead counters on this backend.
  optional int64 bytes_read = 10;

  // Total scan ranges completed on this backend.
  optional int64 scan_ranges_complete = 11;

  // Total bytes sent by instances that did not contain a scan node.
  optional int64 exchange_bytes_sent = 12;

  // Total bytes sent by instances that contained a scan node.
  optional int64 scan_bytes_sent = 13;

}
| |
// Response for the ReportExecStatus() RPC.
message ReportExecStatusResponsePB {
  // Success or failure of applying the report on the coordinator.
  optional StatusPB status = 1;
}
| |
// Request for the CancelQueryFInstances() RPC.
message CancelQueryFInstancesRequestPB {
  // The query id of the query being cancelled.
  optional UniqueIdPB query_id = 1;
}
| |
// Response for the CancelQueryFInstances() RPC.
message CancelQueryFInstancesResponsePB {
  // Success or failure of initiating the cancellation.
  optional StatusPB status = 1;
}
| |
// Request for the RemoteShutdown() RPC.
message RemoteShutdownParamsPB {
  // Deadline for the shutdown, in seconds. After this deadline expires (starting at the
  // time when this remote shutdown command is received), the Impala daemon exits
  // immediately regardless of whether queries are still executing.
  optional int64 deadline_s = 1;
}
| |
// The current status of a shutdown operation.
message ShutdownStatusPB {
  // Milliseconds remaining in startup grace period. 0 if the period has expired.
  optional int64 grace_remaining_ms = 1;

  // Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
  optional int64 deadline_remaining_ms = 2;

  // Number of fragment instances still executing.
  optional int64 finstances_executing = 3;

  // Number of client requests still registered with the Impala server that is being shut
  // down.
  optional int64 client_requests_registered = 4;

  // Number of queries still executing on backend.
  optional int64 backend_queries_executing = 5;
}
| |
// Response for the RemoteShutdown() RPC.
message RemoteShutdownResultPB {
  // Success or failure of the operation.
  optional StatusPB status = 1;

  // If status is OK, additional info about the shutdown status.
  optional ShutdownStatusPB shutdown_status = 2;
}
| |
// Specification of one output destination of a plan fragment.
message PlanFragmentDestinationPB {
  // The globally unique fragment instance id.
  optional UniqueIdPB fragment_instance_id = 1;

  // Hostname + port of the Thrift based ImpalaInternalService on the destination.
  optional NetworkAddressPB thrift_backend = 2;

  // IP address + port of the KRPC based ImpalaInternalService on the destination.
  optional NetworkAddressPB krpc_backend = 3;
}
| |
// Context to collect information that is shared among all instances of a particular plan
// fragment. Corresponds to a TPlanFragment with the same idx in the
// TExecPlanFragmentInfo.
message PlanFragmentCtxPB {
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;

  // Output destinations, one per output partition. The partitioning of the output is
  // specified by TPlanFragment.output_sink.output_partition in the corresponding
  // TPlanFragment. The number of output partitions is destinations.size().
  repeated PlanFragmentDestinationPB destinations = 2;
}
| |
// A scan range plus the parameters needed to execute that scan.
message ScanRangeParamsPB {
  // The scan range to be read.
  optional ScanRangePB scan_range = 1;
  // Id of the disk volume holding the range's data; -1 presumably means the volume is
  // unknown — NOTE(review): confirm against the scheduler.
  optional int32 volume_id = 2 [default = -1];
  // If true, try to read the range through the HDFS cache.
  optional bool try_hdfs_cache = 3 [default = false];
  // Whether the range's data is remote to the executing backend — NOTE(review):
  // inferred from the name; confirm.
  optional bool is_remote = 4;
}
| |
// List of ScanRangeParamsPB. This is needed so that per_node_scan_ranges in
// PlanFragmentInstanceCtxPB can be a map since protobuf doesn't support repeated map
// values.
message ScanRangesPB {
  // The scan ranges (with per-range execution parameters) for one scan node.
  repeated ScanRangeParamsPB scan_ranges = 1;
}
| |
// Information about the input fragment instance of a join node.
message JoinBuildInputPB {
  // The join node id that will consume this join build.
  optional int32 join_node_id = 1;

  // Fragment instance id of the input fragment instance.
  optional UniqueIdPB input_finstance_id = 2;
}
| |
// Protobuf portion of the execution parameters of a single fragment instance. Every
// fragment instance will also have a corresponding TPlanFragmentInstanceCtx with the same
// fragment_idx.
message PlanFragmentInstanceCtxPB {
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;

  // Map from plan node id to initial scan ranges for each scan node in
  // TPlanFragment.plan_tree.
  map<int32, ScanRangesPB> per_node_scan_ranges = 2;

  // List of input join build finstances for joins in this finstance.
  repeated JoinBuildInputPB join_build_inputs = 3;
}
| |
// ExecQueryFInstances: request/response for starting a query's fragment instances on a
// backend.
// Request for the ExecQueryFInstances() RPC: everything a backend needs to start
// executing its share of a query's fragment instances.
message ExecQueryFInstancesRequestPB {
  // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
  // the coordinator.
  optional int32 coord_state_idx = 1;

  // Sidecar index of the TQueryCtx.
  optional int32 query_ctx_sidecar_idx = 2;

  // Sidecar index of the TExecPlanFragmentInfo.
  optional int32 plan_fragment_info_sidecar_idx = 3;

  // The minimum query-wide memory reservation (in bytes) required for the backend
  // executing the instances in fragment_instance_ctxs. This is the peak minimum
  // reservation that may be required by the concurrently-executing operators at any
  // point in query execution. It may be less than the initial reservation total claims
  // (below) if execution of some operators never overlaps, which allows reuse of
  // reservations.
  optional int64 min_mem_reservation_bytes = 4;

  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 5;

  // The backend memory limit (in bytes) as set by the admission controller. Used by the
  // query mem tracker to enforce the memory limit.
  optional int64 per_backend_mem_limit = 6;

  // General execution parameters for different fragments. Corresponds to 'fragments' in
  // the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentCtxPB fragment_ctxs = 7;

  // Execution parameters for specific fragment instances. Corresponds to
  // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;

  // The Backend ID of the coordinator.
  optional UniqueIdPB coord_backend_id = 9;
}
| |
// Response for the ExecQueryFInstances() RPC.
message ExecQueryFInstancesResponsePB {
  // Success or failure of the operation.
  optional StatusPB status = 1;
}
| |
// Control-plane RPCs exchanged between the coordinator and backend executors:
// starting, reporting on, and cancelling query fragment instances, plus remote
// daemon shutdown.
service ControlService {
  // Override the default authorization method.
  option (kudu.rpc.default_authz_method) = "Authorize";

  // Called by coord to start asynchronous execution of a query's fragment instances in
  // backend. Returns as soon as all incoming data streams have been set up.
  rpc ExecQueryFInstances(ExecQueryFInstancesRequestPB)
      returns (ExecQueryFInstancesResponsePB);

  // Update the coordinator with the query status of the backend. Reports may be
  // retransmitted; see StatefulStatusPB for how non-idempotent parts are handled.
  rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);

  // Called by coordinator to cancel execution of a single query's fragment instances,
  // which the coordinator initiated with a prior call to ExecQueryFInstances.
  // Cancellation is asynchronous (in the sense that this call may return before the
  // fragment instance has completely stopped executing).
  rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
      returns (CancelQueryFInstancesResponsePB);

  // Called to initiate shutdown of this backend.
  rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
}