// Source blob: 07ee5904048c545efa6ef7bd12b4ef7c8b157ed7
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Protocol Buffers describing the Fn API and bootstrapping.
 *
 * TODO: Usage of plural names in lists looks awkward in Java
 * e.g. getOutputsMap, addCodersBuilder
 *
 * TODO: gRPC / proto field names conflict with generated code
 * e.g. "class" in java, "output" in python
 */

// This file uses proto3 syntax.
syntax = "proto3";

/* TODO: Consider consolidating common components in another package
 * and language namespaces for re-use with Runner Api.
 */

// Proto package for every message and service declared in this file.
package org.apache.beam.model.fn_execution.v1;

// Per-language code generation settings.
option go_package = "fnexecution_v1";
option java_package = "org.apache.beam.model.fnexecution.v1";
option java_outer_classname = "BeamFnApi";

import "beam_runner_api.proto";
import "endpoints.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";
import "metrics.proto";
// A descriptor for connecting to a remote port using the Beam Fn Data API.
// Allows for communication between two environments (for example between the
// runner and the SDK).
// Stable
message RemoteGrpcPort {
  // (Required) An API descriptor which describes where to
  // connect to including any authentication that is required.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor
      api_service_descriptor = 1;

  // (Required) The ID of the Coder that will be used to encode and decode data
  // sent over this port.
  string coder_id = 2;
}

/*
 * Control Plane API
 *
 * Progress reporting and splitting still need further vetting. Also, this may
 * change with the addition of new types of instructions/responses related to
 * metrics.
 */

// An API that describes the work that an SDK harness is meant to do.
// Stable
service BeamFnControl {
  // Instructions sent by the runner to the SDK requesting different types
  // of work.
  rpc Control(
      // A stream of responses to instructions the SDK was asked to
      // perform.
      stream InstructionResponse)
      returns (
          // A stream of instructions requested of the SDK to be performed.
          stream InstructionRequest) {}
}

// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message InstructionRequest {
  // (Required) A unique identifier provided by the runner which represents
  // this request's execution. The InstructionResponse MUST have the matching
  // id.
  string instruction_id = 1;

  // (Required) A request that the SDK Harness needs to interpret.
  // Exactly one of the following request types is carried per instruction.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    FinalizeBundleRequest finalize_bundle = 1004;
  }
}

// The response for an associated request the SDK had been asked to fulfill.
// Stable
message InstructionResponse {
  // (Required) A reference provided by the runner which represents a request's
  // execution. The InstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
    FinalizeBundleResponse finalize_bundle = 1004;
  }
}

// A list of objects which can be referred to by the runner in
// future requests.
// Stable
message RegisterRequest {
  // (Optional) The set of descriptors used to process bundles.
  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
}

// The response paired with a RegisterRequest in InstructionResponse.
// Stable
message RegisterResponse {
  // Intentionally empty.
}

// Definitions that should be used to construct the bundle processing graph.
message ProcessBundleDescriptor {
  // (Required) A pipeline level unique id which can be used as a reference to
  // refer to this.
  string id = 1;

  // (Required) A map from pipeline-scoped id to PTransform.
  map<string, org.apache.beam.model.pipeline.v1.PTransform> transforms = 2;

  // (Required) A map from pipeline-scoped id to PCollection.
  map<string, org.apache.beam.model.pipeline.v1.PCollection> pcollections = 3;

  // (Required) A map from pipeline-scoped id to WindowingStrategy.
  map<string, org.apache.beam.model.pipeline.v1.WindowingStrategy>
      windowing_strategies = 4;

  // (Required) A map from pipeline-scoped id to Coder.
  map<string, org.apache.beam.model.pipeline.v1.Coder> coders = 5;

  // (Required) A map from pipeline-scoped id to Environment.
  map<string, org.apache.beam.model.pipeline.v1.Environment> environments = 6;

  // A descriptor describing the end point to use for State API
  // calls. Required if the Runner intends to send remote references over the
  // data plane or if any of the transforms rely on user state or side inputs.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor
      state_api_service_descriptor = 7;
}

// One of the applications specifying the scope of work for a bundle.
// NOTE(review): the link to the design document with further details is
// missing from this copy of the file.
message BundleApplication {
  // (Required) The transform to which to pass the element
  string transform_id = 1;

  // (Required) Name of the transform's input to which to pass the element.
  string input_id = 2;

  // (Required) The encoded element to pass to the transform.
  bytes element = 3;

  // The map is keyed by the local output name of the PTransform. Each
  // value represents a lower bound on the timestamps of elements that
  // are produced by this PTransform into each of its output PCollections
  // when invoked with this application.
  // If there is no watermark reported from RestrictionTracker, the runner will
  // use MIN_TIMESTAMP by default.
  map<string, google.protobuf.Timestamp> output_watermarks = 4;

  // (Required) Whether this application potentially produces an unbounded
  // amount of data. Note that this should only be set to BOUNDED if and
  // only if the application is known to produce a finite amount of output.
  org.apache.beam.model.pipeline.v1.IsBounded.Enum is_bounded = 5;

  // Contains additional monitoring information related to this application.
  //
  // Each application is able to report information that some runners
  // will use when providing a UI or for making scaling and performance
  // decisions.
  // NOTE(review): the link describing which signals may be useful to report
  // is missing from this copy of the file.
  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 6;
}

// An Application should be scheduled for execution after a delay.
// Either an absolute timestamp or a relative timestamp can represent a
// scheduled execution time.
message DelayedBundleApplication {
  // Recommended time at which the application should be scheduled to execute
  // by the runner. Times in the past may be scheduled to execute immediately.
  // TODO(BEAM-8536): Migrate usage of absolute time to requested_time_delay.
  google.protobuf.Timestamp requested_execution_time = 1;

  // (Required) The application that should be scheduled.
  BundleApplication application = 2;

  // Recommended time delay at which the application should be scheduled to
  // execute by the runner. Time delay that equals 0 may be scheduled to execute
  // immediately. The unit of time delay should be microsecond.
  google.protobuf.Duration requested_time_delay = 3;
}

// A request to process a given bundle.
// Stable
message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_id = 1;

  // A cache token which can be used by an SDK to check for the validity
  // of cached elements which have a cache token associated.
  message CacheToken {
    // A flag to indicate a cache token is valid for user state.
    message UserState {}

    // A flag to indicate a cache token is valid for a side input.
    message SideInput {
      // The id of a side input.
      string side_input = 1;
    }

    // The scope of a cache token.
    oneof type {
      UserState user_state = 1;
      SideInput side_input = 2;
    }

    // The cache token identifier which should be globally unique.
    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}

// The response paired with a ProcessBundleRequest in InstructionResponse.
message ProcessBundleResponse {
  // (Optional) If metrics reporting is supported by the SDK, this represents
  // the final metrics to record for this bundle.
  Metrics metrics = 1;

  // (Optional) Specifies that the bundle has not been completed and the
  // following applications need to be scheduled and executed in the future.
  // A runner that does not yet support residual roots MUST still check that
  // this is empty for correctness.
  repeated DelayedBundleApplication residual_roots = 2;

  // (Required) The list of metrics or other MonitoredState
  // collected while processing this bundle.
  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;

  // (Optional) Specifies that the runner must callback to this worker
  // once the output of the bundle is committed. The Runner must send a
  // FinalizeBundleRequest with the instruction id of the ProcessBundleRequest
  // that is related to this ProcessBundleResponse.
  bool requires_finalization = 4;
}

// A request to report progress information for a given bundle.
// This is an optional request to be handled and is used to support advanced
// SDK features such as SplittableDoFn, user level metrics etc.
message ProcessBundleProgressRequest {
  // (Required) A reference to an active process bundle request with the given
  // instruction id.
  string instruction_id = 1;
}

// Metrics reported for a bundle, keyed by PTransform (see `ptransforms`).
message Metrics {
  // PTransform level metrics.
  // These metrics are split into processed and active element groups for
  // progress reporting purposes. This allows a Runner to see what is measured,
  // what is estimated and what can be extrapolated to be able to accurately
  // estimate the amount of remaining work.
  message PTransform {
    // Metrics that are measured for processed and active element groups.
    message Measured {
      // (Optional) Map from local input name to number of elements processed
      // from this input.
      // If unset, assumed to be the sum of the outputs of all producers to
      // this transform (for ProcessedElements) and 0 (for ActiveElements).
      map<string, int64> input_element_counts = 1;

      // (Required) Map from local output name to number of elements produced
      // for this output.
      map<string, int64> output_element_counts = 2;

      // (Optional) The total time spent so far in processing the elements in
      // this group, in seconds.
      double total_time_spent = 3;

      // TODO: Add other element group level metrics.
    }

    // Metrics for fully processed elements.
    message ProcessedElements {
      // (Required)
      Measured measured = 1;
    }

    // Metrics for active elements.
    // An element is considered active if the SDK has started but not finished
    // processing it yet.
    message ActiveElements {
      // (Required)
      Measured measured = 1;

      // Estimated metrics.

      // (Optional) Sum of estimated fraction of known work remaining for all
      // active elements, as reported by this transform.
      // If not reported, a Runner could extrapolate this from the processed
      // elements.
      // TODO: Handle the case when known work is infinite.
      double fraction_remaining = 2;

      // (Optional) Map from local output name to sum of estimated number
      // of elements remaining for this output from all active elements,
      // as reported by this transform.
      // If not reported, a Runner could extrapolate this from the processed
      // elements.
      map<string, int64> output_elements_remaining = 3;
    }

    // (Required): Metrics for processed elements.
    ProcessedElements processed_elements = 1;

    // (Required): Metrics for active elements.
    ActiveElements active_elements = 2;

    // (Optional): Map from local output name to its watermark.
    // The watermarks reported are tentative, to get a better sense of progress
    // while processing a bundle but before it is committed. At bundle commit
    // time, a Runner needs to also take into account the timers set to compute
    // the actual watermarks.
    map<string, int64> watermarks = 3;

    // User defined metrics reported for this transform.
    repeated User user = 4;

    // TODO: Define other transform level system metrics.
  }

  // User defined metrics
  message User {
    // A key for identifying a metric at the most granular level.
    message MetricName {
      // (Required): The namespace of this metric.
      string namespace = 2;

      // (Required): The name of this metric.
      string name = 3;
    }

    // Data associated with a counter metric.
    message CounterData {
      int64 value = 1;
    }

    // Data associated with a distribution metric.
    message DistributionData {
      int64 count = 1;
      int64 sum = 2;
      int64 min = 3;
      int64 max = 4;
    }

    // Data associated with a Gauge metric.
    message GaugeData {
      int64 value = 1;
      google.protobuf.Timestamp timestamp = 2;
    }

    // (Required) The identifier for this metric.
    MetricName metric_name = 1;

    // (Required) The data for this metric.
    oneof data {
      CounterData counter_data = 1001;
      DistributionData distribution_data = 1002;
      GaugeData gauge_data = 1003;
    }
  }

  // Map to the PTransform-level metrics defined above.
  // NOTE(review): the key is presumably the PTransform id - confirm.
  map<string, PTransform> ptransforms = 1;
}

// The response paired with a ProcessBundleProgressRequest in
// InstructionResponse.
message ProcessBundleProgressResponse {
  // DEPRECATED (Required)
  Metrics metrics = 1;

  // (Required) The list of metrics or other MonitoredState
  // collected while processing this bundle.
  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;

  // The list of currently active primary roots that are being
  // executed. Required to be populated for PTransforms which can be split.
  repeated BundleApplication primary_roots = 4;
}

// Represents a request to the SDK to split a currently active bundle.
message ProcessBundleSplitRequest {
  // (Required) A reference to an active process bundle request with the given
  // instruction id.
  string instruction_id = 1;

  // A message specifying the desired split for a single transform.
  message DesiredSplit {
    // (Required) The fraction of known work remaining in this bundle
    // for this transform that should be kept by the SDK after this split.
    //
    // Set to 0 to "checkpoint" as soon as possible (keeping as little work as
    // possible and returning the remainder).
    float fraction_of_remainder = 1;

    // A set of allowed element indices where the SDK may split. When this is
    // empty, there are no constraints on where to split.
    repeated int64 allowed_split_points = 3;

    // (Required for GrpcRead operations) Number of total elements expected
    // to be sent to this GrpcRead operation, required to correctly account
    // for unreceived data when determining where to split.
    int64 estimated_input_elements = 2;

    // TODO(SDF): Allow providing weights rather than sizes.
  }

  // (Required) Specifies the desired split for each transform.
  //
  // Currently only splits at GRPC read operations are supported.
  // This may, of course, limit the amount of work downstream operations
  // receive.
  map<string, DesiredSplit> desired_splits = 3;
}

// Represents a partition of the bundle: a "primary" and
// a "residual", with the following properties:
// - The work in primary and residual doesn't overlap, and combined, adds up
//   to the work in the current bundle if the split hadn't happened.
// - The current bundle, if it keeps executing, will have done none of the
//   work under residual_roots.
// - The current bundle, if no further splits happen, will have done exactly
//   the work under primary_roots.
// NOTE(review): the link to the more rigorous definitions is missing from
// this copy of the file.
message ProcessBundleSplitResponse {
  // Root applications that should replace the current bundle.
  repeated BundleApplication primary_roots = 1;

  // Root applications that have been removed from the current bundle and
  // have to be executed in a separate bundle (e.g. in parallel on a different
  // worker, or after the current bundle completes, etc.)
  repeated DelayedBundleApplication residual_roots = 2;

  // Represents contiguous portions of the data channel that are either
  // entirely processed or entirely unprocessed and belong to the primary
  // or residual respectively.
  //
  // This affords both a more efficient representation over the FnAPI
  // (if the bundle is large) and often a more efficient representation
  // on the runner side (e.g. if the set of elements can be represented
  // as some range in an underlying dataset).
  message ChannelSplit {
    // (Required) The grpc read transform reading this channel.
    string transform_id = 1;

    // The last element of the input channel that should be entirely considered
    // part of the primary, identified by its absolute index in the (ordered)
    // channel.
    int32 last_primary_element = 2;

    // The first element of the input channel that should be entirely considered
    // part of the residual, identified by its absolute index in the (ordered)
    // channel.
    int32 first_residual_element = 3;
  }

  // Partitions of input data channels into primary and residual elements,
  // if any. Should not include any elements represented in the bundle
  // applications roots above.
  repeated ChannelSplit channel_splits = 3;
}

// A request identifying a completed bundle to finalize; see
// ProcessBundleResponse.requires_finalization.
message FinalizeBundleRequest {
  // (Required) A reference to a completed process bundle request with the given
  // instruction id.
  string instruction_id = 1;
}

// The response paired with a FinalizeBundleRequest in InstructionResponse.
message FinalizeBundleResponse {
  // Empty
}

/*
 * Data Plane API
 */

// Messages used to represent logical byte streams.
// Stable
message Elements {
  // Represents multiple encoded elements in nested context for a given named
  // instruction and transform.
  message Data {
    // (Required) A reference to an active instruction request with the given
    // instruction id.
    string instruction_id = 1;

    // (Required) A definition representing a consumer or producer of this data.
    // If received by a harness, this represents the consumer within that
    // harness that should consume these bytes. If sent by a harness, this
    // represents the producer of these bytes.
    //
    // Note that a single element may span multiple Data messages.
    //
    // Note that a sending/receiving pair should share the same identifier.
    string transform_id = 2;

    // (Optional) Represents a part of a logical byte stream. Elements within
    // the logical byte stream are encoded in the nested context and
    // concatenated together.
    //
    // An empty data block represents the end of stream for the given
    // instruction and transform.
    bytes data = 3;
  }

  // (Required) A list containing parts of logical byte streams.
  repeated Data data = 1;
}

// The data plane service: a bidirectional stream of Elements.
// Stable
service BeamFnData {
  // Used to send data between harnesses.
  rpc Data(
      // A stream of data representing input.
      stream Elements)
      returns (
          // A stream of data representing output.
          stream Elements) {}
}

/*
 * State API
 */

// A state request sent by the SDK over the BeamFnState service.
message StateRequest {
  // (Required) A unique identifier provided by the SDK which represents this
  // request's execution. The StateResponse corresponding with this request
  // will have the matching id.
  string id = 1;

  // (Required) The associated instruction id of the work that is currently
  // being processed. This allows for the runner to associate any modifications
  // to state to be committed with the appropriate work execution.
  string instruction_id = 2;

  // (Required) The state key this request is for.
  StateKey state_key = 3;

  // (Required) The action to take on this request.
  oneof request {
    // A request to get state.
    StateGetRequest get = 1000;

    // A request to append to state.
    StateAppendRequest append = 1001;

    // A request to clear state.
    StateClearRequest clear = 1002;
  }
}

// The response to a StateRequest, matched to it by `id`.
message StateResponse {
  // (Required) A reference provided by the SDK which represents a request's
  // execution. The StateResponse must have the matching id when responding
  // to the SDK.
  string id = 1;

  // (Optional) If this is specified, then the state request has failed.
  // A human readable string representing the reason as to why the request
  // failed.
  string error = 2;

  // A corresponding response matching the request will be populated.
  oneof response {
    // A response to getting state.
    StateGetResponse get = 1000;

    // A response to appending to state.
    StateAppendResponse append = 1001;

    // A response to clearing state.
    StateClearResponse clear = 1002;
  }
}

// The state service: a bidirectional stream of StateRequest/StateResponse.
service BeamFnState {
  // Used to get/append/clear state stored by the runner on behalf of the SDK.
  rpc State(
      // A stream of state instructions requested of the runner.
      stream StateRequest)
      returns (
          // A stream of responses to state instructions the runner was asked
          // to perform.
          stream StateResponse) {}
}

// Identifies the state a StateRequest refers to. Exactly one of the key
// kinds in `type` must be set.
message StateKey {
  message Runner {
    // (Required) Opaque information supplied by the runner. Used to support
    // remote references.
    //
    // Used by state backed iterable. And in this use case, request type can
    // only be of type get.
    // NOTE(review): the link with further details is missing from this copy
    // of the file.
    bytes key = 1;
  }

  // Represents a request for the values associated with a specified user key
  // and window in a PCollection.
  // NOTE(review): the links to further details are missing from this copy of
  // the file.
  //
  // Can only be used to perform StateGetRequests on side inputs of the URN
  // beam:side_input:multimap:v1.
  //
  // For a PCollection<KV<K, V>>, the response data stream will be a
  // concatenation of all V's associated with the specified key K.
  message MultimapSideInput {
    // (Required) The id of the PTransform containing a side input.
    string transform_id = 1;

    // (Required) The id of the side input.
    string side_input_id = 2;

    // (Required) The window (after mapping the currently executing elements
    // window into the side input windows domain) encoded in a nested context.
    bytes window = 3;

    // (Required) The key encoded in a nested context.
    bytes key = 4;
  }

  message BagUserState {
    // (Required) The id of the PTransform containing user state.
    string transform_id = 1;

    // (Required) The id of the user state.
    string user_state_id = 2;

    // (Required) The window encoded in a nested context.
    bytes window = 3;

    // (Required) The key of the currently executing element encoded in a
    // nested context.
    bytes key = 4;
  }

  // Represents a request for the values associated with a specified window
  // in a PCollection.
  // NOTE(review): the links to further details are missing from this copy of
  // the file.
  //
  // Can only be used to perform StateGetRequests on side inputs of the URN
  // beam:side_input:iterable:v1 and beam:side_input:multimap:v1.
  //
  // For a PCollection<V>, the response data stream will be a concatenation
  // of all V's.
  message IterableSideInput {
    // (Required) The id of the PTransform containing a side input.
    string transform_id = 1;

    // (Required) The id of the side input.
    string side_input_id = 2;

    // (Required) The window (after mapping the currently executing elements
    // window into the side input windows domain) encoded in a nested context.
    bytes window = 3;
  }

  // (Required) One of the following state keys must be set.
  oneof type {
    Runner runner = 1;
    MultimapSideInput multimap_side_input = 2;
    BagUserState bag_user_state = 3;
    IterableSideInput iterable_side_input = 4;
    // TODO: represent a state key for user map state
  }
}

// A request to get state.
message StateGetRequest {
  // (Optional) If specified, signals to the runner that the response
  // should resume from the following continuation token.
  //
  // If unspecified, signals to the runner that the response should start
  // from the beginning of the logical continuable stream.
  bytes continuation_token = 1;
}

// A response to get state representing a logical byte stream which can be
// continued using the state API.
message StateGetResponse {
  // (Optional) If specified, represents a token which can be used with the
  // state API to get the next chunk of this logical byte stream. The end of
  // the logical byte stream is signalled by this field being unset.
  bytes continuation_token = 1;

  // Represents a part of a logical byte stream. Elements within
  // the logical byte stream are encoded in the nested context and
  // concatenated together.
  bytes data = 2;
}

// A request to append state.
message StateAppendRequest {
  // Represents a part of a logical byte stream. Elements within
  // the logical byte stream are encoded in the nested context and
  // multiple append requests are concatenated together.
  bytes data = 1;
}

// The reply to a StateAppendRequest.
message StateAppendResponse {
  // Intentionally empty.
}

// Asks for state to be cleared.
message StateClearRequest {
  // Intentionally empty.
}

// The reply to a StateClearRequest.
message StateClearResponse {
  // Intentionally empty.
}

/*
 * Logging API
 *
 * This is very stable. There can be some changes to how we define a LogEntry,
 * to increase/decrease the severity types, the way we format an exception/stack
 * trace, or the log site.
 */

// A log entry
message LogEntry {
  // A list of log entries, enables buffering and batching of multiple
  // log messages using the logging API.
  message List {
    // (Required) One or more log messages.
    repeated LogEntry log_entries = 1;
  }

  // The severity of the event described in a log entry, expressed as one of the
  // severity levels listed below. For your reference, the levels are
  // assigned the listed numeric values. The effect of using numeric values
  // other than those listed is undefined.
  //
  // If you are writing log entries, you should map other severity encodings to
  // one of these standard levels. For example, you might map all of
  // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
  //
  // This list is intentionally not comprehensive; the intent is to provide a
  // common set of "good enough" severity levels so that logging front ends
  // can provide filtering and searching across log types. Users of the API are
  // free not to use all severity levels in their log messages.
  message Severity {
    enum Enum {
      // Unspecified level information. Will be logged at the TRACE level.
      // proto3 requires the first enum value to be zero.
      UNSPECIFIED = 0;
      // Trace level information.
      TRACE = 1;
      // Debugging information.
      DEBUG = 2;
      // Normal events.
      INFO = 3;
      // Normal but significant events, such as start up, shut down, or
      // configuration.
      NOTICE = 4;
      // Warning events might cause problems.
      WARN = 5;
      // Error events are likely to cause problems.
      ERROR = 6;
      // Critical events cause severe problems or brief outages and may
      // indicate that a person must take action.
      CRITICAL = 7;
    }
  }

  // (Required) The severity of the log statement.
  Severity.Enum severity = 1;

  // (Required) The time at which this log statement occurred.
  google.protobuf.Timestamp timestamp = 2;

  // (Required) A human readable message.
  string message = 3;

  // (Optional) An optional trace of the functions involved. For example, in
  // Java this can include multiple causes and multiple suppressed exceptions.
  string trace = 4;

  // (Optional) A reference to the instruction this log statement is associated
  // with.
  string instruction_id = 5;

  // (Optional) A reference to the transform this log statement is
  // associated with.
  string transform_id = 6;

  // (Optional) Human-readable name of the function or method being invoked,
  // with optional context such as the class or package name. The format can
  // vary by language. For example:
  //   qual.if.ied.Class.method (Java)
  //   dir/package.func (Go)
  //   module.function (Python)
  //   (C++; NOTE(review): the example is missing from this copy of the file)
  string log_location = 7;

  // (Optional) The name of the thread this log statement is associated with.
  string thread = 8;
}

// The message type streamed back by the BeamFnLogging.Logging RPC.
message LogControl {
  // Intentionally empty.
}

// The logging service: the SDK streams batched log entries and receives a
// stream of log control messages.
// Stable
service BeamFnLogging {
  // Allows for the SDK to emit log entries which the runner can
  // associate with the active job.
  rpc Logging(
      // A stream of log entries batched into lists emitted by the SDK harness.
      stream LogEntry.List)
      returns (
          // A stream of log control messages used to configure the SDK.
          stream LogControl) {}
}

// Request for BeamFnExternalWorkerPool.StartWorker.
message StartWorkerRequest {
  // The ID of the SDK worker to start.
  string worker_id = 1;

  // Service endpoints handed to the started worker.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor control_endpoint = 2;
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor logging_endpoint = 3;
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor artifact_endpoint = 4;
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor provision_endpoint = 5;

  // Free-form string key/value parameters.
  // NOTE(review): their meaning is not defined in this file - confirm with
  // the worker pool implementation.
  map<string, string> params = 10;
}

// Response for BeamFnExternalWorkerPool.StartWorker.
message StartWorkerResponse {
  // NOTE(review): presumably set only when starting the worker failed, as
  // with the `error` fields elsewhere in this file - confirm.
  string error = 1;
}

// Request for BeamFnExternalWorkerPool.StopWorker.
message StopWorkerRequest {
  // The ID of the SDK worker to stop.
  string worker_id = 1;
}

// Response for BeamFnExternalWorkerPool.StopWorker.
message StopWorkerResponse {
  // NOTE(review): presumably set only when stopping the worker failed, as
  // with the `error` fields elsewhere in this file - confirm.
  string error = 1;
}

// A service for starting and stopping SDK workers.
service BeamFnExternalWorkerPool {
  // Start the SDK worker with the given ID.
  rpc StartWorker (StartWorkerRequest) returns (StartWorkerResponse) {}

  // Stop the SDK worker.
  rpc StopWorker (StopWorkerRequest) returns (StopWorkerResponse) {}
}