// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

syntax = "proto2";

package impala;

import "common.proto";
import "control_service.proto";
import "statestore_service.proto";

// Execution parameters for a single fragment instance. Used to assemble the
// TPlanFragmentInstanceCtx/PlanFragmentInstanceCtxPB.
message FInstanceExecParamsPB {
// The fragment instance id.
optional UniqueIdPB instance_id = 1;
// Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
optional int32 fragment_idx = 2;
// Map from plan node id to a list of scan ranges.
map<int32, ScanRangesPB> per_node_scan_ranges = 5;
// 0-based ordinal number of this particular instance within its fragment, not
// query-wide, so e.g. there is one instance '0' for each fragment.
optional int32 per_fragment_instance_idx = 6;
// In its role as a data sender, a fragment instance is assigned a "sender id" to
// uniquely identify it to a receiver. -1 = invalid.
optional int32 sender_id = 7 [default = -1];
// List of input join build finstances for joins in this finstance.
repeated JoinBuildInputPB join_build_inputs = 8;
// If this is a join build fragment, the number of fragment instances that consume the
// join build. -1 = invalid.
optional int32 num_join_build_outputs = 9 [default = -1];
}

// Execution parameters for a single backend. Used to construct the
// Coordinator::BackendStates.
message BackendExecParamsPB {
// The id of this backend.
optional UniqueIdPB backend_id = 1;
// The hostname + port of the KRPC backend service on this backend.
optional NetworkAddressPB address = 8;
// The IP address + port of the KRPC backend service on this backend.
optional NetworkAddressPB krpc_address = 9;
// The fragment instance params assigned to this backend. All instances of a
// particular fragment are contiguous in this list. This can be empty only for the
// coordinator backend, that is, if 'is_coord_backend' is true.
repeated FInstanceExecParamsPB instance_params = 2;
// The minimum query-wide buffer reservation size (in bytes) required for this backend.
// This is the peak minimum reservation that may be required by the
// concurrently-executing operators at any point in query execution. It may be less
// than the initial reservation total claims (below) if execution of some operators
// never overlaps, which allows reuse of reservations.
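// For example (illustrative numbers only): if a scan with an 8 MB minimum reservation
// and a sort with a 48 MB minimum reservation never execute concurrently, this value
// could be 48 MB even though their initial reservation claims sum to 56 MB.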
optional int64 min_mem_reservation_bytes = 3;
// Total of the initial buffer reservations that we expect to be claimed on this
// backend for all fragment instances in instance_params. I.e. the sum over all
// operators in all fragment instances that execute on this backend. This is used for
// an optimization in InitialReservation. Measured in bytes.
optional int64 initial_mem_reservation_total_claims = 4;
// Total thread reservation for fragment instances scheduled on this backend. This is
// the peak number of threads that may be required by the concurrently-executing
// fragment instances at any point in query execution.
optional int64 thread_reservation = 5;
// Number of slots that this query should count for in admission control.
// This is calculated as the maximum # of instances of any fragment on this backend.
// I.e. 1 if mt_dop is not used and at most the mt_dop value if mt_dop is specified
// (but less if the query is not actually running with mt_dop instances on this node).
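// For example (illustrative): with mt_dop=4, a backend running 4 instances of one
// fragment and 2 instances of another counts 4 slots; without mt_dop every fragment
// has a single instance on this backend, so it counts 1 slot.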
optional int32 slots_to_use = 6;
// Indicates whether this backend is the coordinator.
optional bool is_coord_backend = 7;
}

// Execution parameters shared between fragment instances.
message FragmentExecParamsPB {
// Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
optional int32 fragment_idx = 1;
// Output destinations of this fragment.
repeated PlanFragmentDestinationPB destinations = 2;
// Map from plan node id to the number of senders (the node id is expected to be that
// of an ExchangeNode).
map<int32, int32> per_exch_num_senders = 3;
// List of fragment instance ids for all instances of this fragment.
repeated UniqueIdPB instances = 4;
// Total number of backends this fragment is scheduled on. Note that this represents
// the number of individual impalads, not the number of physical hosts.
optional int32 num_hosts = 5;
}

// Contains the output from scheduling and admission control that is used by the
// coordinator to start query execution.
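//
// Roughly how the coordinator is expected to consume this message (illustrative
// pseudocode only; the names below are not actual Impala identifiers):
//
//   for (backend : schedule.backend_exec_params()) {   // one BackendState per entry
//     for (instance : backend.instance_params()) {     // contiguous per fragment
//       build_plan_fragment_instance_ctx(instance);    // -> PlanFragmentInstanceCtxPB
//     }
//   }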
message QuerySchedulePB {
optional UniqueIdPB query_id = 1;
// The per-fragment execution parameters for this schedule.
repeated FragmentExecParamsPB fragment_exec_params = 2;
// The per-backend execution parameters for this schedule.
repeated BackendExecParamsPB backend_exec_params = 3;
// Total number of scan ranges of this query.
optional int64 num_scan_ranges = 4;
// The memory limit per executor that will be imposed on the query.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully. -1 means no limit.
optional int64 per_backend_mem_limit = 5;
// The per executor memory used for admission accounting.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully. Can be zero if the query is only scheduled to run on the coordinator.
optional int64 per_backend_mem_to_admit = 6;
// The memory limit for the coordinator that will be imposed on the query. Used only if
// the query has a coordinator fragment.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully. -1 means no limit.
optional int64 coord_backend_mem_limit = 7;
// The coordinator memory used for admission accounting.
// Set by the admission controller with a value that is only valid if it was admitted
// successfully.
optional int64 coord_backend_mem_to_admit = 8;
}

message AdmitQueryRequestPB {
optional UniqueIdPB query_id = 1;
// The BackendId of the coordinator for this query.
optional UniqueIdPB coord_id = 2;
// Idx of the TQueryExecRequest sidecar.
optional int32 query_exec_request_sidecar_idx = 3;
// List of backends this query should not be scheduled on.
repeated NetworkAddressPB blacklisted_executor_addresses = 4;
}

message AdmitQueryResponsePB {
// OK if the request was successfully handed off to the admission thread pool for
// processing.
optional StatusPB status = 1;
}

message GetQueryStatusRequestPB {
optional UniqueIdPB query_id = 1;
}

message GetQueryStatusResponsePB {
// Error if the query was rejected or retrieving the status failed.
optional StatusPB status = 1;
// The results of scheduling and admission control. Will only be set if admission was
// successful and the query has not yet been released.
optional QuerySchedulePB query_schedule = 2;
// Idx of the TRuntimeProfileTree sidecar.
optional int32 summary_profile_sidecar_idx = 3;
}

message ReleaseQueryRequestPB {
optional UniqueIdPB query_id = 1;
// Corresponds to the 'peak_mem_consumption' parameter of
// AdmissionController::ReleaseQuery().
optional int64 peak_mem_consumption = 3;
}

message ReleaseQueryResponsePB {
optional StatusPB status = 1;
}

message ReleaseQueryBackendsRequestPB {
optional UniqueIdPB query_id = 1;
// List of backends that have completed. The resources for this query on these backends
// will be released.
repeated NetworkAddressPB host_addr = 2;
}

message ReleaseQueryBackendsResponsePB {
optional StatusPB status = 1;
}

message CancelAdmissionRequestPB {
optional UniqueIdPB query_id = 1;
}

message CancelAdmissionResponsePB {
optional StatusPB status = 1;
}
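
// Typical per-query lifecycle of these RPCs, as described in the comments on each RPC
// below (a sketch of the expected call order, not a normative specification):
//   1. AdmitQuery() hands the query off to the admission control thread pool.
//   2. GetQueryStatus() is polled until it returns an error status or a
//      QuerySchedulePB; CancelAdmission() can be used instead if scheduling should be
//      abandoned before a schedule has been returned.
//   3. ReleaseQueryBackends() is called as groups of backends finish, releasing their
//      resources while other backends keep running.
//   4. ReleaseQuery() is called once the query has completely finished, releasing all
//      remaining resources.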
service AdmissionControlService {
/// Called by the coordinator to start scheduling. The actual work is done on a thread
/// pool, so this call returns immediately. TODO: there are some situations where we
/// can return the admission result quickly, e.g. if the query is rejected. We should
/// evaluate the benefits of saving a call to GetQueryStatus() in those situations.
rpc AdmitQuery(AdmitQueryRequestPB) returns (AdmitQueryResponsePB);

/// Called by the coordinator after AdmitQuery() to monitor the admission status of the
/// query. The call will block for a configurable amount of time before returning. This
/// call is idempotent and will return the schedule on each call between successful
/// admission and the query getting released.
rpc GetQueryStatus(GetQueryStatusRequestPB) returns (GetQueryStatusResponsePB);

/// Called by the coordinator when the query has completely finished; releases all
/// remaining resources.
rpc ReleaseQuery(ReleaseQueryRequestPB) returns (ReleaseQueryResponsePB);

/// Called after individual backends have finished to release their resources while
/// other backends are running. Due to the use of Coordinator::BackendResourceState,
/// this will be called a max of log(# of backends) times per query. TODO: we can save
/// an rpc if we combine the release of the final batch of backends with the call to
/// ReleaseQuery.
rpc ReleaseQueryBackends(ReleaseQueryBackendsRequestPB)
returns (ReleaseQueryBackendsResponsePB);

/// Called by the coordinator to cancel scheduling of a query for which GetQueryStatus
/// has not yet returned a schedule.
rpc CancelAdmission(CancelAdmissionRequestPB) returns (CancelAdmissionResponsePB);
}