/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// This schema is written against the proto3 language version.
syntax = "proto3";
// Every message, enum and service below is declared in this package.
package ballista.protobuf;
// Java code generation settings: emit one .java file per top-level type,
// place them in org.ballistacompute.protobuf, and name the outer wrapper
// class BallistaProto.
option java_multiple_files = true;
option java_package = "org.ballistacompute.protobuf";
option java_outer_classname = "BallistaProto";
// Ballista Logical Plan
// logical expressions
//
// One node of a logical expression tree. Exactly one member of ExprType is
// set; the member that is set identifies the kind of expression.
message LogicalExprNode {
  oneof ExprType {
    // column references
    string column_name = 1;

    // alias
    AliasNode alias = 2;

    // literal value
    ScalarValue literal = 3;

    // binary expressions
    BinaryExprNode binary_expr = 4;

    // aggregate expressions
    AggregateExprNode aggregate_expr = 5;

    // null checks
    IsNull is_null_expr = 6;
    IsNotNull is_not_null_expr = 7;

    Not not_expr = 8;
    BetweenNode between = 9;
    // NOTE(review): the trailing underscore presumably avoids a clash with the
    // `case` keyword in generated code - confirm.
    CaseNode case_ = 10;
    CastNode cast = 11;
    SortExprNode sort = 12;
    NegativeNode negative = 13;
    InListNode in_list = 14;
    bool wildcard = 15;
    ScalarFunctionNode scalar_function = 16;
    TryCastNode try_cast = 17;
  }
}
// Payload of LogicalExprNode.is_null_expr (one of the null checks).
message IsNull {
  // Expression the check is applied to.
  LogicalExprNode expr = 1;
}
// Payload of LogicalExprNode.is_not_null_expr (one of the null checks).
message IsNotNull {
  // Expression the check is applied to.
  LogicalExprNode expr = 1;
}
// Payload of LogicalExprNode.not_expr.
message Not {
  // Operand expression.
  LogicalExprNode expr = 1;
}
// Payload of LogicalExprNode.alias: an expression together with an alias
// string.
message AliasNode {
  // Expression being aliased.
  LogicalExprNode expr = 1;
  // Alias text.
  string alias = 2;
}
// Payload of LogicalExprNode.binary_expr: two operand expressions and an
// operator.
message BinaryExprNode {
  // NOTE(review): `l` and `r` are presumably the left and right operands.
  LogicalExprNode l = 1;
  LogicalExprNode r = 2;
  // Operator, carried as a string. The accepted spellings are not defined in
  // this file.
  string op = 3;
}
// Payload of LogicalExprNode.negative.
message NegativeNode {
  // Operand expression.
  LogicalExprNode expr = 1;
}
// Payload of LogicalExprNode.in_list: an expression, a list of candidate
// expressions, and a negation flag.
message InListNode {
  // Expression being tested.
  LogicalExprNode expr = 1;
  // Candidate expressions.
  repeated LogicalExprNode list = 2;
  // NOTE(review): presumably true for the NOT IN form - confirm.
  bool negated = 3;
}
// Identifies the scalar function used by ScalarFunctionNode.fun.
//
// Numbers 17, 24 and 27 are not assigned in this listing. Check the schema
// history before assigning them, since field/enum numbers must never be
// reused with a different meaning.
enum ScalarFunction {
  SQRT = 0;
  SIN = 1;
  COS = 2;
  TAN = 3;
  ASIN = 4;
  ACOS = 5;
  ATAN = 6;
  EXP = 7;
  LOG = 8;
  LOG2 = 9;
  LOG10 = 10;
  FLOOR = 11;
  CEIL = 12;
  ROUND = 13;
  TRUNC = 14;
  ABS = 15;
  SIGNUM = 16;
  CONCAT = 18;
  LOWER = 19;
  UPPER = 20;
  TRIM = 21;
  LTRIM = 22;
  RTRIM = 23;
  ARRAY = 25;
  NULLIF = 26;
  MD5 = 28;
  SHA224 = 29;
  SHA256 = 30;
  SHA384 = 31;
  SHA512 = 32;
}
// Payload of LogicalExprNode.scalar_function: a function identifier plus its
// argument expressions.
message ScalarFunctionNode {
  // Which scalar function is invoked.
  ScalarFunction fun = 1;
  // Argument expressions.
  repeated LogicalExprNode expr = 2;
}
// Identifies the aggregate function used by AggregateExprNode.aggr_function.
enum AggregateFunction {
  MIN = 0;
  MAX = 1;
  SUM = 2;
  AVG = 3;
  COUNT = 4;
}
// Payload of LogicalExprNode.aggregate_expr: an aggregate function applied to
// a single expression.
message AggregateExprNode {
  // Which aggregate function is applied.
  AggregateFunction aggr_function = 1;
  // Expression being aggregated.
  LogicalExprNode expr = 2;
}
// Payload of LogicalExprNode.between: an expression, a low and a high bound,
// and a negation flag.
message BetweenNode {
  // Expression being tested.
  LogicalExprNode expr = 1;
  // NOTE(review): presumably true for the NOT BETWEEN form - confirm.
  bool negated = 2;
  // Lower bound expression.
  LogicalExprNode low = 3;
  // Upper bound expression.
  LogicalExprNode high = 4;
}
// Payload of LogicalExprNode.case_: a base expression, a list of when/then
// pairs, and an else expression.
message CaseNode {
  // Base expression of the CASE.
  LogicalExprNode expr = 1;
  // When/then branches, in declaration order.
  repeated WhenThen when_then_expr = 2;
  // Else branch expression.
  LogicalExprNode else_expr = 3;
}
// One when/then branch, used by CaseNode.when_then_expr.
message WhenThen {
  // Expression of the "when" part.
  LogicalExprNode when_expr = 1;
  // Expression of the "then" part.
  LogicalExprNode then_expr = 2;
}
// Payload of LogicalExprNode.cast: an expression and the Arrow type it is
// cast to.
message CastNode {
  // Expression being cast.
  LogicalExprNode expr = 1;
  // Target type.
  ArrowType arrow_type = 2;
}
// Payload of LogicalExprNode.try_cast. Same shape as CastNode: an expression
// and a target Arrow type.
message TryCastNode {
  // Expression being cast.
  LogicalExprNode expr = 1;
  // Target type.
  ArrowType arrow_type = 2;
}
// Payload of LogicalExprNode.sort: an expression with ordering flags.
message SortExprNode {
  // Expression to sort by.
  LogicalExprNode expr = 1;
  // Ascending-order flag.
  bool asc = 2;
  // Nulls-first flag.
  bool nulls_first = 3;
}
// LogicalPlan is a nested type
//
// One node of a logical plan tree. Exactly one member of LogicalPlanType is
// set; most members carry their own `input` LogicalPlanNode, which is how the
// tree nests.
message LogicalPlanNode {
  oneof LogicalPlanType {
    CsvTableScanNode csv_scan = 1;
    ParquetTableScanNode parquet_scan = 2;
    ProjectionNode projection = 3;
    SelectionNode selection = 4;
    LimitNode limit = 5;
    AggregateNode aggregate = 6;
    JoinNode join = 7;
    SortNode sort = 8;
    RepartitionNode repartition = 9;
    EmptyRelationNode empty_relation = 10;
    CreateExternalTableNode create_external_table = 11;
    ExplainNode explain = 12;
  }
}
// List of column names, used as the `projection` of the table scan nodes.
message ProjectionColumns {
  // Column names.
  repeated string columns = 1;
}
// Payload of LogicalPlanNode.csv_scan: describes a scan of a CSV table.
message CsvTableScanNode {
  // Name of the table.
  string table_name = 1;
  // Path of the data.
  string path = 2;
  // Header flag for the CSV data.
  bool has_header = 3;
  // Field delimiter, carried as a string.
  string delimiter = 4;
  // File extension of the data files.
  string file_extension = 5;
  // Projected columns.
  ProjectionColumns projection = 6;
  // Table schema.
  Schema schema = 7;
  // Filter expressions attached to the scan.
  repeated LogicalExprNode filters = 8;
}
// Payload of LogicalPlanNode.parquet_scan: describes a scan of a Parquet
// table.
message ParquetTableScanNode {
  // Name of the table.
  string table_name = 1;
  // Path of the data.
  string path = 2;
  // Projected columns.
  ProjectionColumns projection = 3;
  // Table schema.
  Schema schema = 4;
  // Filter expressions attached to the scan.
  repeated LogicalExprNode filters = 5;
}
// Payload of LogicalPlanNode.projection: an input plan and the expressions
// projected from it.
message ProjectionNode {
  // Child plan.
  LogicalPlanNode input = 1;
  // Projected expressions.
  repeated LogicalExprNode expr = 2;
}
// Payload of LogicalPlanNode.selection: an input plan and a single
// expression.
message SelectionNode {
  // Child plan.
  LogicalPlanNode input = 1;
  // NOTE(review): presumably the filter predicate - confirm.
  LogicalExprNode expr = 2;
}
// Payload of LogicalPlanNode.sort: an input plan and the expressions to sort
// by.
message SortNode {
  // Child plan.
  LogicalPlanNode input = 1;
  // Sort expressions.
  repeated LogicalExprNode expr = 2;
}
// Payload of LogicalPlanNode.repartition: an input plan and exactly one
// partitioning method.
message RepartitionNode {
  // Child plan.
  LogicalPlanNode input = 1;
  oneof partition_method {
    // NOTE(review): presumably the number of output partitions - confirm.
    uint64 round_robin = 2;
    // Hash partitioning parameters.
    HashRepartition hash = 3;
  }
}
// Hash partitioning parameters, used by the `hash` member of the repartition
// nodes.
message HashRepartition {
  // Expressions that are hashed.
  repeated LogicalExprNode hash_expr = 1;
  // Number of partitions.
  uint64 partition_count = 2;
}
// Payload of LogicalPlanNode.empty_relation.
message EmptyRelationNode {
  // Flag controlling whether the relation yields a single row.
  bool produce_one_row = 1;
}
// Payload of LogicalPlanNode.create_external_table.
message CreateExternalTableNode {
  // Name of the table.
  string name = 1;
  // Location of the table data.
  string location = 2;
  // Format of the data files.
  FileType file_type = 3;
  // Header flag for the data files.
  bool has_header = 4;
  // Table schema.
  Schema schema = 5;
}
// File formats, used by CreateExternalTableNode.file_type and
// GetFileMetadataParams.file_type.
enum FileType {
  NdJson = 0;
  Parquet = 1;
  CSV = 2;
}
// Payload of LogicalPlanNode.explain: an input plan and a verbosity flag.
message ExplainNode {
  // Plan being explained.
  LogicalPlanNode input = 1;
  // Verbose-output flag.
  bool verbose = 2;
}
// A Field together with a qualifier string.
// The fields are declared out of numeric order; the numbers, not the
// declaration order, are what identify them on the wire.
message DfField {
  // Qualifier text.
  string qualifier = 2;
  // The qualified field.
  Field field = 1;
}
// Payload of LogicalPlanNode.aggregate: an input plan with grouping and
// aggregate expressions.
message AggregateNode {
  // Child plan.
  LogicalPlanNode input = 1;
  // Grouping expressions.
  repeated LogicalExprNode group_expr = 2;
  // Aggregate expressions.
  repeated LogicalExprNode aggr_expr = 3;
}
// Kind of join, used by JoinNode.join_type and HashJoinExecNode.join_type.
enum JoinType {
  INNER = 0;
  LEFT = 1;
  RIGHT = 2;
}
// Payload of LogicalPlanNode.join: two input plans, a join type, and the
// join columns of each side.
message JoinNode {
  // Left input plan.
  LogicalPlanNode left = 1;
  // Right input plan.
  LogicalPlanNode right = 2;
  // Kind of join.
  JoinType join_type = 3;
  // NOTE(review): the two column lists are presumably paired by index -
  // confirm against the producer/consumer code.
  repeated string left_join_column = 4;
  repeated string right_join_column = 5;
}
// Payload of LogicalPlanNode.limit: an input plan and a row limit.
message LimitNode {
  // Child plan.
  LogicalPlanNode input = 1;
  // Limit value.
  uint32 limit = 2;
}
// Carries a single logical expression. It is not referenced by any other
// definition in this file.
message SelectionExecNode {
  LogicalExprNode expr = 1;
}
// Ballista Physical Plan
// PhysicalPlanNode is a nested type
//
// One node of a physical plan tree. Exactly one member of PhysicalPlanType is
// set. Number 5 is not assigned in this listing; check the schema history
// before assigning it.
message PhysicalPlanNode {
  oneof PhysicalPlanType {
    ParquetScanExecNode parquet_scan = 1;
    CsvScanExecNode csv_scan = 2;
    EmptyExecNode empty = 3;
    ProjectionExecNode projection = 4;
    GlobalLimitExecNode global_limit = 6;
    LocalLimitExecNode local_limit = 7;
    HashAggregateExecNode hash_aggregate = 8;
    HashJoinExecNode hash_join = 9;
    ShuffleReaderExecNode shuffle_reader = 10;
    SortExecNode sort = 11;
    CoalesceBatchesExecNode coalesce_batches = 12;
    FilterExecNode filter = 13;
    MergeExecNode merge = 14;
    UnresolvedShuffleExecNode unresolved = 15;
    RepartitionExecNode repartition = 16;
  }
}
// Payload of PhysicalPlanNode.unresolved.
message UnresolvedShuffleExecNode {
  // Identifiers of query stages.
  repeated uint32 query_stage_ids = 1;
  // Schema of the data.
  Schema schema = 2;
  // Number of partitions.
  uint32 partition_count = 3;
}
// Payload of PhysicalPlanNode.filter: an input plan and a single expression.
message FilterExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
  // NOTE(review): presumably the filter predicate - confirm.
  LogicalExprNode expr = 2;
}
// Payload of PhysicalPlanNode.parquet_scan.
message ParquetScanExecNode {
  // Files to scan.
  repeated string filename = 1;
  // NOTE(review): presumably column indices into the file schema - confirm.
  repeated uint32 projection = 2;
  // Number of partitions.
  uint32 num_partitions = 3;
  // Batch size.
  uint32 batch_size = 4;
}
// Payload of PhysicalPlanNode.csv_scan.
message CsvScanExecNode {
  // Path of the data.
  string path = 1;
  // NOTE(review): presumably column indices into `schema` - confirm.
  repeated uint32 projection = 2;
  // Schema of the data.
  Schema schema = 3;
  // File extension of the data files.
  string file_extension = 4;
  // Header flag for the CSV data.
  bool has_header = 5;
  // Batch size.
  uint32 batch_size = 6;
  // Field delimiter, carried as a string.
  string delimiter = 7;
  // partition filenames
  repeated string filename = 8;
}
// Payload of PhysicalPlanNode.hash_join: two input plans, the join keys and
// the join type.
message HashJoinExecNode {
  // Left input plan.
  PhysicalPlanNode left = 1;
  // Right input plan.
  PhysicalPlanNode right = 2;
  // Join key pairs.
  repeated JoinOn on = 3;
  // Kind of join.
  JoinType join_type = 4;
}
// One join key pair, used by HashJoinExecNode.on.
message JoinOn {
  // NOTE(review): presumably column names on the left and right inputs -
  // confirm.
  string left = 1;
  string right = 2;
}
// Payload of PhysicalPlanNode.empty.
message EmptyExecNode {
  // Flag controlling whether the node yields a single row.
  bool produce_one_row = 1;
  // Schema of the output.
  Schema schema = 2;
}
// Payload of PhysicalPlanNode.projection.
message ProjectionExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
  // Projected expressions.
  repeated LogicalExprNode expr = 2;
  // NOTE(review): presumably one name per entry of `expr`, paired by index -
  // confirm.
  repeated string expr_name = 3;
}
// Aggregation mode, used by HashAggregateExecNode.mode.
enum AggregateMode {
  // proto3 requires the first declared enum value to be zero; without it the
  // file is rejected by protoc. The name PARTIAL is inferred from the
  // HashAggregateExecNode.input_schema comment, which describes a partial
  // aggregate feeding the final aggregate.
  // NOTE(review): confirm the name against the code that consumes this enum.
  PARTIAL = 0;
  FINAL = 1;
}
// Payload of PhysicalPlanNode.hash_aggregate.
message HashAggregateExecNode {
  // Grouping expressions.
  repeated LogicalExprNode group_expr = 1;
  // Aggregate expressions.
  repeated LogicalExprNode aggr_expr = 2;
  // Aggregation mode.
  AggregateMode mode = 3;
  // Child plan.
  PhysicalPlanNode input = 4;
  // NOTE(review): the two name lists presumably pair by index with
  // group_expr and aggr_expr respectively - confirm.
  repeated string group_expr_name = 5;
  repeated string aggr_expr_name = 6;
  // we need the input schema to the partial aggregate to pass to the final aggregate
  Schema input_schema = 7;
}
// Payload of PhysicalPlanNode.shuffle_reader.
message ShuffleReaderExecNode {
  // Locations of the partitions to read.
  repeated PartitionLocation partition_location = 1;
  // Schema of the data.
  Schema schema = 2;
}
// Payload of PhysicalPlanNode.global_limit: an input plan and a limit.
message GlobalLimitExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
  // Limit value.
  uint32 limit = 2;
}
// Payload of PhysicalPlanNode.local_limit: an input plan and a limit.
message LocalLimitExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
  // Limit value.
  uint32 limit = 2;
}
// Payload of PhysicalPlanNode.sort: an input plan and the expressions to sort
// by.
message SortExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
  // Sort expressions.
  repeated LogicalExprNode expr = 2;
}
// Payload of PhysicalPlanNode.coalesce_batches.
message CoalesceBatchesExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
  // Target batch size.
  uint32 target_batch_size = 2;
}
// Payload of PhysicalPlanNode.merge.
message MergeExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
}
// Payload of PhysicalPlanNode.repartition: an input plan and exactly one
// partitioning method.
message RepartitionExecNode {
  // Child plan.
  PhysicalPlanNode input = 1;
  // NOTE(review): the extent of this oneof is inferred; `unknown` is placed
  // inside it alongside the other partitioning methods. Field numbers and
  // types are unchanged either way.
  oneof partition_method {
    // NOTE(review): presumably the number of output partitions - confirm.
    uint64 round_robin = 2;
    // Hash partitioning parameters.
    HashRepartition hash = 3;
    uint64 unknown = 4;
  }
}
// Ballista Scheduling
// A string key with a string value; used for Action.settings.
message KeyValuePair {
  string key = 1;
  string value = 2;
}
// A requested action: exactly one ActionType member, plus configuration
// settings.
message Action {
  oneof ActionType {
    // Execute a logical query plan
    LogicalPlanNode query = 1;
    // Execute one partition of a physical query plan
    ExecutePartition execute_partition = 2;
    // Fetch a partition from an executor
    PartitionId fetch_partition = 3;
  }

  // configuration settings
  // Declared outside the oneof: repeated fields cannot be oneof members.
  repeated KeyValuePair settings = 100;
}
// Payload of Action.execute_partition: identifies the job, stage and
// partitions to run, and carries the physical plan to execute.
message ExecutePartition {
  // Job identifier.
  string job_id = 1;
  // Stage identifier.
  uint32 stage_id = 2;
  // Partition identifiers.
  repeated uint32 partition_id = 3;
  // Physical plan to execute.
  PhysicalPlanNode plan = 4;
  // The task could need to read partitions from other executors
  repeated PartitionLocation partition_location = 5;
}
// Mapping from partition id to executor id
message PartitionLocation {
  // Partition being located.
  PartitionId partition_id = 1;
  // Executor associated with the partition.
  ExecutorMetadata executor_meta = 2;
  // Statistics for the partition.
  PartitionStats partition_stats = 3;
}
// Unique identifier for a materialized partition of data
// Number 3 is not assigned in this listing; check the schema history before
// assigning it.
message PartitionId {
  // Job identifier.
  string job_id = 1;
  // Stage identifier.
  uint32 stage_id = 2;
  // Partition identifier.
  uint32 partition_id = 4;
}
// Statistics for a partition, used by PartitionLocation.partition_stats.
message PartitionStats {
  // Row count.
  int64 num_rows = 1;
  // Batch count.
  int64 num_batches = 2;
  // Byte count.
  int64 num_bytes = 3;
  // Per-column statistics.
  repeated ColumnStats column_stats = 4;
}
// Statistics for a single column, used by PartitionStats.column_stats.
message ColumnStats {
  // Minimum value.
  ScalarValue min_value = 1;
  // Maximum value.
  ScalarValue max_value = 2;
  // Number of nulls.
  uint32 null_count = 3;
  // Number of distinct values.
  uint32 distinct_count = 4;
}
// Identity and network address of an executor.
message ExecutorMetadata {
  // Executor identifier.
  string id = 1;
  // Host of the executor.
  string host = 2;
  // Port of the executor.
  uint32 port = 3;
}
// Request message of SchedulerGrpc.GetExecutorsMetadata. It has no fields.
message GetExecutorMetadataParams {}
// Response message of SchedulerGrpc.GetExecutorsMetadata.
message GetExecutorMetadataResult {
  // One entry per executor.
  repeated ExecutorMetadata metadata = 1;
}
// Payload of TaskStatus.running.
message RunningTask {
  // Identifier of the executor.
  string executor_id = 1;
}
// Payload of TaskStatus.failed.
message FailedTask {
  // Error text.
  string error = 1;
}
// Payload of TaskStatus.completed.
message CompletedTask {
  // Identifier of the executor.
  string executor_id = 1;
}
// Status of one task: the partition it belongs to plus exactly one status
// member.
message TaskStatus {
  // Partition the task works on.
  PartitionId partition_id = 1;
  oneof status {
    RunningTask running = 2;
    FailedTask failed = 3;
    CompletedTask completed = 4;
  }
}
// Request message of SchedulerGrpc.PollWork.
message PollWorkParams {
  // Metadata of the polling executor.
  ExecutorMetadata metadata = 1;
  // Flag stating whether the executor can accept a task.
  bool can_accept_task = 2;
  // All tasks must be reported until they reach the failed or completed state
  repeated TaskStatus task_status = 3;
}
// A task: its identifier and the physical plan to run.
message TaskDefinition {
  // Task identifier, expressed as a PartitionId.
  PartitionId task_id = 1;
  // Physical plan of the task.
  PhysicalPlanNode plan = 2;
}
// Response message of SchedulerGrpc.PollWork.
message PollWorkResult {
  // Task handed to the executor. As a message-typed field it may be unset.
  TaskDefinition task = 1;
}
// Request message of SchedulerGrpc.ExecuteQuery. The query is given either as
// a logical plan or as SQL text.
message ExecuteQueryParams {
  oneof query {
    LogicalPlanNode logical_plan = 1;
    string sql = 2;
  }
}
// Carries SQL text. It is not referenced by any other definition in this
// file.
message ExecuteSqlParams {
  string sql = 1;
}
// Response message of SchedulerGrpc.ExecuteQuery.
message ExecuteQueryResult {
  // Job identifier.
  string job_id = 1;
}
// Request message of SchedulerGrpc.GetJobStatus.
message GetJobStatusParams {
  // Job identifier.
  string job_id = 1;
}
// Payload of JobStatus.completed.
message CompletedJob {
  // Partition locations reported for the completed job.
  repeated PartitionLocation partition_location = 1;
}
// Payload of JobStatus.queued. It has no fields.
message QueuedJob {}
// Payload of JobStatus.running. It currently has no fields.
// TODO: add progress report
message RunningJob {}
// Payload of JobStatus.failed.
message FailedJob {
  // Error text.
  string error = 1;
}
// Status of a job. Exactly one member of `status` is set.
message JobStatus {
  oneof status {
    QueuedJob queued = 1;
    RunningJob running = 2;
    FailedJob failed = 3;
    CompletedJob completed = 4;
  }
}
// Response message of SchedulerGrpc.GetJobStatus.
message GetJobStatusResult {
  // Status of the job.
  JobStatus status = 1;
}
// Request message of SchedulerGrpc.GetFileMetadata.
message GetFileMetadataParams {
  // Path of the data.
  string path = 1;
  // Format of the data files.
  FileType file_type = 2;
}
// Response message of SchedulerGrpc.GetFileMetadata.
message GetFileMetadataResult {
  // Schema of the data.
  Schema schema = 1;
  // File partitions.
  repeated FilePartitionMetadata partitions = 2;
}
// One file partition, used by GetFileMetadataResult.partitions.
message FilePartitionMetadata {
  // File names belonging to the partition.
  repeated string filename = 1;
}
// gRPC service exposed by the scheduler. All methods are unary.
service SchedulerGrpc {
  // Returns metadata for executors.
  rpc GetExecutorsMetadata (GetExecutorMetadataParams) returns (GetExecutorMetadataResult) {}

  // Executors must poll the scheduler for heartbeat and to receive tasks
  rpc PollWork (PollWorkParams) returns (PollWorkResult) {}

  // Returns the schema and file partitions for a path and file type.
  rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}

  // Submits a query (logical plan or SQL) and returns a job id.
  rpc ExecuteQuery (ExecuteQueryParams) returns (ExecuteQueryResult) {}

  // Returns the status of the job with the given id.
  rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}
}
// Arrow Data Types
// A schema: an ordered list of fields.
message Schema {
  // Column definitions.
  repeated Field columns = 1;
}
// A single field of a Schema. Fields can nest through `children`.
message Field {
  // name of the field
  string name = 1;
  // Data type of the field.
  ArrowType arrow_type = 2;
  // Nullability flag.
  bool nullable = 3;
  // for complex data types like structs, unions
  repeated Field children = 4;
}
// Parameters of a fixed-size binary type. It is not referenced by any other
// definition in this file.
message FixedSizeBinary {
  // NOTE(review): presumably the width in bytes - confirm.
  int32 length = 1;
}
// Parameters of ArrowType.TIMESTAMP: a time unit and a timezone string.
message Timestamp {
  // Resolution of the timestamp.
  TimeUnit time_unit = 1;
  // Timezone, carried as a string. The accepted format is not defined here.
  string timezone = 2;
}
// Units for date values. It is not referenced by any other definition in this
// file.
// NOTE(review): the `Date` prefix on DateMillisecond presumably avoids a name
// clash with TimeUnit, since enum value names share their enclosing scope.
enum DateUnit {
  Day = 0;
  DateMillisecond = 1;
}
// Time resolutions, used by Timestamp.time_unit and by the DURATION, TIME32
// and TIME64 members of ArrowType.
enum TimeUnit {
  Second = 0;
  TimeMillisecond = 1;
  Microsecond = 2;
  Nanosecond = 3;
}
// Interval units, used by ArrowType.INTERVAL.
enum IntervalUnit {
  YearMonth = 0;
  DayTime = 1;
}
// Parameters of ArrowType.DECIMAL.
// NOTE(review): the meaning of `whole` and `fractional` is not stated in this
// file; presumably digit counts - confirm against the conversion code.
message Decimal {
  uint64 whole = 1;
  uint64 fractional = 2;
}
// Parameters of ArrowType.LIST and ArrowType.LARGE_LIST.
message List {
  // Field describing the list elements.
  Field field_type = 1;
}
// Parameters of ArrowType.FIXED_SIZE_LIST.
message FixedSizeList {
  // Field describing the list elements.
  Field field_type = 1;
  // Size of the list.
  int32 list_size = 2;
}
// Parameters of ArrowType.DICTIONARY: the key type and the value type.
message Dictionary {
  ArrowType key = 1;
  ArrowType value = 2;
}
// Parameters of ArrowType.STRUCT.
message Struct {
  // Member fields of the struct.
  repeated Field sub_field_types = 1;
}
// Parameters of ArrowType.UNION.
message Union {
  // Member fields of the union.
  repeated Field union_types = 1;
}
// Payload of ScalarValue.list_value: a list of scalar values with a declared
// type.
message ScalarListValue {
  // Declared scalar type.
  ScalarType datatype = 1;
  // List elements.
  repeated ScalarValue values = 2;
}
// A single scalar value. Exactly one member of `value` is set; the member
// name states the logical type.
//
// Protobuf has no 8- or 16-bit integer types, so the int8/int16 and
// uint8/uint16 members are carried in int32 and uint32 fields respectively.
message ScalarValue {
  oneof value {
    bool bool_value = 1;
    string utf8_value = 2;
    string large_utf8_value = 3;
    int32 int8_value = 4;
    int32 int16_value = 5;
    int32 int32_value = 6;
    int64 int64_value = 7;
    uint32 uint8_value = 8;
    uint32 uint16_value = 9;
    uint32 uint32_value = 10;
    uint64 uint64_value = 11;
    float float32_value = 12;
    double float64_value = 13;
    // Literal Date32 value always has a unit of day
    int32 date_32_value = 14;
    int64 time_microsecond_value = 15;
    int64 time_nanosecond_value = 16;
    ScalarListValue list_value = 17;
    // Typed null for a list: carries only the list's scalar type.
    ScalarType null_list_value = 18;
    // Typed null for a primitive: carries only the primitive type.
    PrimitiveScalarType null_value = 19;
  }
}
// Contains all valid datafusion scalar type except for
// List
//
// Per the original inline notes, the values correspond to arrow::Type fields
// in src/arrow/type.h. Numbers 14 and 15 are not assigned in this listing;
// check the schema history before assigning them.
enum PrimitiveScalarType {
  BOOL = 0;  // arrow::Type::BOOL
  UINT8 = 1;  // arrow::Type::UINT8
  INT8 = 2;  // arrow::Type::INT8
  UINT16 = 3;  // represents arrow::Type fields in src/arrow/type.h
  INT16 = 4;
  UINT32 = 5;
  INT32 = 6;
  UINT64 = 7;
  INT64 = 8;
  FLOAT32 = 9;
  FLOAT64 = 10;
  UTF8 = 11;
  LARGE_UTF8 = 12;
  DATE32 = 13;
  NULL = 16;
}
// Type of a scalar: either a primitive type or a list type.
message ScalarType {
  oneof datatype {
    PrimitiveScalarType scalar = 1;
    ScalarListType list = 2;
  }
}
// Payload of ScalarType.list. Number 1 is not assigned in this listing; check
// the schema history before assigning it.
message ScalarListType {
  // NOTE(review): presumably one name per list nesting level - confirm.
  repeated string field_names = 3;
  // Primitive type of the innermost elements.
  PrimitiveScalarType deepest_type = 2;
}
// Broke out into multiple message types so that type
// metadata did not need to be in separate message
// All types that are of the empty message types contain no additional metadata
// about the type
//
// Exactly one member of arrow_type_enum is set. Members typed EmptyMessage
// carry no parameters; the others carry the parameters of that type. Per the
// original inline notes, the member names correspond to arrow::Type fields in
// src/arrow/type.h. Number 16 is not assigned in this listing; check the
// schema history before assigning it.
message ArrowType {
  oneof arrow_type_enum {
    EmptyMessage NONE = 1;  // arrow::Type::NA
    EmptyMessage BOOL = 2;  // arrow::Type::BOOL
    EmptyMessage UINT8 = 3;  // arrow::Type::UINT8
    EmptyMessage INT8 = 4;  // arrow::Type::INT8
    EmptyMessage UINT16 = 5;  // represents arrow::Type fields in src/arrow/type.h
    EmptyMessage INT16 = 6;
    EmptyMessage UINT32 = 7;
    EmptyMessage INT32 = 8;
    EmptyMessage UINT64 = 9;
    EmptyMessage INT64 = 10;
    EmptyMessage FLOAT16 = 11;
    EmptyMessage FLOAT32 = 12;
    EmptyMessage FLOAT64 = 13;
    EmptyMessage UTF8 = 14;
    EmptyMessage LARGE_UTF8 = 32;
    EmptyMessage BINARY = 15;
    EmptyMessage LARGE_BINARY = 31;
    EmptyMessage DATE32 = 17;
    EmptyMessage DATE64 = 18;
    TimeUnit DURATION = 19;
    Timestamp TIMESTAMP = 20;
    TimeUnit TIME32 = 21;
    TimeUnit TIME64 = 22;
    IntervalUnit INTERVAL = 23;
    Decimal DECIMAL = 24;
    List LIST = 25;
    List LARGE_LIST = 26;
    FixedSizeList FIXED_SIZE_LIST = 27;
    Struct STRUCT = 28;
    Union UNION = 29;
    Dictionary DICTIONARY = 30;
  }
}
// Useful for representing an empty enum variant in Rust.
// E.g. the Rust enum
//   enum Example { One, Two(i32) }
// maps to
//   message Example {
//     oneof variant {
//       EmptyMessage One = 1;
//       int32 Two = 2;
//     }
//   }
// This message has no fields; it is used by the parameterless members of
// ArrowType.
message EmptyMessage{}