| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
/*
 * Protocol Buffers describing the Fn API and bootstrapping.
 *
 * TODO: Usage of plural names in lists looks awkward in Java
 * e.g. getOutputsMap, addCodersBuilder
 *
 * TODO: gRPC / proto field names conflict with generated code
 * e.g. "class" in java, "output" in python
 */
| |
| syntax = "proto3"; |
| |
/* TODO: Consider consolidating common components in another package
 * and language namespaces for re-use with Runner Api.
 */
| |
| package org.apache.beam.model.fn_execution.v1; |
| |
| option go_package = "fnexecution_v1"; |
| option java_package = "org.apache.beam.model.fnexecution.v1"; |
| option java_outer_classname = "BeamFnApi"; |
| |
| import "beam_runner_api.proto"; |
| import "endpoints.proto"; |
| import "google/protobuf/timestamp.proto"; |
| |
| /* |
| * Constructs that define the pipeline shape. |
| * |
| * These are mostly unstable due to the missing pieces to be shared with |
| * the Runner Api like windowing strategy, display data, .... There are still |
| * some modelling questions related to whether a side input is modelled |
| * as another field on a PrimitiveTransform or as part of inputs and we |
| * still are missing things like the CompositeTransform. |
| */ |
| |
// Identifies one input or output definition on a primitive transform.
// Stable
message Target {
  // Wrapper holding a repeated list of target definitions.
  message List {
    repeated Target target = 1;
  }

  // (Required) Id of the PrimitiveTransform which is the target.
  string primitive_transform_reference = 1;

  // (Required) Local name of an input or output defined on that primitive
  // transform.
  string name = 2;
}
| |
// Describes how to connect to a remote port using the Beam Fn Data API.
// This enables communication between two environments, for example between
// the runner and the SDK.
// Stable
message RemoteGrpcPort {
  // (Required) API descriptor stating where to connect to, including any
  // authentication that is required.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor
      api_service_descriptor = 1;
}
| |
| /* |
| * Control Plane API |
| * |
| * Progress reporting and splitting still need further vetting. Also, this may change |
| * with the addition of new types of instructions/responses related to metrics. |
| */ |
| |
// Control plane API describing the work that an SDK harness is meant to do.
// Stable
service BeamFnControl {
  // Instructions sent by the runner to the SDK requesting different types
  // of work.
  //
  // Request stream: responses to the instructions the SDK was asked to
  // perform.
  // Response stream: instructions the SDK is requested to perform.
  rpc Control(stream InstructionResponse)
      returns (stream InstructionRequest) {}
}
| |
// A unit of work that the runner asks the SDK to fulfill.
// When the request type is not supported, an error should be returned that
// carries the matching instruction id.
// Stable
message InstructionRequest {
  // (Required) Unique identifier, provided by the runner, representing this
  // request's execution. The InstructionResponse MUST carry the matching id.
  string instruction_id = 1;

  // (Required) The request that the SDK Harness needs to interpret.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}
| |
// The SDK's answer to an associated request it had been asked to fulfill.
// Stable
message InstructionResponse {
  // (Required) Reference, provided by the runner, representing a request's
  // execution. The InstructionResponse MUST carry the matching id when
  // responding to the runner.
  string instruction_id = 1;

  // When specified, this instruction has failed. Holds a human readable
  // string giving the reason why processing has failed.
  string error = 2;

  // When the instruction did not fail, an equivalent response type must be
  // returned, depending on the request this response matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}
| |
// Registers a list of objects that the runner can refer to in future
// requests.
// Stable
message RegisterRequest {
  // (Optional) Descriptors used to process bundles.
  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
}
| |
// Response to a RegisterRequest. Declares no fields.
// Stable
message RegisterResponse {}
| |
// The definitions from which the bundle processing graph should be
// constructed.
message ProcessBundleDescriptor {
  // (Required) Pipeline level unique id, usable as a reference to this
  // descriptor.
  string id = 1;

  // (Required) Maps pipeline-scoped id to PTransform.
  map<string, org.apache.beam.model.pipeline.v1.PTransform> transforms = 2;

  // (Required) Maps pipeline-scoped id to PCollection.
  map<string, org.apache.beam.model.pipeline.v1.PCollection>
      pcollections = 3;

  // (Required) Maps pipeline-scoped id to WindowingStrategy.
  map<string, org.apache.beam.model.pipeline.v1.WindowingStrategy>
      windowing_strategies = 4;

  // (Required) Maps pipeline-scoped id to Coder.
  map<string, org.apache.beam.model.pipeline.v1.Coder> coders = 5;

  // (Required) Maps pipeline-scoped id to Environment.
  map<string, org.apache.beam.model.pipeline.v1.Environment>
      environments = 6;

  // Describes the end point to use for State API calls. Required if the
  // Runner intends to send remote references over the data plane, or if any
  // of the transforms rely on user state or side inputs.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor
      state_api_service_descriptor = 7;
}
| |
// Asks the SDK harness to process a given bundle.
// Stable
message ProcessBundleRequest {
  // (Required) Reference to the process bundle descriptor that the SDK
  // harness must instantiate and execute.
  string process_bundle_descriptor_reference = 1;

  // (Optional) Cache tokens that an SDK can use to reuse cached data returned
  // by the State API across multiple bundles.
  repeated bytes cache_tokens = 2;
}
| |
// Response to a ProcessBundleRequest.
// Stable
message ProcessBundleResponse {
  // (Optional) The final metrics to record for this bundle, present when the
  // SDK supports metrics reporting.
  Metrics metrics = 1;
}
| |
// Asks for progress information about a given bundle.
// Handling this request is optional; it exists to support advanced SDK
// features such as SplittableDoFn, user level metrics etc.
message ProcessBundleProgressRequest {
  // (Required) Reference to an active process bundle request with the given
  // instruction id.
  string instruction_reference = 1;
}
| |
// Metrics for a bundle: PTransform level metrics plus user defined metrics.
message Metrics {
  // PTransform level metrics.
  // The metrics are divided into a processed element group and an active
  // element group for progress reporting purposes. The division lets a Runner
  // see what is measured, what is estimated and what can be extrapolated, so
  // that it can accurately estimate the backlog of remaining work.
  message PTransform {
    // Metrics that are measured, for both the processed and the active
    // element groups.
    message Measured {
      // (Optional) Number of elements processed, keyed by local input name.
      // When unset, this is assumed to be the sum of the outputs of all
      // producers to this transform (for ProcessedElements) and 0 (for
      // ActiveElements).
      map<string, int64> input_element_counts = 1;

      // (Required) Number of elements produced, keyed by local output name.
      map<string, int64> output_element_counts = 2;

      // (Optional) Total time, in seconds, spent so far in processing the
      // elements in this group.
      double total_time_spent = 3;

      // TODO: Add other element group level metrics.
    }

    // Metrics covering elements that have been fully processed.
    message ProcessedElements {
      // (Required)
      Measured measured = 1;
    }

    // Metrics covering active elements.
    // An element counts as active once the SDK has started processing it but
    // has not finished yet.
    message ActiveElements {
      // (Required)
      Measured measured = 1;

      // Estimated metrics.

      // (Optional) Sum, over all active elements, of the estimated fraction
      // of known work remaining, as reported by this transform.
      // When not reported, a Runner could extrapolate this from the processed
      // elements.
      // TODO: Handle the case when known work is infinite.
      double fraction_remaining = 2;

      // (Optional) Sum, over all active elements, of the estimated number of
      // elements remaining, keyed by local output name, as reported by this
      // transform.
      // When not reported, a Runner could extrapolate this from the processed
      // elements.
      map<string, int64> output_elements_remaining = 3;
    }

    // (Required): Metrics for processed elements.
    ProcessedElements processed_elements = 1;

    // (Required): Metrics for active elements.
    ActiveElements active_elements = 2;

    // (Optional): Watermark keyed by local output name.
    // Reported watermarks are tentative; they give a better sense of progress
    // while a bundle is being processed but before it is committed. At bundle
    // commit time, a Runner also needs to take the timers set into account
    // to compute the actual watermarks.
    map<string, int64> watermarks = 3;

    // TODO: Define other transform level system metrics.
  }

  // User defined metrics
  message User {
    // TODO: Define it.
  }

  // NOTE(review): the meaning of the map keys is not documented here;
  // presumably PTransform ids -- confirm.
  map<string, PTransform> ptransforms = 1;

  // NOTE(review): the meaning of the map keys is not documented here --
  // confirm.
  map<string, User> user = 2;
}
| |
// Response to a ProcessBundleProgressRequest.
message ProcessBundleProgressResponse {
  // (Required)
  Metrics metrics = 1;
}
| |
// Asks that an active process bundle request be split at a given fraction
// of its work.
message ProcessBundleSplitRequest {
  // (Required) Reference to an active process bundle request with the given
  // instruction id.
  string instruction_reference = 1;

  // (Required) Fraction of work, compared to the known amount of work, at
  // which the process bundle request should try to split.
  double fraction = 2;
}
| |
// urn:org.apache.beam:restriction:element-count:1.0
message ElementCountRestriction {
  // The number of elements that should be processed by this restriction.
  // Effectively the range [0, count]
  int64 count = 1;
}
| |
// urn:org.apache.beam:restriction:element-count-skip:1.0
message ElementCountSkipRestriction {
  // The number of elements that should be skipped by this restriction.
  // Effectively the range (count, infinity]
  int64 count = 1;
}
| |
// Every splittable primitive transform is defined by the restriction it is
// currently processing. When a split happens, that currently active
// restriction (R_initial) is divided into 2 components:
//   * a restriction (R_done) representing all elements that will be fully
//     processed
//   * a restriction (R_todo) representing all elements that will not be fully
//     processed
//
// where:
//   R_initial = R_done ⋃ R_todo
message PrimitiveTransformSplit {
  // (Required) Reference to a primitive transform with the given id that is
  // part of the active process bundle request with the given instruction id.
  string primitive_transform_reference = 1;

  // (Required) Function specification describing the restriction that the
  // primitive transform has completed.
  //
  // For example, a remote GRPC source will have a specific urn and data
  // block containing an ElementCountRestriction.
  org.apache.beam.model.pipeline.v1.FunctionSpec completed_restriction = 2;

  // (Required) Function specification describing the restriction that
  // represents the remainder of work for the primitive transform.
  //
  // For example, a remote GRPC source will have a specific urn and data
  // block containing an ElementCountSkipRestriction.
  org.apache.beam.model.pipeline.v1.FunctionSpec remaining_restriction = 3;
}
| |
// Response to a ProcessBundleSplitRequest, reporting the splits performed by
// primitive transforms of the active bundle.
message ProcessBundleSplitResponse {
  // (Optional) A set of split responses for a currently active work item.
  //
  // If primitive transform B is a descendant of primitive transform A and both
  // A and B report a split, then B's restriction is reported as an element
  // restriction pair and thus the fully reported restriction is:
  //   R = A_done
  //     ⋃ (A_boundary ⋂ B_done)
  //     ⋃ (A_boundary ⋂ B_todo)
  //     ⋃ A_todo
  // If there is a descendant of B named C, then C would similarly report a
  // set of element pair restrictions.
  //
  // This restriction is processed and completed by the currently active
  // process bundle request:
  //   A_done ⋃ (A_boundary ⋂ B_done)
  // and these restrictions will be processed by future process bundle
  // requests:
  //   A_boundary ⋂ B_todo (passed to SDF B directly)
  //   A_todo (passed to SDF A directly)
  //
  // If primitive transforms B and C are siblings and descendants of A, and A,
  // B, and C report a split, then B and C's restrictions are relative to A's:
  //   R = A_done
  //     ⋃ (A_boundary ⋂ B_done)
  //     ⋃ (A_boundary ⋂ B_todo)
  //     ⋃ (A_boundary ⋂ C_done)
  //     ⋃ (A_boundary ⋂ C_todo)
  //     ⋃ A_todo
  // If there is no descendant of B or C also reporting a split, then
  //   B_boundary = ∅ and C_boundary = ∅
  //
  // This restriction is processed and completed by the currently active
  // process bundle request:
  //   A_done ⋃ (A_boundary ⋂ B_done)
  //          ⋃ (A_boundary ⋂ C_done)
  // and these restrictions will be processed by future process bundle
  // requests:
  //   A_boundary ⋂ B_todo (passed to SDF B directly)
  //   A_boundary ⋂ C_todo (passed to SDF C directly)
  //   A_todo (passed to SDF A directly)
  //
  // Note that descendants splits should only be reported if it is inexpensive
  // to compute the boundary restriction intersected with descendants splits.
  // Also note, that the boundary restriction may represent a set of elements
  // produced by a parent primitive transform which can not be split at each
  // element or that there are intermediate unsplittable primitive transforms
  // between an ancestor splittable function and a descendant splittable
  // function which may have more than one output per element. Finally note
  // that the descendant splits should only be reported if the split
  // information is relatively compact.
  repeated PrimitiveTransformSplit splits = 1;
}
| |
| /* |
| * Data Plane API |
| */ |
| |
// Container messages used to represent logical byte streams.
// Stable
message Elements {
  // Multiple encoded elements, in nested context, for a given named
  // instruction and target.
  message Data {
    // (Required) Reference to an active instruction request with the given
    // instruction id.
    string instruction_reference = 1;

    // (Required) Definition representing a consumer or producer of this data.
    // When received by a harness, it names the consumer within that harness
    // that should consume these bytes. When sent by a harness, it names the
    // producer of these bytes.
    //
    // Note that a single element may span multiple Data messages.
    //
    // Note that a sending/receiving pair should share the same target
    // identifier.
    Target target = 2;

    // (Optional) One part of a logical byte stream. Elements within the
    // logical byte stream are encoded in the nested context and concatenated
    // together.
    //
    // An empty data block represents the end of stream for the given
    // instruction and target.
    bytes data = 3;
  }

  // (Required) List containing parts of logical byte streams.
  repeated Data data = 1;
}
| |
// Data plane service.
// Stable
service BeamFnData {
  // Used to send data between harnesses.
  //
  // Request stream: data representing input.
  // Response stream: data representing output.
  rpc Data(stream Elements) returns (stream Elements) {}
}
| |
| /* |
| * State API |
| */ |
| |
// A single state instruction (get, append or clear) requested by the SDK.
message StateRequest {
  // (Required) Unique identifier, provided by the SDK, representing this
  // request's execution. The StateResponse corresponding with this request
  // will carry the matching id.
  string id = 1;

  // (Required) Instruction id of the work that is currently being processed.
  // It lets the runner associate any modifications to state, which are to be
  // committed, with the appropriate work execution.
  string instruction_reference = 2;

  // (Required) The state key this request is for.
  StateKey state_key = 3;

  // (Required) The action to take on this request.
  oneof request {
    // A request to get state.
    StateGetRequest get = 1000;

    // A request to append to state.
    StateAppendRequest append = 1001;

    // A request to clear state.
    StateClearRequest clear = 1002;
  }
}
| |
// The response to a StateRequest.
message StateResponse {
  // (Required) Reference, provided by the SDK, representing a request's
  // execution. The StateResponse must carry the matching id when responding
  // to the SDK.
  string id = 1;

  // (Optional) When specified, the state request has failed. Holds a human
  // readable string giving the reason why the request failed.
  string error = 2;

  // (Optional) When specified, the result of this state request can be
  // cached using the supplied token.
  bytes cache_token = 3;

  // The response corresponding to the request type will be populated.
  oneof response {
    // A response to getting state.
    StateGetResponse get = 1000;

    // A response to appending to state.
    StateAppendResponse append = 1001;

    // A response to clearing state.
    StateClearResponse clear = 1002;
  }
}
| |
// State API service.
service BeamFnState {
  // Used to get/append/clear state stored by the runner on behalf of the SDK.
  //
  // Request stream: state instructions requested of the runner.
  // Response stream: responses to the state instructions the runner was
  // asked to perform.
  rpc State(stream StateRequest) returns (stream StateResponse) {}
}
| |
// Identifies the state that a StateRequest is for.
message StateKey {
  // State key made of opaque, runner supplied information.
  message Runner {
    // (Required) Opaque information supplied by the runner. Used to support
    // remote references.
    bytes key = 1;
  }

  // State key addressing a side input of a PTransform.
  message MultimapSideInput {
    // (Required) Id of the PTransform containing a side input.
    string ptransform_id = 1;

    // (Required) Id of the side input.
    string side_input_id = 2;

    // (Required) The window, after mapping the currently executing element's
    // window into the side input's window domain, encoded in a nested
    // context.
    bytes window = 3;

    // (Required) The key encoded in a nested context.
    bytes key = 4;
  }

  // State key addressing user state of a PTransform.
  message BagUserState {
    // (Required) Id of the PTransform containing user state.
    string ptransform_id = 1;

    // (Required) Id of the user state.
    string user_state_id = 2;

    // (Required) The window encoded in a nested context.
    bytes window = 3;

    // (Required) Key of the currently executing element, encoded in a nested
    // context.
    bytes key = 4;
  }

  // (Required) One of the following state keys must be set.
  oneof type {
    Runner runner = 1;
    MultimapSideInput multimap_side_input = 2;
    BagUserState bag_user_state = 3;
    // TODO: represent a state key for user map state
  }
}
| |
// Asks the runner to return state.
message StateGetRequest {
  // (Optional) When specified, tells the runner that the response should
  // resume from this continuation token.
  //
  // When unspecified, tells the runner that the response should start from
  // the beginning of the logical continuable stream.
  bytes continuation_token = 1;
}
| |
// Response to a get state request. It represents a logical byte stream which
// can be continued using the state API.
message StateGetResponse {
  // (Optional) When specified, a token which can be used with the state API
  // to get the next chunk of this logical byte stream. This field being unset
  // signals the end of the logical byte stream.
  bytes continuation_token = 1;

  // One part of a logical byte stream. Elements within the logical byte
  // stream are encoded in the nested context and concatenated together.
  bytes data = 2;
}
| |
// Asks the runner to append to state.
message StateAppendRequest {
  // One part of a logical byte stream. Elements within the logical byte
  // stream are encoded in the nested context, and multiple append requests
  // are concatenated together.
  bytes data = 1;
}
| |
// Response to an append state request. Declares no fields.
message StateAppendResponse {}
| |
// Asks the runner to clear state. Declares no fields.
message StateClearRequest {}
| |
// Response to a clear state request. Declares no fields.
message StateClearResponse {}
| |
| /* |
| * Logging API |
| * |
| * This is very stable. There can be some changes to how we define a LogEntry, |
| * to increase/decrease the severity types, the way we format an exception/stack |
| * trace, or the log site. |
| */ |
| |
// A single log entry.
message LogEntry {
  // A list of log entries. Enables buffering and batching of multiple log
  // messages using the logging API.
  message List {
    // (Required) One or more log messages.
    repeated LogEntry log_entries = 1;
  }

  // Severity of the event described in a log entry, expressed as one of the
  // severity levels listed below. For your reference, the levels are assigned
  // the listed numeric values. The effect of using numeric values other than
  // those listed is undefined.
  //
  // When writing log entries, map other severity encodings to one of these
  // standard levels. For example, you might map all of Java's FINE, FINER,
  // and FINEST levels to `Severity.DEBUG`.
  //
  // This list is intentionally not comprehensive; the intent is to provide a
  // common set of "good enough" severity levels so that logging front ends
  // can provide filtering and searching across log types. Users of the API
  // are free not to use all severity levels in their log messages.
  message Severity {
    enum Enum {
      UNSPECIFIED = 0;

      // Trace level information, also the default log level unless another
      // severity is specified.
      TRACE = 1;

      // Debugging information.
      DEBUG = 2;

      // Normal events.
      INFO = 3;

      // Normal but significant events, such as start up, shut down, or
      // configuration.
      NOTICE = 4;

      // Warning events might cause problems.
      WARN = 5;

      // Error events are likely to cause problems.
      ERROR = 6;

      // Critical events cause severe problems or brief outages and may
      // indicate that a person must take action.
      CRITICAL = 7;
    }
  }

  // (Required) Severity of the log statement.
  Severity.Enum severity = 1;

  // (Required) Time at which this log statement occurred.
  google.protobuf.Timestamp timestamp = 2;

  // (Required) A human readable message.
  string message = 3;

  // (Optional) Trace of the functions involved. For example, in Java this can
  // include multiple causes and multiple suppressed exceptions.
  string trace = 4;

  // (Optional) Reference to the instruction this log statement is associated
  // with.
  string instruction_reference = 5;

  // (Optional) Reference to the primitive transform this log statement is
  // associated with.
  string primitive_transform_reference = 6;

  // (Optional) Human-readable name of the function or method being invoked,
  // with optional context such as the class or package name. The format can
  // vary by language. For example:
  //   qual.if.ied.Class.method (Java)
  //   dir/package.func (Go)
  //   module.function (Python)
  //   file.cc:382 (C++)
  string log_location = 7;

  // (Optional) Name of the thread this log statement is associated with.
  string thread = 8;
}
| |
// Log control message streamed back by the BeamFnLogging service. Declares
// no fields.
message LogControl {}
| |
// Logging API service.
// Stable
service BeamFnLogging {
  // Allows for the SDK to emit log entries which the runner can associate
  // with the active job.
  //
  // Request stream: log entries, batched into lists, emitted by the SDK
  // harness.
  // Response stream: log control messages used to configure the SDK.
  rpc Logging(stream LogEntry.List) returns (stream LogControl) {}
}
| |
| /* |
| * Environment types |
| */ |
// Configuration of a Docker container used to launch the SDK harness that
// executes user specified functions.
message DockerContainer {
  // (Required) Pipeline level unique id, usable as a reference to this
  // container.
  string id = 1;

  // (Required) The Docker container URI
  // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
  string uri = 2;

  // (Optional) Docker registry specification.
  // When unspecified, the uri is expected to be fetchable without requiring
  // additional configuration by a runner.
  string registry_reference = 3;
}
| |