| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| |
| syntax = "proto2"; |
| |
| package impala; |
| |
| import "common.proto"; |
| import "control_service.proto"; |
| import "statestore_service.proto"; |
| |
| // Execution parameters for a single fragment instance. Used to assemble the |
| // TPlanFragmentInstanceCtx/PlanFragmentInstanceCtxPB. |
| message FInstanceExecParamsPB { |
| // The fragment instance id. |
| optional UniqueIdPB instance_id = 1; |
| |
| // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx. |
| optional int32 fragment_idx = 2; |
| |
| // Map from plan node id to a list of scan ranges. |
| map<int32, ScanRangesPB> per_node_scan_ranges = 5; |
| |
| // 0-based ordinal number of this particular instance. This is within its fragment, not |
| // query-wide, so eg. there will be one instance '0' for each fragment. |
| optional int32 per_fragment_instance_idx = 6; |
| |
| // In its role as a data sender, a fragment instance is assigned a "sender id" to |
| // uniquely identify it to a receiver. -1 = invalid. |
| optional int32 sender_id = 7 [default = -1]; |
| |
| // List of input join build finstances for joins in this finstance. |
| repeated JoinBuildInputPB join_build_inputs = 8; |
| |
| // If this is a join build fragment, the number of fragment instances that consume the |
| // join build. -1 = invalid. |
| optional int32 num_join_build_outputs = 9 [default = -1]; |
| } |
| |
| // Execution parameters for a single backend. Used to construct the |
| // Coordinator::BackendStates. |
| message BackendExecParamsPB { |
| // The id of this backend. |
| optional UniqueIdPB backend_id = 1; |
| |
| // The hostname + port of the KRPC backend service on this backend. |
| optional NetworkAddressPB address = 8; |
| |
| // The IP address + port of the KRPC backend service on this backend. |
| optional NetworkAddressPB krpc_address = 9; |
| |
| // The fragment instance params assigned to this backend. All instances of a |
| // particular fragment are contiguous in this list. This can be empty only for the |
| // coordinator backend, that is, if 'is_coord_backend' is true. |
| repeated FInstanceExecParamsPB instance_params = 2; |
| |
| // The minimum query-wide buffer reservation size (in bytes) required for this backend. |
| // This is the peak minimum reservation that may be required by the |
| // concurrently-executing operators at any point in query execution. It may be less |
| // than the initial reservation total claims (below) if execution of some operators |
| // never overlaps, which allows reuse of reservations. |
| optional int64 min_mem_reservation_bytes = 3; |
| |
| // Total of the initial buffer reservations that we expect to be claimed on this |
| // backend for all fragment instances in instance_params. I.e. the sum over all |
| // operators in all fragment instances that execute on this backend. This is used for |
| // an optimization in InitialReservation. Measured in bytes. |
| optional int64 initial_mem_reservation_total_claims = 4; |
| |
| // Total thread reservation for fragment instances scheduled on this backend. This is |
| // the peak number of required threads that may be required by the |
| // concurrently-executing fragment instances at any point in query execution. |
| optional int64 thread_reservation = 5; |
| |
| // Number of slots that this query should count for in admission control. |
| // This is calculated as the maximum # of instances of any fragment on this backend. |
| // I.e. 1 if mt_dop is not used and at most the mt_dop value if mt_dop is specified |
| // (but less if the query is not actually running with mt_dop instances on this node). |
| optional int32 slots_to_use = 6; |
| |
| // Indicates whether this backend is the coordinator. |
| optional bool is_coord_backend = 7; |
| } |
| |
| // Information about selected backend that designated as runtime filter preaggregator |
| // (before final aggregation in the coordinator) and fragment instances that must send |
| // filter update to them. This is populated by Scheduler::ComputeRandomKrpcForAggregation |
| // only for fragment having partitioned join that produce bloom filter and only if num |
| // backend executor (excluding coordinator) is at least 2x num aggregator (which default |
| // to 2). |
| message RuntimeFilterAggregatorInfoPB { |
| // Number of aggregator. |
| required int32 num_aggregators = 1; |
| |
| // hostname:port of designated aggregators. |
| repeated NetworkAddressPB aggregator_krpc_addresses = 2; |
| |
| // ip:port of designated aggregators. |
| repeated NetworkAddressPB aggregator_krpc_backends = 3; |
| |
| // Number of backend executor that report to each designated aggregator + 1 |
| // (including itself). |
| repeated int32 num_reporter_per_aggregator = 4; |
| |
| // Size must be equal to the size of FragmentExecParamsPB.instances(). |
| repeated int32 aggregator_idx_to_report = 5; |
| } |
| |
| // Execution parameters shared between fragment instances |
| message FragmentExecParamsPB { |
| // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx. |
| optional int32 fragment_idx = 1; |
| |
| // Output destinations of this fragment. |
| repeated PlanFragmentDestinationPB destinations = 2; |
| |
| // map from node id to the number of senders (node id expected to be for an |
| // ExchangeNode) |
| map<int32, int32> per_exch_num_senders = 3; |
| |
| // List of fragment instance ids for all instances of this fragment. |
| repeated UniqueIdPB instances = 4; |
| |
| // Total number of backends this fragment is scheduled on. Note that this represents |
| // the number of individual impalads, not the number of physical hosts. |
| optional int32 num_hosts = 5; |
| |
| optional RuntimeFilterAggregatorInfoPB filter_agg_info = 6; |
| } |
| |
| // Contains the output from scheduling and admission control that is used by the |
| // coordinator to start query execution. |
| message QuerySchedulePB { |
| optional UniqueIdPB query_id = 1; |
| |
| // The per-fragment execution parameters for this schedule. |
| repeated FragmentExecParamsPB fragment_exec_params = 2; |
| |
| // The per-backend execution parameters for this schedule. |
| repeated BackendExecParamsPB backend_exec_params = 3; |
| |
| // Total number of scan ranges of this query. |
| optional int64 num_scan_ranges = 4; |
| |
| // The memory limit per executor that will be imposed on the query. |
| // Set by the admission controller with a value that is only valid if it was admitted |
| // successfully. -1 means no limit. |
| optional int64 per_backend_mem_limit = 5; |
| |
| // The per executor memory used for admission accounting. |
| // Set by the admission controller with a value that is only valid if it was admitted |
| // successfully. Can be zero if the query is only scheduled to run on the coordinator. |
| optional int64 per_backend_mem_to_admit = 6; |
| |
| // The memory limit for the coordinator that will be imposed on the query. Used only if |
| // the query has a coordinator fragment. |
| // Set by the admission controller with a value that is only valid if it was admitted |
| // successfully. -1 means no limit. |
| optional int64 coord_backend_mem_limit = 7; |
| |
| // The coordinator memory used for admission accounting. |
| // Set by the admission controller with a value that is only valid if it was admitted |
| // successfully. |
| optional int64 coord_backend_mem_to_admit = 8; |
| |
| /// The cluster wide estimated memory usage of this query. |
| optional int64 cluster_mem_est = 9; |
| |
| // Mapping to store which data file is read on which hosts, grouped by scan node ID. |
| map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 10; |
| } |
| |
| message AdmitQueryRequestPB { |
| optional UniqueIdPB query_id = 1; |
| |
| // The BackendId of the coordinator for this query. |
| optional UniqueIdPB coord_id = 2; |
| |
| // Idx of the TQueryExecRequest sidecar. |
| optional int32 query_exec_request_sidecar_idx = 3; |
| |
| // List of backends this query should not be scheduled on. |
| repeated NetworkAddressPB blacklisted_executor_addresses = 4; |
| } |
| |
| message AdmitQueryResponsePB { |
| // Ok if the request was successfully handed off to the admission thread pool for |
| // processing |
| optional StatusPB status = 1; |
| } |
| |
| message GetQueryStatusRequestPB { |
| optional UniqueIdPB query_id = 1; |
| } |
| |
| message GetQueryStatusResponsePB { |
| // Error if the query was rejected or retrieving the status failed. |
| optional StatusPB status = 1; |
| |
| // The results of scheduling and admisison control. WIll only be set if admission was |
| // successful and the query has not yet been released. |
| optional QuerySchedulePB query_schedule = 2; |
| |
| // Idx of the TRuntimeProfileTree sidecar. |
| optional int32 summary_profile_sidecar_idx = 3; |
| |
| // Start time of the query queuing, in Unix milliseconds. |
| optional int64 wait_start_time_ms = 4; |
| |
| // End time of the query queuing, in Unix milliseconds. |
| optional int64 wait_end_time_ms = 5; |
| } |
| |
| message ReleaseQueryRequestPB { |
| optional UniqueIdPB query_id = 1; |
| |
| // Corresponds to the 'peak_mem_consumption' parameter of |
| // AdmissionController::ReleaseQuery() |
| optional int64 peak_mem_consumption = 3; |
| } |
| |
| message ReleaseQueryResponsePB { |
| optional StatusPB status = 1; |
| } |
| |
| message ReleaseQueryBackendsRequestPB { |
| optional UniqueIdPB query_id = 1; |
| |
| // List of backends that have completed. The resources for this query on these backends |
| // will be released. |
| repeated NetworkAddressPB host_addr = 2; |
| } |
| |
| message ReleaseQueryBackendsResponsePB { |
| optional StatusPB status = 1; |
| } |
| |
| message CancelAdmissionRequestPB { |
| optional UniqueIdPB query_id = 1; |
| } |
| |
| message CancelAdmissionResponsePB { |
| optional StatusPB status = 1; |
| } |
| |
| message AdmissionHeartbeatRequestPB { |
| // The backend id for the coordinator sending this heartbeat. |
| optional UniqueIdPB host_id = 1; |
| |
| // The version number of this heartbeat. Incremented every time a new heartbeat is sent. |
| optional int64 version = 2; |
| |
| // A list of all queries registered at this coordinator. |
| repeated UniqueIdPB query_ids = 3; |
| } |
| |
| message AdmissionHeartbeatResponsePB { |
| optional StatusPB status = 1; |
| } |
| |
| service AdmissionControlService { |
| /// Called by the coordinator to start scheduling. The actual work is done on a thread |
| /// pool, so this call returns immedately. Idempotent - if the query has already been |
| /// submitted previously, returns OK without doing anything. TODO: there are some |
| /// situations where we can return the admission result quickly, eg. if the query is |
| /// rejected. We should evaluate the benefits of saving a call to GetQueryStatus() in |
| /// those situations. |
| rpc AdmitQuery(AdmitQueryRequestPB) returns (AdmitQueryResponsePB); |
| |
| /// Called by the coordinator after AdmitQuery() to monitor the admission status of the |
| /// query. The call will block for a configurable amount of time before returning. This |
| /// call is idempotent and will return the schedule on each call between successful |
| /// admission and the query getting released. |
| rpc GetQueryStatus(GetQueryStatusRequestPB) returns (GetQueryStatusResponsePB); |
| |
| /// Called by the coordinator when the query has completely finished, releases all |
| /// remaining resources. |
| rpc ReleaseQuery(ReleaseQueryRequestPB) returns (ReleaseQueryResponsePB); |
| |
| /// Called after individual backends have finished to release their resources while |
| /// other backends are running. Due to the use of Coordinator::BackendResourceState, |
| /// this will be called a max of log(# of backends) times per query. TODO: we can save |
| /// an rpc if we combine the release of the final batch of backends with the call to |
| /// ReleaseQuery. |
| rpc ReleaseQueryBackends(ReleaseQueryBackendsRequestPB) |
| returns (ReleaseQueryBackendsResponsePB); |
| |
| /// Called by the coordinator to cancel scheduling of a query for which GetQueryStatus |
| /// has not yet returned a schedule. |
| rpc CancelAdmission(CancelAdmissionRequestPB) returns (CancelAdmissionResponsePB); |
| |
| /// Used to ensure that the admission service and coordinator have a consistent view of |
| /// what resources are being used even in the face of possible rpc failures. |
| /// Periodically called by each coordinator with a list of query ids for all queries at |
| /// that coordinator. If the admissiond has resources allocated to a query that is not |
| /// included in the list, it assumes the query has completed and releases it's remaining |
| /// resources. Stale heartbeat messages are ignored. |
| rpc AdmissionHeartbeat(AdmissionHeartbeatRequestPB) |
| returns (AdmissionHeartbeatResponsePB); |
| } |