// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//

syntax="proto2";

package impala;

import "common.proto";

import "kudu/rpc/rpc_header.proto";

// Parquet-specific DML statistics. Referenced from DmlStatsPB.parquet_stats.
message ParquetDmlStatsPB {
  // Maps each column (string key) to its on-disk size in bytes.
  // NOTE(review): the key is presumably the column name -- confirm against the writer.
  map<string, int64> per_column_size = 1;
}

// Kudu-specific DML statistics. Referenced from DmlStatsPB.kudu_stats.
message KuduDmlStatsPB {
  // The number of reported per-row errors, i.e. this many rows were not modified.
  // Note that this aggregate is less useful than a breakdown of the number of errors by
  // error type, e.g. number of rows with duplicate key conflicts, number of rows
  // violating nullability constraints, etc., but it isn't yet possible to differentiate
  // all error types in the KuduTableSink.
  optional int64 num_row_errors = 1;
}

// Messages used by the ReportExecStatus RPC.

// Per-partition DML stats. Embedded in DmlPartitionStatusPB.stats.
// TODO: this should include the table stats that we update the metastore with.
message DmlStatsPB {
  // Number of bytes written.
  // NOTE(review): presumably the bytes written to this partition -- confirm.
  optional int64 bytes_written = 1;

  // Parquet-specific statistics (see ParquetDmlStatsPB).
  // NOTE(review): presumably only set when writing Parquet -- confirm.
  optional ParquetDmlStatsPB parquet_stats = 2;

  // Kudu-specific statistics (see KuduDmlStatsPB).
  // NOTE(review): presumably only set when writing to Kudu -- confirm.
  optional KuduDmlStatsPB kudu_stats = 3;
}

// Per-partition statistics and metadata resulting from DML statements. Used as the
// value type of DmlExecStatusPB.per_partition_status.
message DmlPartitionStatusPB {
  // The id of the partition written to (may be -1 if the partition is created by this
  // query). See THdfsTable.partitions.
  optional int64 id = 1;

  // The number of rows modified in this partition.
  optional int64 num_modified_rows = 2;

  // Detailed statistics gathered by table writers for this partition.
  optional DmlStatsPB stats = 3;

  // Fully qualified URI to the base directory for this partition.
  optional string partition_base_dir = 4;

  // The latest observed Kudu timestamp reported by the local KuduSession.
  // The value is logically an unsigned 64-bit integer, but it is carried in this
  // signed int64 field, so readers must reinterpret it as unsigned.
  optional int64 kudu_latest_observed_ts = 5;
}

// The results of a DML statement, sent to the coordinator as part of
// ReportExecStatusRequestPB (via FragmentInstanceExecStatusPB.dml_exec_status).
message DmlExecStatusPB {
  // A map from temporary absolute file path to final absolute destination. The
  // coordinator performs these updates after the query completes.
  map<string, string> files_to_move = 1;

  // Per-partition details, used in finalization and reporting.
  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
  // root's key in an unpartitioned table being ROOT_PARTITION_KEY.
  // The target table name is recorded in the corresponding TQueryExecRequest.
  map<string, DmlPartitionStatusPB> per_partition_status = 2;
}

// Error message exchange format. Used as the value type of StatefulStatusPB.error_log,
// whose int32 key is the TErrorCode that this entry's messages were reported under.
message ErrorLogEntryPB {
  // Number of error messages reported under the error code that keys this entry.
  optional int32 count = 1;

  // Sample messages reported under the error code that keys this entry.
  repeated string messages = 2;
}

// Represents the states that a fragment instance goes through during its execution. The
// current state gets sent back to the coordinator and will be presented to users through
// the debug webpages. The states are listed in the order in which they are entered.
// Not all states are necessarily transitioned through when there are errors.
//
// Note: under proto2 the first listed value (WAITING_FOR_EXEC) is what a reader gets
// from an unset optional field of this type, so "unset" and WAITING_FOR_EXEC are only
// distinguishable through the field's has_*() accessor.
enum FInstanceExecStatePB {
  WAITING_FOR_EXEC = 0;
  WAITING_FOR_PREPARE = 1;
  WAITING_FOR_CODEGEN = 2;
  WAITING_FOR_OPEN = 3;
  WAITING_FOR_FIRST_BATCH = 4;
  FIRST_BATCH_PRODUCED = 5;
  PRODUCING_DATA = 6;
  LAST_BATCH_SENT = 7;
  FINISHED = 8;
}

// Represents any part of the status report that isn't idempotent. If the executor thinks
// the report failed, we'll retransmit these parts, and this allows us to keep them
// associated with their original sequence number so that if the coordinator actually did
// receive the original report it won't reapply them.
// Carried in FragmentInstanceExecStatusPB.stateful_report.
message StatefulStatusPB {
  // Sequence number prevents out-of-order or duplicated updates from being applied.
  // 'report_seq_no' will be <= the 'report_seq_no' in the FragmentInstanceExecStatusPB
  // that contains this StatefulStatusPB.
  optional int64 report_seq_no = 1;

  // Map of TErrorCode to ErrorLogEntryPB. Holds new errors that have not yet been
  // reported to the coordinator by this fragment instance. Not idempotent.
  map<int32, ErrorLogEntryPB> error_log = 2;
}

// Execution status of a single fragment instance. Carried in
// ReportExecStatusRequestPB.instance_exec_status.
message FragmentInstanceExecStatusPB {
  // Sequence number prevents out-of-order or duplicated updates from being applied.
  optional int64 report_seq_no = 1;

  // The ID of the fragment instance that this report is for.
  optional UniqueIdPB fragment_instance_id = 2;

  // If true, fragment finished executing.
  optional bool done = 3;

  // The current state of this fragment instance's execution.
  optional FInstanceExecStatePB current_state = 4;

  // Cumulative structural changes made by the table sink of this fragment
  // instance. This is sent only when 'done' above is true. Not idempotent.
  optional DmlExecStatusPB dml_exec_status = 5;

  // The non-idempotent parts of the report, and any prior reports that are not known to
  // have been received by the coordinator. Each entry keeps its own 'report_seq_no'
  // (see StatefulStatusPB).
  repeated StatefulStatusPB stateful_report = 6;
}

// Request message of ControlService.ReportExecStatus, which updates the coordinator with
// the query status of the backend.
message ReportExecStatusRequestPB {
  // The query id which this report is for.
  optional UniqueIdPB query_id = 1;

  // Same as TExecQueryFInstancesParams.coord_state_idx.
  // NOTE(review): presumably the value this backend received in
  // ExecQueryFInstancesRequestPB.coord_state_idx -- confirm.
  optional int32 coord_state_idx = 2;

  // Per-fragment-instance execution status entries included in this report.
  repeated FragmentInstanceExecStatusPB instance_exec_status = 3;

  // Sidecar index of the cumulative profiles of all fragment instances
  // in instance_exec_status.
  optional int32 thrift_profiles_sidecar_idx = 4;

  // Cumulative status for this backend.
  // See QueryState::overall_status for details.
  optional StatusPB overall_status = 5;

  // The fragment instance id of the first failed fragment instance. This corresponds to
  // the fragment which sets 'overall_status' above. Not set if 'overall_status' is a
  // general error (e.g. failure to start fragment instances).
  optional UniqueIdPB fragment_instance_id = 6;
}

// Response message of ControlService.ReportExecStatus.
message ReportExecStatusResponsePB {
  // Status returned by the receiver of the report.
  // NOTE(review): presumably the success or failure of applying the report -- confirm.
  optional StatusPB status = 1;
}

// Request message of ControlService.CancelQueryFInstances.
message CancelQueryFInstancesRequestPB {
  // The query id of the query being cancelled.
  optional UniqueIdPB query_id = 1;
}

// Response message of ControlService.CancelQueryFInstances.
message CancelQueryFInstancesResponsePB {
  // Status returned by the backend for the cancellation request.
  // NOTE(review): the RPC is documented as asynchronous, so this presumably reflects
  // only whether the request was accepted, not whether execution has stopped -- confirm.
  optional StatusPB status = 1;
}

// Request message of ControlService.RemoteShutdown.
message RemoteShutdownParamsPB {
  // Deadline for the shutdown. After this deadline expires (starting at the time when
  // this remote shutdown command is received), the Impala daemon exits immediately
  // regardless of whether queries are still executing.
  // NOTE(review): the '_s' suffix suggests the unit is seconds -- confirm.
  optional int64 deadline_s = 1;
}

// The current status of a shutdown operation. Returned in
// RemoteShutdownResultPB.shutdown_status.
message ShutdownStatusPB {
  // Milliseconds remaining in startup grace period. 0 if the period has expired.
  optional int64 grace_remaining_ms = 1;

  // Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
  optional int64 deadline_remaining_ms = 2;

  // Number of fragment instances still executing.
  optional int64 finstances_executing = 3;

  // Number of client requests still registered with the Impala server that is being shut
  // down.
  optional int64 client_requests_registered = 4;

  // Number of queries still executing on backend.
  optional int64 backend_queries_executing = 5;
}

// Response message of ControlService.RemoteShutdown.
message RemoteShutdownResultPB {
  // Success or failure of the operation.
  optional StatusPB status = 1;

  // If status is OK, additional info about the shutdown status.
  optional ShutdownStatusPB shutdown_status = 2;
}

// Request message of ControlService.ExecQueryFInstances.
//
// NOTE(review): several comments below refer to 'fragment_instance_ctxs', which is not
// a field of this message; it presumably lives in the TExecQueryFInstancesSidecar
// referenced by 'sidecar_idx' -- confirm.
message ExecQueryFInstancesRequestPB {
  // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
  // the coordinator.
  optional int32 coord_state_idx = 1;

  // Sidecar index of the TExecQueryFInstancesSidecar, which contains the query and plan
  // fragment contexts.
  optional int32 sidecar_idx = 2;

  // The minimum query-wide memory reservation (in bytes) required for the backend
  // executing the instances in fragment_instance_ctxs. This is the peak minimum
  // reservation that may be required by the concurrently-executing operators at any
  // point in query execution. It may be less than the initial reservation total claims
  // (below) if execution of some operators never overlaps, which allows reuse of
  // reservations.
  optional int64 min_mem_reservation_bytes = 3;

  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 4;

  // The backend memory limit (in bytes) as set by the admission controller. Used by the
  // query mem tracker to enforce the memory limit.
  optional int64 per_backend_mem_limit = 5;
}

// Response message of ControlService.ExecQueryFInstances.
message ExecQueryFInstancesResponsePB {
  // Success or failure of the operation.
  optional StatusPB status = 1;
}

// RPC service for query control: starting and cancelling a query's fragment instances
// on a backend, reporting execution status back to the coordinator, and initiating
// shutdown of a backend.
service ControlService {
  // Override the default authorization method: RPCs in this service default to the
  // method named "Authorize".
  // NOTE(review): the kudu.rpc option is presumably declared in the imported
  // kudu/rpc/rpc_header.proto -- confirm.
  option (kudu.rpc.default_authz_method) = "Authorize";

  // Called by the coordinator to start asynchronous execution of a query's fragment
  // instances in a backend. Returns as soon as all incoming data streams have been set
  // up.
  rpc ExecQueryFInstances(ExecQueryFInstancesRequestPB)
      returns (ExecQueryFInstancesResponsePB);

  // Update the coordinator with the query status of the backend.
  rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);

  // Called by coordinator to cancel execution of a single query's fragment instances,
  // which the coordinator initiated with a prior call to ExecQueryFInstances.
  // Cancellation is asynchronous (in the sense that this call may return before the
  // fragment instance has completely stopped executing).
  rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
      returns (CancelQueryFInstancesResponsePB);

  // Called to initiate shutdown of this backend. The response carries the current
  // shutdown status (see RemoteShutdownResultPB).
  rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
}