/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
import "google/protobuf/any.proto";
import "spark/connect/common.proto";
import "spark/connect/expressions.proto";
import "spark/connect/relations.proto";
package spark.connect;
option java_multiple_files = true;
option java_package = "org.apache.kyuubi.shaded.spark.connect.proto";
option go_package = "internal/generated";
// A [[Command]] is an operation executed by the server that does not directly
// consume or produce a relational result.
message Command {
oneof command_type {
CommonInlineUserDefinedFunction register_function = 1;
WriteOperation write_operation = 2;
CreateDataFrameViewCommand create_dataframe_view = 3;
WriteOperationV2 write_operation_v2 = 4;
SqlCommand sql_command = 5;
WriteStreamOperationStart write_stream_operation_start = 6;
StreamingQueryCommand streaming_query_command = 7;
GetResourcesCommand get_resources_command = 8;
StreamingQueryManagerCommand streaming_query_manager_command = 9;
CommonInlineUserDefinedTableFunction register_table_function = 10;
StreamingQueryListenerBusCommand streaming_query_listener_bus_command = 11;
CommonInlineUserDefinedDataSource register_data_source = 12;
CreateResourceProfileCommand create_resource_profile_command = 13;
CheckpointCommand checkpoint_command = 14;
RemoveCachedRemoteRelationCommand remove_cached_remote_relation_command = 15;
MergeIntoTableCommand merge_into_table_command = 16;
// This field is used to mark extensions to the protocol. When plugins generate arbitrary
// Commands they can add them here. During planning, the correct resolution is done.
google.protobuf.Any extension = 999;
}
}
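// For illustration only: a Command in proto text format selects exactly one
// variant of the oneof above, e.g. the parameterless GetResourcesCommand:
//
//   get_resources_command {}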
// A SQL Command is used to trigger the eager evaluation of SQL commands in Spark.
//
// When the SQL provided as part of the message is a command, it is immediately evaluated
// and the result is collected and returned as part of a LocalRelation. If the SQL is
// not a command, the operation simply returns a SQL Relation. This allows the client to be
// almost oblivious to the server-side behavior.
message SqlCommand {
// (Required) SQL Query.
string sql = 1 [deprecated=true];
// (Optional) A map of parameter names to literal expressions.
map<string, Expression.Literal> args = 2 [deprecated=true];
// (Optional) A sequence of literal expressions for positional parameters in the SQL query text.
repeated Expression.Literal pos_args = 3 [deprecated=true];
// (Optional) A map of parameter names to expressions.
// It cannot coexist with `pos_arguments`.
map<string, Expression> named_arguments = 4 [deprecated=true];
// (Optional) A sequence of expressions for positional parameters in the SQL query text.
// It cannot coexist with `named_arguments`.
repeated Expression pos_arguments = 5 [deprecated=true];
// (Optional) The relation that this SQL command will be built on.
Relation input = 6;
}
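// A sketch of a SqlCommand in proto text format, using the non-deprecated `input`
// field. The `sql { query: ... }` shape of the nested Relation is an assumption
// based on spark/connect/relations.proto:
//
//   sql_command {
//     input {
//       sql { query: "CREATE TABLE t AS SELECT id FROM range(10)" }
//     }
//   }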
// A command that can create a DataFrame global temp view or local temp view.
message CreateDataFrameViewCommand {
// (Required) The relation that this view will be built on.
Relation input = 1;
// (Required) View name.
string name = 2;
// (Required) Whether this is a global temp view or a local temp view.
bool is_global = 3;
// (Required)
//
// If true, and if the view already exists, updates it; if false, and if the view
// already exists, throws an exception.
bool replace = 4;
}
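// A sketch in proto text format: create (or replace) a local temp view named
// "tmp_view" over an input Relation, which is elided here:
//
//   create_dataframe_view {
//     input { ... }  # Relation elided
//     name: "tmp_view"
//     is_global: false
//     replace: true
//   }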
// As writes are not directly handled during analysis and planning, they are modeled as commands.
message WriteOperation {
// (Required) The output of the `input` relation will be persisted according to the options.
Relation input = 1;
// (Optional) Format value according to the Spark documentation. Examples are: text, parquet, delta.
optional string source = 2;
// (Optional)
//
// The destination of the write operation can be either a path or a table.
// If the destination is neither a path nor a table (for example, jdbc or noop),
// the `save_type` should not be set.
oneof save_type {
string path = 3;
SaveTable table = 4;
}
// (Required) The save mode.
SaveMode mode = 5;
// (Optional) List of columns to sort the output by.
repeated string sort_column_names = 6;
// (Optional) List of columns for partitioning.
repeated string partitioning_columns = 7;
// (Optional) Bucketing specification. Bucketing must set the number of buckets and the columns
// to bucket by.
BucketBy bucket_by = 8;
// (Optional) A list of configuration options.
map<string, string> options = 9;
// (Optional) Columns used for clustering the table.
repeated string clustering_columns = 10;
message SaveTable {
// (Required) The table name.
string table_name = 1;
// (Required) The method to be called to write to the table.
TableSaveMethod save_method = 2;
enum TableSaveMethod {
TABLE_SAVE_METHOD_UNSPECIFIED = 0;
TABLE_SAVE_METHOD_SAVE_AS_TABLE = 1;
TABLE_SAVE_METHOD_INSERT_INTO = 2;
}
}
message BucketBy {
repeated string bucket_column_names = 1;
int32 num_buckets = 2;
}
enum SaveMode {
SAVE_MODE_UNSPECIFIED = 0;
SAVE_MODE_APPEND = 1;
SAVE_MODE_OVERWRITE = 2;
SAVE_MODE_ERROR_IF_EXISTS = 3;
SAVE_MODE_IGNORE = 4;
}
}
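// A sketch of a WriteOperation in proto text format: append the input relation to
// a parquet path, partitioned by a hypothetical "date" column:
//
//   write_operation {
//     input { ... }  # Relation elided
//     source: "parquet"
//     path: "/tmp/output"
//     mode: SAVE_MODE_APPEND
//     partitioning_columns: "date"
//     options { key: "compression" value: "snappy" }
//   }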
// As writes are not directly handled during analysis and planning, they are modeled as commands.
message WriteOperationV2 {
// (Required) The output of the `input` relation will be persisted according to the options.
Relation input = 1;
// (Required) The destination of the write operation must be either a path or a table.
string table_name = 2;
// (Optional) A provider for the underlying output data source. Spark's default catalog supports
// "parquet", "json", etc.
optional string provider = 3;
// (Optional) List of columns for partitioning the output table created by `create`,
// `createOrReplace`, or `replace`.
repeated Expression partitioning_columns = 4;
// (Optional) A list of configuration options.
map<string, string> options = 5;
// (Optional) A list of table properties.
map<string, string> table_properties = 6;
// (Required) Write mode.
Mode mode = 7;
enum Mode {
MODE_UNSPECIFIED = 0;
MODE_CREATE = 1;
MODE_OVERWRITE = 2;
MODE_OVERWRITE_PARTITIONS = 3;
MODE_APPEND = 4;
MODE_REPLACE = 5;
MODE_CREATE_OR_REPLACE = 6;
}
// (Optional) A condition for the overwrite save mode.
Expression overwrite_condition = 8;
// (Optional) Columns used for clustering the table.
repeated string clustering_columns = 9;
}
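// A sketch of a WriteOperationV2 in proto text format: create or replace a table
// (the table name and property values are hypothetical):
//
//   write_operation_v2 {
//     input { ... }  # Relation elided
//     table_name: "main.db.events"
//     provider: "parquet"
//     mode: MODE_CREATE_OR_REPLACE
//     table_properties { key: "owner" value: "etl" }
//   }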
// Starts a write stream operation as a streaming query. The query ID and run ID of
// the streaming query are returned.
message WriteStreamOperationStart {
// (Required) The output of the `input` streaming relation will be written.
Relation input = 1;
// The following fields directly map to the DataStreamWriter API.
// Consult the API documentation unless explicitly documented here.
string format = 2;
map<string, string> options = 3;
repeated string partitioning_column_names = 4;
oneof trigger {
string processing_time_interval = 5;
bool available_now = 6;
bool once = 7;
string continuous_checkpoint_interval = 8;
}
string output_mode = 9;
string query_name = 10;
// The destination is optional. When set, it can be a path or a table name.
oneof sink_destination {
string path = 11;
string table_name = 12;
}
StreamingForeachFunction foreach_writer = 13;
StreamingForeachFunction foreach_batch = 14;
// (Optional) Columns used for clustering the table.
repeated string clustering_column_names = 15;
}
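// A sketch of a WriteStreamOperationStart in proto text format: start a query that
// writes parquet files to a path with a 10-second processing-time trigger (the
// paths and query name are hypothetical):
//
//   write_stream_operation_start {
//     input { ... }  # streaming Relation elided
//     format: "parquet"
//     options { key: "checkpointLocation" value: "/tmp/checkpoints/q1" }
//     processing_time_interval: "10 seconds"
//     output_mode: "append"
//     query_name: "q1"
//     path: "/tmp/stream-out"
//   }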
message StreamingForeachFunction {
oneof function {
PythonUDF python_function = 1;
ScalarScalaUDF scala_function = 2;
}
}
message WriteStreamOperationStartResult {
// (Required) Query instance. See `StreamingQueryInstanceId`.
StreamingQueryInstanceId query_id = 1;
// An optional query name.
string name = 2;
// Optional query started event as JSON, present if there is any listener registered
// on the client side.
optional string query_started_event_json = 3;
// TODO: How do we indicate errors?
// TODO: Consider adding status, last progress etc here.
}
// A tuple that uniquely identifies an instance of a streaming query run. It consists
// of an `id` that persists across streaming runs and a `run_id` that changes between
// each run of the streaming query that resumes from the checkpoint.
message StreamingQueryInstanceId {
// (Required) The unique id of this query that persists across restarts from checkpoint data.
// That is, this id is generated when a query is started for the first time, and
// will be the same every time it is restarted from checkpoint data.
string id = 1;
// (Required) The unique id of this run of the query. That is, every start/restart of a query
// will generate a unique run_id. Therefore, every time a query is restarted from
// checkpoint, it will have the same `id` but different `run_id`s.
string run_id = 2;
}
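// For example (hypothetical, truncated UUIDs), a query restarted once from its
// checkpoint keeps its `id` but receives a fresh `run_id`:
//
//   { id: "3fa85f64-..." run_id: "16fd2706-..." }  # first run
//   { id: "3fa85f64-..." run_id: "8e29b011-..." }  # after restart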
// Commands for a streaming query.
message StreamingQueryCommand {
// (Required) Query instance. See `StreamingQueryInstanceId`.
StreamingQueryInstanceId query_id = 1;
// See documentation for the corresponding API method in StreamingQuery.
oneof command {
// status() API.
bool status = 2;
// lastProgress() API.
bool last_progress = 3;
// recentProgress() API.
bool recent_progress = 4;
// stop() API. Stops the query.
bool stop = 5;
// processAllAvailable() API. Waits until all available data has been processed.
bool process_all_available = 6;
// explain() API. Returns logical and physical plans.
ExplainCommand explain = 7;
// exception() API. Returns the exception in the query if any.
bool exception = 8;
// awaitTermination() API. Waits for the termination of the query.
AwaitTerminationCommand await_termination = 9;
}
message ExplainCommand {
// TODO: Consider reusing Explain from the AnalyzePlanRequest message.
// We cannot do this right now since base.proto imports this file.
bool extended = 1;
}
message AwaitTerminationCommand {
optional int64 timeout_ms = 2;
}
}
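// A sketch of a StreamingQueryCommand in proto text format: stop the query
// identified by `query_id` (IDs are hypothetical and truncated):
//
//   streaming_query_command {
//     query_id { id: "3fa85f64-..." run_id: "16fd2706-..." }
//     stop: true
//   }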
// Response for commands on a streaming query.
message StreamingQueryCommandResult {
// (Required) Query instance id. See `StreamingQueryInstanceId`.
StreamingQueryInstanceId query_id = 1;
oneof result_type {
StatusResult status = 2;
RecentProgressResult recent_progress = 3;
ExplainResult explain = 4;
ExceptionResult exception = 5;
AwaitTerminationResult await_termination = 6;
}
message StatusResult {
// See the documentation for the Scala 'StreamingQueryStatus' class.
string status_message = 1;
bool is_data_available = 2;
bool is_trigger_active = 3;
bool is_active = 4;
}
message RecentProgressResult {
// Progress reports as an array of JSON strings.
repeated string recent_progress_json = 5;
}
message ExplainResult {
// Logical and physical plans as a string.
string result = 1;
}
message ExceptionResult {
// (Optional) Exception message as a string; maps to the return value of the original
// StreamingQueryException's toString method.
optional string exception_message = 1;
// (Optional) Exception error class as string
optional string error_class = 2;
// (Optional) Exception stack trace as string
optional string stack_trace = 3;
}
message AwaitTerminationResult {
bool terminated = 1;
}
}
// Commands for the streaming query manager.
message StreamingQueryManagerCommand {
// See documentation for the corresponding API method in StreamingQueryManager.
oneof command {
// active() API, returns a list of active queries.
bool active = 1;
// get() API, returns the StreamingQuery identified by id.
string get_query = 2;
// awaitAnyTermination() API, waits until any query terminates or the timeout expires.
AwaitAnyTerminationCommand await_any_termination = 3;
// resetTerminated() API.
bool reset_terminated = 4;
// addListener API.
StreamingQueryListenerCommand add_listener = 5;
// removeListener API.
StreamingQueryListenerCommand remove_listener = 6;
// listListeners() API, returns a list of streaming query listeners.
bool list_listeners = 7;
}
message AwaitAnyTerminationCommand {
// (Optional) The time in milliseconds to wait for any query to terminate.
optional int64 timeout_ms = 1;
}
message StreamingQueryListenerCommand {
bytes listener_payload = 1;
optional PythonUDF python_listener_payload = 2;
string id = 3;
}
}
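// A sketch of a StreamingQueryManagerCommand in proto text format: wait up to
// 10 seconds for any active query to terminate:
//
//   streaming_query_manager_command {
//     await_any_termination { timeout_ms: 10000 }
//   }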
// Response for commands on the streaming query manager.
message StreamingQueryManagerCommandResult {
oneof result_type {
ActiveResult active = 1;
StreamingQueryInstance query = 2;
AwaitAnyTerminationResult await_any_termination = 3;
bool reset_terminated = 4;
bool add_listener = 5;
bool remove_listener = 6;
ListStreamingQueryListenerResult list_listeners = 7;
}
message ActiveResult {
repeated StreamingQueryInstance active_queries = 1;
}
message StreamingQueryInstance {
// (Required) The id and runId of this query.
StreamingQueryInstanceId id = 1;
// (Optional) The name of this query.
optional string name = 2;
}
message AwaitAnyTerminationResult {
bool terminated = 1;
}
message StreamingQueryListenerInstance {
bytes listener_payload = 1;
}
message ListStreamingQueryListenerResult {
// (Required) Reference IDs of listener instances.
repeated string listener_ids = 1;
}
}
// The protocol for client-side StreamingQueryListener.
// This command will only be set when either the first listener is added to the client, or the last
// listener is removed from the client.
// The add_listener_bus_listener command will only be set true in the first case.
// The remove_listener_bus_listener command will only be set true in the second case.
message StreamingQueryListenerBusCommand {
oneof command {
bool add_listener_bus_listener = 1;
bool remove_listener_bus_listener = 2;
}
}
// The enum used for client-side streaming query listener events.
// There is no QueryStartedEvent defined here;
// it is added as a field in WriteStreamOperationStartResult.
enum StreamingQueryEventType {
QUERY_PROGRESS_UNSPECIFIED = 0;
QUERY_PROGRESS_EVENT = 1;
QUERY_TERMINATED_EVENT = 2;
QUERY_IDLE_EVENT = 3;
}
// The protocol for the returned events in the long-running response channel.
message StreamingQueryListenerEvent {
// (Required) The JSON-serialized event; all StreamingQueryListener events have a json method.
string event_json = 1;
// (Required) Query event type, used by the client to decide how to deserialize event_json.
StreamingQueryEventType event_type = 2;
}
message StreamingQueryListenerEventsResult {
repeated StreamingQueryListenerEvent events = 1;
optional bool listener_bus_listener_added = 2;
}
// Command to get the output of 'SparkContext.resources'
message GetResourcesCommand { }
// Response for command 'GetResourcesCommand'.
message GetResourcesCommandResult {
map<string, ResourceInformation> resources = 1;
}
// Command to create a ResourceProfile.
message CreateResourceProfileCommand {
// (Required) The ResourceProfile to be built on the server-side.
ResourceProfile profile = 1;
}
// Response for command 'CreateResourceProfileCommand'.
message CreateResourceProfileCommandResult {
// (Required) Server-side generated resource profile id.
int32 profile_id = 1;
}
// Command to remove a `CachedRemoteRelation`.
message RemoveCachedRemoteRelationCommand {
// (Required) The cached remote relation to be removed.
CachedRemoteRelation relation = 1;
}
message CheckpointCommand {
// (Required) The logical plan to checkpoint.
Relation relation = 1;
// (Required) Whether to checkpoint locally using a local temporary
// directory on the Spark Connect server (the Spark driver).
bool local = 2;
// (Required) Whether to checkpoint this dataframe immediately.
bool eager = 3;
}
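// A sketch of a CheckpointCommand in proto text format: eagerly checkpoint a plan
// to a local temporary directory on the server:
//
//   checkpoint_command {
//     relation { ... }  # Relation elided
//     local: true
//     eager: true
//   }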
message MergeIntoTableCommand {
// (Required) The name of the target table.
string target_table_name = 1;
// (Required) The relation of the source table.
Relation source_table_plan = 2;
// (Required) The condition to match the source and target.
Expression merge_condition = 3;
// (Optional) The actions to be taken when the condition is matched.
repeated Expression match_actions = 4;
// (Optional) The actions to be taken when the condition is not matched.
repeated Expression not_matched_actions = 5;
// (Optional) The actions to be taken when the condition is not matched by source.
repeated Expression not_matched_by_source_actions = 6;
// (Required) Whether to enable schema evolution.
bool with_schema_evolution = 7;
}
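// A sketch of a MergeIntoTableCommand in proto text format; the Relation and
// Expression payloads come from the imported protos and are elided here:
//
//   merge_into_table_command {
//     target_table_name: "db.target"
//     source_table_plan { ... }  # Relation elided
//     merge_condition { ... }    # Expression elided
//     match_actions { ... }      # e.g. an update action; Expression elided
//     with_schema_evolution: false
//   }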