| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * Protocol Buffers describing the Fn API and bootstrapping. |
| * |
| * TODO: Usage of plural names in lists looks awkward in Java |
| * e.g. getOutputsMap, addCodersBuilder |
| * |
| * TODO: gRPC / proto field names conflict with generated code |
| * e.g. "class" in java, "output" in python |
| */ |
| |
| syntax = "proto3"; |
| |
| /* TODO: Consider consolidating common components in another package |
| * and language namespaces for re-use with Runner Api. |
| */ |
| |
| package org.apache.beam.fn.v1; |
| |
| option java_package = "org.apache.beam.fn.v1"; |
| option java_outer_classname = "BeamFnApi"; |
| |
| import "google/protobuf/any.proto"; |
| import "google/protobuf/timestamp.proto"; |
| |
| /* |
| * Constructs that define the pipeline shape. |
| * |
| * These are mostly unstable due to the missing pieces to be shared with |
| * the Runner Api like windowing strategy, display data, .... There are still |
| * some modelling questions related to whether a side input is modelled |
| * as another field on a PrimitiveTransform or as part of inputs and we |
| * still are missing things like the CompositeTransform. |
| */ |
| |
| // A representation of an input or output definition on a primitive transform. |
| // A target is identified by the id of a PrimitiveTransform together with a |
| // name local to that transform. |
| // Stable |
| message Target { |
| // A repeated list of target definitions. Used as the value type of |
| // PrimitiveTransform.inputs. |
| message List { |
| repeated Target target = 1; |
| } |
| |
| // (Required) The id of the PrimitiveTransform which is the target. |
| string primitive_transform_reference = 1; |
| |
| // (Required) The local name of an input or output defined on the primitive |
| // transform. |
| string name = 2; |
| } |
| |
| // Information defining a PCollection. Used as the value type of |
| // PrimitiveTransform.outputs. |
| message PCollection { |
| // (Required) A reference to a coder. |
| string coder_reference = 1; |
| |
| // TODO: Windowing strategy, ... |
| } |
| |
| // A primitive transform within Apache Beam. A list of these is carried by a |
| // ProcessBundleDescriptor to construct the bundle processing graph. |
| message PrimitiveTransform { |
| // (Required) A pipeline level unique id which can be used as a reference to |
| // refer to this. |
| string id = 1; |
| |
| // (Required) A function spec that is used by this primitive |
| // transform to process data. |
| FunctionSpec function_spec = 2; |
| |
| // A map of distinct input names to target definitions. |
| // For example, in CoGbk this represents the tag name associated with each |
| // distinct input name and a list of primitive transforms that are associated |
| // with the specified input. |
| map<string, Target.List> inputs = 3; |
| |
| // A map from local output name to PCollection definitions. For example, in |
| // DoFn this represents the tag name associated with each distinct output. |
| map<string, PCollection> outputs = 4; |
| |
| // TODO: Should we model side inputs as a special type of input for a |
| // primitive transform or should it be modeled as the relationship that |
| // the predecessor input will be a view primitive transform. |
| // A map from side input names to side inputs. |
| map<string, SideInput> side_inputs = 5; |
| |
| // The user name of this step. |
| // TODO: This should really be in display data and not at this level |
| string step_name = 6; |
| } |
| |
| /* |
| * User Definable Functions |
| * |
| * This is still unstable mainly due to how we model the side input. |
| */ |
| |
| // Defines the common elements of user-definable functions, to allow the SDK to |
| // express the information the runner needs to execute work. |
| // Stable |
| message FunctionSpec { |
| // (Required) A pipeline level unique id which can be used as a reference to |
| // refer to this. |
| string id = 1; |
| |
| // (Required) A globally unique name representing this user definable |
| // function. |
| // |
| // User definable functions use registered urn encodings so that the same |
| // user definable function may be implemented within another language. |
| // |
| // For example: |
| //   urn:org.apache.beam:coder:kv:1.0 |
| string urn = 2; |
| |
| // (Required) Reference to specification of execution environment required to |
| // invoke this function. |
| string environment_reference = 3; |
| |
| // Data used to parameterize this function. Depending on the urn, this may be |
| // optional or required. |
| google.protobuf.Any data = 4; |
| } |
| |
| // A side input of a primitive transform. Used as the value type of |
| // PrimitiveTransform.side_inputs. The modelling of side inputs is still |
| // unstable (see the TODO on PrimitiveTransform.side_inputs). |
| message SideInput { |
| // TODO: Coder? |
| |
| // For RunnerAPI. |
| // NOTE(review): presumably the target that provides the side input's |
| // contents - confirm. |
| Target input = 1; |
| |
| // For FnAPI. |
| // NOTE(review): presumably the function that maps the input into the view |
| // consumed by the transform - confirm. |
| FunctionSpec view_fn = 2; |
| } |
| |
| // Defines how to encode values into byte streams and decode values from byte |
| // streams. A coder can be parameterized by additional properties which may or |
| // may not be language agnostic. |
| // |
| // Coders using the urn:org.apache.beam:coder namespace must have their |
| // encodings registered so that the encoding may be implemented within |
| // another language. |
| // |
| // For example: |
| //   urn:org.apache.beam:coder:kv:1.0 |
| //   urn:org.apache.beam:coder:iterable:1.0 |
| // Stable |
| message Coder { |
| // TODO: This looks weird when compared to the other function specs |
| // which use URN to differentiate themselves. Should "Coder" be embedded |
| // inside the FunctionSpec data block. |
| |
| // The data associated with this coder used to reconstruct it. |
| FunctionSpec function_spec = 1; |
| |
| // A list of component coder references. |
| // |
| // For a key-value coder, there must be exactly two component coder references |
| // where the first reference represents the key coder and the second reference |
| // is the value coder. |
| // |
| // For an iterable coder, there must be exactly one component coder reference |
| // representing the value coder. |
| // |
| // TODO: Perhaps this is redundant with the data of the FunctionSpec |
| // for known coders? |
| repeated string component_coder_reference = 2; |
| } |
| |
| // A descriptor for connecting to a remote port using the Beam Fn Data API. |
| // Allows for communication between two environments (for example between the |
| // runner and the SDK). |
| // Stable |
| message RemoteGrpcPort { |
| // (Required) An API descriptor which describes where to connect, including |
| // any authentication that is required. |
| ApiServiceDescriptor api_service_descriptor = 1; |
| } |
| |
| /* |
| * Control Plane API |
| * |
| * Progress reporting and splitting still need further vetting. Also, this may change |
| * with the addition of new types of instructions/responses related to metrics. |
| */ |
| |
| // An API that describes the work that a SDK harness is meant to do. |
| // Stable |
| service BeamFnControl { |
| // Instructions sent by the runner to the SDK requesting different types |
| // of work. |
| // |
| // Note the direction of the streams: the caller of this rpc streams |
| // InstructionResponse messages and receives InstructionRequest messages. |
| rpc Control( |
| // A stream of responses to instructions the SDK was asked to perform. |
| stream InstructionResponse |
| ) returns ( |
| // A stream of instructions requested of the SDK to be performed. |
| stream InstructionRequest |
| ) {} |
| } |
| |
| // A request sent by a runner which the SDK is asked to fulfill. |
| // Stable |
| message InstructionRequest { |
| // (Required) A unique identifier provided by the runner which represents |
| // this request's execution. The InstructionResponse MUST have the matching id. |
| string instruction_id = 1; |
| |
| // (Required) A request that the SDK Harness needs to interpret. |
| // Each member has a same-named, same-numbered counterpart in |
| // InstructionResponse.response. |
| oneof request { |
| RegisterRequest register = 1000; |
| ProcessBundleRequest process_bundle = 1001; |
| ProcessBundleProgressRequest process_bundle_progress = 1002; |
| ProcessBundleSplitRequest process_bundle_split = 1003; |
| } |
| } |
| |
| // The response for an associated request the SDK had been asked to fulfill. |
| // Stable |
| message InstructionResponse { |
| // (Required) A reference provided by the runner which represents a request's |
| // execution. The InstructionResponse MUST have the matching id when |
| // responding to the runner. |
| string instruction_id = 1; |
| |
| // If this is specified, then this instruction has failed. |
| // A human readable string representing the reason as to why processing has |
| // failed. |
| string error = 2; |
| |
| // If the instruction did not fail, it is required to return an equivalent |
| // response type depending on the request this matches. |
| // Each member has a same-named, same-numbered counterpart in |
| // InstructionRequest.request. |
| oneof response { |
| RegisterResponse register = 1000; |
| ProcessBundleResponse process_bundle = 1001; |
| ProcessBundleProgressResponse process_bundle_progress = 1002; |
| ProcessBundleSplitResponse process_bundle_split = 1003; |
| } |
| } |
| |
| // A list of objects which can be referred to by the runner in |
| // future requests. Sent as InstructionRequest.register. |
| // Stable |
| message RegisterRequest { |
| // (Optional) The set of descriptors used to process bundles. |
| repeated ProcessBundleDescriptor process_bundle_descriptor = 1; |
| } |
| |
| // The response to a RegisterRequest. Currently carries no fields. |
| // Stable |
| message RegisterResponse { |
| } |
| |
| // A descriptor of references used when processing a bundle. |
| // Stable |
| // |
| // NOTE(review): field number 3 is not used by this message. If it belonged |
| // to a removed field, consider declaring `reserved 3;` so the tag cannot be |
| // reused with a different meaning - confirm against the schema history. |
| message ProcessBundleDescriptor { |
| // (Required) A pipeline level unique id which can be used as a reference to |
| // refer to this. |
| string id = 1; |
| |
| // (Required) A list of primitive transforms that should |
| // be used to construct the bundle processing graph. |
| repeated PrimitiveTransform primitive_transform = 2; |
| |
| // (Required) The set of all coders referenced in this bundle. |
| repeated Coder coders = 4; |
| } |
| |
| // A request to process a given bundle. Sent as |
| // InstructionRequest.process_bundle. |
| // Stable |
| message ProcessBundleRequest { |
| // (Required) A reference to the process bundle descriptor that must be |
| // instantiated and executed by the SDK harness. |
| string process_bundle_descriptor_reference = 1; |
| |
| // (Optional) A list of cache tokens that can be used by an SDK to cache |
| // data looked up using the State API across multiple bundles. |
| repeated CacheToken cache_tokens = 2; |
| } |
| |
| // The response to a ProcessBundleRequest. Currently carries no fields. |
| // Stable |
| message ProcessBundleResponse { |
| } |
| |
| // A request for the progress of an active process bundle request. Sent as |
| // InstructionRequest.process_bundle_progress. |
| message ProcessBundleProgressRequest { |
| // (Required) A reference to an active process bundle request with the given |
| // instruction id. |
| string instruction_reference = 1; |
| } |
| |
| // The response to a ProcessBundleProgressRequest, reporting the work finished |
| // so far and the known backlog. |
| message ProcessBundleProgressResponse { |
| // (Required) The finished amount of work. A monotonically increasing |
| // unitless measure of work finished. |
| double finished_work = 1; |
| |
| // (Required) The known amount of backlog for the process bundle request. |
| // Computed as: |
| //   (estimated known work - finished work) / finished work |
| double backlog = 2; |
| } |
| |
| // A request that an active process bundle request try to split at a given |
| // fraction of work. Sent as InstructionRequest.process_bundle_split. |
| message ProcessBundleSplitRequest { |
| // (Required) A reference to an active process bundle request with the given |
| // instruction id. |
| string instruction_reference = 1; |
| |
| // (Required) The fraction of work (when compared to the known amount of work) |
| // the process bundle request should try to split at. |
| double fraction = 2; |
| } |
| |
| // A restriction expressed as a count of elements to process. |
| // PrimitiveTransformSplit gives it as an example data block for a completed |
| // restriction. |
| // urn:org.apache.beam:restriction:element-count:1.0 |
| message ElementCountRestriction { |
| // A restriction representing the number of elements that should be processed. |
| // Effectively the range [0, count] |
| int64 count = 1; |
| } |
| |
| // A restriction expressed as a count of elements to skip. |
| // PrimitiveTransformSplit gives it as an example data block for a remaining |
| // restriction. |
| // urn:org.apache.beam:restriction:element-count-skip:1.0 |
| message ElementCountSkipRestriction { |
| // A restriction representing the number of elements that should be skipped. |
| // Effectively the range (count, infinity] |
| int64 count = 1; |
| } |
| |
| // Each primitive transform that is splittable is defined by a restriction |
| // it is currently processing. During splitting, that currently active |
| // restriction (R_initial) is split into 2 components: |
| //   * a restriction (R_done) representing all elements that will be fully |
| //     processed |
| //   * a restriction (R_todo) representing all elements that will not be fully |
| //     processed |
| // |
| // where: |
| //   R_initial = R_done ⋃ R_todo |
| message PrimitiveTransformSplit { |
| // (Required) A reference to a primitive transform with the given id that |
| // is part of the active process bundle request with the given instruction |
| // id. |
| string primitive_transform_reference = 1; |
| |
| // (Required) A function specification describing the restriction |
| // that has been completed by the primitive transform. |
| // |
| // For example, a remote GRPC source will have a specific urn and data |
| // block containing an ElementCountRestriction. |
| FunctionSpec completed_restriction = 2; |
| |
| // (Required) A function specification describing the restriction |
| // representing the remainder of work for the primitive transform. |
| // |
| // For example, a remote GRPC source will have a specific urn and data |
| // block containing an ElementCountSkipRestriction. |
| FunctionSpec remaining_restriction = 3; |
| } |
| |
| // The response to a ProcessBundleSplitRequest. |
| message ProcessBundleSplitResponse { |
| // (Optional) A set of split responses for a currently active work item. |
| // |
| // If primitive transform B is a descendant of primitive transform A and both |
| // A and B report a split, then B's restriction is reported as an element |
| // restriction pair and thus the fully reported restriction is: |
| //   R = A_done |
| //     ⋃ (A_boundary ⋂ B_done) |
| //     ⋃ (A_boundary ⋂ B_todo) |
| //     ⋃ A_todo |
| // If there is a descendant of B named C, then C would similarly report a |
| // set of element pair restrictions. |
| // |
| // This restriction is processed and completed by the currently active process |
| // bundle request: |
| //   A_done ⋃ (A_boundary ⋂ B_done) |
| // and these restrictions will be processed by future process bundle requests: |
| //   A_boundary ⋂ B_todo (passed to SDF B directly) |
| //   A_todo (passed to SDF A directly) |
| // |
| // If primitive transform B and C are siblings and descendants of A and A, B, |
| // and C report a split, then B and C's restrictions are relative to A's. |
| //   R = A_done |
| //     ⋃ (A_boundary ⋂ B_done) |
| //     ⋃ (A_boundary ⋂ B_todo) |
| //     ⋃ (A_boundary ⋂ C_done) |
| //     ⋃ (A_boundary ⋂ C_todo) |
| //     ⋃ A_todo |
| // If there is no descendant of B or C also reporting a split, then |
| //   B_boundary = ∅ and C_boundary = ∅ |
| // |
| // This restriction is processed and completed by the currently active process |
| // bundle request: |
| //   A_done ⋃ (A_boundary ⋂ B_done) |
| //          ⋃ (A_boundary ⋂ C_done) |
| // and these restrictions will be processed by future process bundle requests: |
| //   A_boundary ⋂ B_todo (passed to SDF B directly) |
| //   A_boundary ⋂ C_todo (passed to SDF C directly) |
| //   A_todo (passed to SDF A directly) |
| // |
| // Note that descendants splits should only be reported if it is inexpensive |
| // to compute the boundary restriction intersected with descendants splits. |
| // Also note, that the boundary restriction may represent a set of elements |
| // produced by a parent primitive transform which can not be split at each |
| // element or that there are intermediate unsplittable primitive transforms |
| // between an ancestor splittable function and a descendant splittable |
| // function which may have more than one output per element. Finally note |
| // that the descendant splits should only be reported if the split |
| // information is relatively compact. |
| repeated PrimitiveTransformSplit splits = 1; |
| } |
| |
| /* |
| * Data Plane API |
| */ |
| |
| // Messages used to represent logical byte streams. This is the message type |
| // streamed in both directions by BeamFnData.Data. |
| // Stable |
| message Elements { |
| // Represents multiple encoded elements in nested context for a given named |
| // instruction and target. |
| message Data { |
| // (Required) A reference to an active instruction request with the given |
| // instruction id. |
| string instruction_reference = 1; |
| |
| // (Required) A definition representing a consumer or producer of this data. |
| // If received by a harness, this represents the consumer within that |
| // harness that should consume these bytes. If sent by a harness, this |
| // represents the producer of these bytes. |
| // |
| // Note that a single element may span multiple Data messages. |
| // |
| // Note that a sending/receiving pair should share the same target |
| // identifier. |
| Target target = 2; |
| |
| // (Optional) Represents a part of a logical byte stream. Elements within |
| // the logical byte stream are encoded in the nested context and |
| // concatenated together. |
| // |
| // An empty data block represents the end of stream for the given |
| // instruction and target. |
| bytes data = 3; |
| } |
| |
| // (Required) A list containing parts of logical byte streams. |
| repeated Data data = 1; |
| } |
| |
| // An API for exchanging Elements between harnesses over a bidirectional |
| // stream. |
| // Stable |
| service BeamFnData { |
| // Used to send data between harnesses. |
| rpc Data( |
| // A stream of data representing input. |
| stream Elements |
| ) returns ( |
| // A stream of data representing output. |
| stream Elements |
| ) {} |
| } |
| |
| /* |
| * State API |
| */ |
| |
| // A state instruction requested of the runner by the SDK. Streamed as the |
| // request type of BeamFnState.State. |
| message StateRequest { |
| // (Required) A unique identifier provided by the SDK which represents this |
| // request's execution. The StateResponse corresponding with this request |
| // will have the matching id. |
| string id = 1; |
| |
| // (Required) The associated instruction id of the work that is currently |
| // being processed. This allows for the runner to associate any modifications |
| // to state to be committed with the appropriate work execution. |
| string instruction_reference = 2; |
| |
| // (Required) The state key this request is for. |
| StateKey state_key = 3; |
| |
| // (Required) The action to take on this request. |
| oneof request { |
| // A request to get state. |
| StateGetRequest get = 1000; |
| |
| // A request to append to state. |
| StateAppendRequest append = 1001; |
| |
| // A request to clear state. |
| StateClearRequest clear = 1002; |
| } |
| } |
| |
| // The response to a StateRequest. Streamed as the response type of |
| // BeamFnState.State. |
| message StateResponse { |
| // (Required) A reference provided by the SDK which represents a request's |
| // execution. The StateResponse must have the matching id when responding |
| // to the SDK. |
| string id = 1; |
| |
| // (Optional) If this is specified, then the state request has failed. |
| // A human readable string representing the reason as to why the request |
| // failed. |
| string error = 2; |
| |
| // A corresponding response matching the request will be populated. |
| oneof response { |
| // A response to getting state. |
| StateGetResponse get = 1000; |
| |
| // A response to appending to state. |
| StateAppendResponse append = 1001; |
| |
| // A response to clearing state. |
| StateClearResponse clear = 1002; |
| } |
| } |
| |
| // An API for the SDK to access state stored by the runner on its behalf. |
| service BeamFnState { |
| // Used to get/append/clear state stored by the runner on behalf of the SDK. |
| rpc State( |
| // A stream of state instructions requested of the runner. |
| stream StateRequest |
| ) returns ( |
| // A stream of responses to the state instructions the runner was asked to |
| // perform. |
| stream StateResponse |
| ) {} |
| } |
| |
| // A token that an SDK can use to cache data looked up using the State API |
| // across multiple bundles. Carried by ProcessBundleRequest.cache_tokens. |
| message CacheToken { |
| // (Required) Represents the function spec and tag associated with this state |
| // key. |
| // |
| // By combining the function_spec_reference with the tag representing: |
| //   * the input, we refer to the iterable portion of a large GBK |
| //   * the side input, we refer to the side input |
| //   * the user state, we refer to user state |
| // |
| // NOTE(review): Target has no function_spec_reference field; its fields are |
| // primitive_transform_reference and name. This wording looks stale - |
| // confirm the intended mapping. |
| Target target = 1; |
| |
| // (Required) An opaque identifier. |
| bytes token = 2; |
| } |
| |
| // Identifies the state that a StateRequest is for. |
| message StateKey { |
| // (Required) Represents the function spec and tag associated with this state |
| // key. |
| // |
| // By combining the function_spec_reference with the tag representing: |
| //   * the input, we refer to fetching the iterable portion of a large GBK |
| //   * the side input, we refer to fetching the side input |
| //   * the user state, we refer to fetching user state |
| // |
| // NOTE(review): Target has no function_spec_reference field; its fields are |
| // primitive_transform_reference and name. This wording looks stale - |
| // confirm the intended mapping. |
| Target target = 1; |
| |
| // (Required) The bytes of the window which this state request is for encoded |
| // in the nested context. |
| bytes window = 2; |
| |
| // (Required) The user key encoded in the nested context. |
| bytes key = 3; |
| } |
| |
| // A logical byte stream which can be continued using the state API. |
| // Returned in chunks via StateGetResponse.stream. |
| message ContinuableStream { |
| // (Optional) If specified, represents a token which can be used with the |
| // state API to get the next chunk of this logical byte stream. The end of |
| // the logical byte stream is signalled by this field being unset. |
| bytes continuation_token = 1; |
| |
| // Represents a part of a logical byte stream. Elements within |
| // the logical byte stream are encoded in the nested context and |
| // concatenated together. |
| bytes data = 2; |
| } |
| |
| // A request to get state. Sent as StateRequest.get. |
| message StateGetRequest { |
| // (Optional) If specified, signals to the runner that the response |
| // should resume from the following continuation token. |
| // |
| // If unspecified, signals to the runner that the response should start |
| // from the beginning of the logical continuable stream. |
| bytes continuation_token = 1; |
| } |
| |
| // A response to get state. Returned as StateResponse.get. |
| message StateGetResponse { |
| // (Required) The response containing a continuable logical byte stream. |
| ContinuableStream stream = 1; |
| } |
| |
| // A request to append state. Sent as StateRequest.append. |
| message StateAppendRequest { |
| // Represents a part of a logical byte stream. Elements within |
| // the logical byte stream are encoded in the nested context and |
| // multiple append requests are concatenated together. |
| bytes data = 1; |
| } |
| |
| // A response to append state. Returned as StateResponse.append; currently |
| // carries no fields. |
| message StateAppendResponse { |
| } |
| |
| // A request to clear state. Sent as StateRequest.clear; currently carries no |
| // fields of its own. |
| message StateClearRequest { |
| } |
| |
| // A response to clear state. Returned as StateResponse.clear; currently |
| // carries no fields. |
| message StateClearResponse { |
| } |
| |
| /* |
| * Logging API |
| * |
| * This is very stable. There can be some changes to how we define a LogEntry, |
| * to increase/decrease the severity types, the way we format an exception/stack |
| * trace, or the log site. |
| */ |
| |
| // A log entry emitted by the SDK harness. Entries are batched into |
| // LogEntry.List messages and streamed over BeamFnLogging.Logging. |
| message LogEntry { |
| // A list of log entries, enables buffering and batching of multiple |
| // log messages using the logging API. |
| message List { |
| // (Required) One or more log messages. |
| repeated LogEntry log_entries = 1; |
| } |
| |
| // The severity of the event described in a log entry, expressed as one of the |
| // severity levels listed below. For your reference, the levels are |
| // assigned the listed numeric values. The effect of using numeric values |
| // other than those listed is undefined. |
| // |
| // If you are writing log entries, you should map other severity encodings to |
| // one of these standard levels. For example, you might map all of |
| // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`. |
| // |
| // This list is intentionally not comprehensive; the intent is to provide a |
| // common set of "good enough" severity levels so that logging front ends |
| // can provide filtering and searching across log types. Users of the API are |
| // free not to use all severity levels in their log messages. |
| enum Severity { |
| // Trace level information, also the default log level unless |
| // another severity is specified. |
| TRACE = 0; |
| // Debugging information. |
| DEBUG = 10; |
| // Normal events. |
| INFO = 20; |
| // Normal but significant events, such as start up, shut down, or |
| // configuration. |
| NOTICE = 30; |
| // Warning events might cause problems. |
| WARN = 40; |
| // Error events are likely to cause problems. |
| ERROR = 50; |
| // Critical events cause severe problems or brief outages and may |
| // indicate that a person must take action. |
| CRITICAL = 60; |
| } |
| |
| // (Required) The severity of the log statement. |
| Severity severity = 1; |
| |
| // (Required) The time at which this log statement occurred. |
| google.protobuf.Timestamp timestamp = 2; |
| |
| // (Required) A human readable message. |
| string message = 3; |
| |
| // (Optional) An optional trace of the functions involved. For example, in |
| // Java this can include multiple causes and multiple suppressed exceptions. |
| string trace = 4; |
| |
| // (Optional) A reference to the instruction this log statement is associated |
| // with. |
| string instruction_reference = 5; |
| |
| // (Optional) A reference to the primitive transform this log statement is |
| // associated with. |
| string primitive_transform_reference = 6; |
| |
| // (Optional) Human-readable name of the function or method being invoked, |
| // with optional context such as the class or package name. The format can |
| // vary by language. For example: |
| //   qual.if.ied.Class.method (Java) |
| //   dir/package.func (Go) |
| //   module.function (Python) |
| //   file.cc:382 (C++) |
| string log_location = 7; |
| |
| // (Optional) The name of the thread this log statement is associated with. |
| string thread = 8; |
| } |
| |
| // A log control message used to configure the SDK. Streamed as the response |
| // type of BeamFnLogging.Logging; currently carries no fields. |
| message LogControl { |
| } |
| |
| // An API for the SDK harness to send log entries to the runner. |
| // Stable |
| service BeamFnLogging { |
| // Allows for the SDK to emit log entries which the runner can |
| // associate with the active job. |
| rpc Logging( |
| // A stream of log entries batched into lists emitted by the SDK harness. |
| stream LogEntry.List |
| ) returns ( |
| // A stream of log control messages used to configure the SDK. |
| stream LogControl |
| ) {} |
| } |
| |
| /* |
| * Environment types |
| */ |
| // Describes where to connect to an API, including any authentication that |
| // is required. Referenced by RemoteGrpcPort.api_service_descriptor. |
| message ApiServiceDescriptor { |
| // (Required) A pipeline level unique id which can be used as a reference to |
| // refer to this. |
| string id = 1; |
| |
| // (Required) The URL to connect to. |
| string url = 2; |
| |
| // (Optional) The method for authentication. If unspecified, access to the |
| // url is already being performed in a trusted context (e.g. localhost, |
| // private network). |
| oneof authentication { |
| OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3; |
| } |
| } |
| |
| // An authentication method for an ApiServiceDescriptor that uses the OAuth2 |
| // "client_credentials" grant type. |
| message OAuth2ClientCredentialsGrant { |
| // (Required) The URL to submit a "client_credentials" grant type request for |
| // an OAuth access token which will be used as a bearer token for requests. |
| string url = 1; |
| } |
| |
| // A Docker container configuration for launching the SDK harness to execute |
| // user specified functions. |
| message DockerContainer { |
| // (Required) A pipeline level unique id which can be used as a reference to |
| // refer to this. |
| string id = 1; |
| |
| // (Required) The Docker container URI. |
| // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1" |
| string uri = 2; |
| |
| // (Optional) Docker registry specification. |
| // If unspecified, the uri is expected to be able to be fetched without |
| // requiring additional configuration by a runner. |
| string registry_reference = 3; |
| } |
| |