blob: 132d366f7071def9d4a12e160554104d92337413 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers describing the Fn API and bootstrapping.
*
* TODO: Usage of plural names in lists looks awkward in Java
* e.g. getOutputsMap, addCodersBuilder
*
* TODO: gRPC / proto field names conflict with generated code
* e.g. "class" in java, "output" in python
*/
syntax = "proto3";
/* TODO: Consider consolidating common components in another package
* and language namespaces for re-use with the Runner API.
*/
package org.apache.beam.model.fn_execution.v1;
option go_package = "fnexecution_v1";
option java_package = "org.apache.beam.model.fnexecution.v1";
option java_outer_classname = "BeamFnApi";
import "beam_runner_api.proto";
import "endpoints.proto";
import "google/protobuf/timestamp.proto";
/*
* Constructs that define the pipeline shape.
*
* These are mostly unstable due to the missing pieces to be shared with
* the Runner Api like windowing strategy, display data, .... There are still
* some modelling questions related to whether a side input is modelled
* as another field on a PrimitiveTransform or as part of inputs and we
* still are missing things like the CompositeTransform.
*/
// Refers to a named input or output declared on a primitive transform.
// Stable
message Target {
  // Wrapper holding a repeated collection of Target values.
  message List {
    repeated Target target = 1;
  }

  // (Required) Id of the PrimitiveTransform that owns this target.
  string primitive_transform_reference = 1;

  // (Required) Local name of the input or output, as declared on the
  // primitive transform.
  string name = 2;
}
// Describes a remote port reached through the Beam Fn Data API, letting two
// environments (for example the runner and the SDK) communicate.
// Stable
message RemoteGrpcPort {
  // (Required) Describes the endpoint to connect to, including any
  // authentication that the connection requires.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor api_service_descriptor = 1;
}
/*
* Control Plane API
*
* Progress reporting and splitting still need further vetting. Also, this may change
* with the addition of new types of instructions/responses related to metrics.
*/
// Service through which an SDK harness is told what work it should do.
// Stable
service BeamFnControl {
  // Bidirectional stream: the runner sends instructions asking the SDK for
  // different kinds of work, and the SDK streams back its responses.
  rpc Control(
    // Responses from the SDK to the instructions it was asked to perform.
    stream InstructionResponse
  ) returns (
    // Instructions the runner asks the SDK to perform.
    stream InstructionRequest
  ) {}
}
// A request from the runner that the SDK is asked to fulfill.
// An unsupported request type should be answered with an error carrying the
// matching instruction id.
// Stable
message InstructionRequest {
  // (Required) A unique id, chosen by the runner, identifying the execution
  // of this request. The corresponding InstructionResponse MUST carry the
  // same id.
  string instruction_id = 1;

  // (Required) The request the SDK harness must interpret; exactly one
  // member must be set.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}
// The SDK's answer to a request it was asked to fulfill.
// Stable
message InstructionResponse {
  // (Required) The id the runner supplied in the originating
  // InstructionRequest; the response MUST echo it back to the runner.
  string instruction_id = 1;

  // (Optional) When set, the instruction has failed, and this holds a human
  // readable explanation of why processing failed.
  string error = 2;

  // When the instruction did not fail, the response type matching the
  // original request type must be returned here.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}
// Registers objects that the runner may refer to in later requests.
// Stable
message RegisterRequest {
  // (Optional) Descriptors to be used when processing bundles.
  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
}
// Answer to a RegisterRequest; it currently carries no fields.
// Stable
message RegisterResponse {
}
// The definitions from which the bundle processing graph is constructed.
message ProcessBundleDescriptor {
  // (Required) An id, unique within the pipeline, by which this descriptor
  // can be referenced.
  string id = 1;

  // (Required) PTransforms keyed by their pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.PTransform> transforms = 2;

  // (Required) PCollections keyed by their pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.PCollection> pcollections = 3;

  // (Required) WindowingStrategies keyed by their pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.WindowingStrategy> windowing_strategies = 4;

  // (Required) Coders keyed by their pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.Coder> coders = 5;

  // (Required) Environments keyed by their pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.Environment> environments = 6;

  // Endpoint to use for State API calls. Required when the Runner intends to
  // send remote references over the data plane, or when any transform relies
  // on user state or side inputs.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
}
// Asks the SDK to process a single bundle.
// Stable
message ProcessBundleRequest {
  // (Required) Reference to the process bundle descriptor that the SDK
  // harness must instantiate and execute.
  string process_bundle_descriptor_reference = 1;

  // (Optional) Cache tokens an SDK may use to reuse data returned by the
  // State API across multiple bundles.
  repeated bytes cache_tokens = 2;
}
// Result of processing a bundle.
// Stable
message ProcessBundleResponse {
  // (Optional) The final metrics recorded for this bundle; present when the
  // SDK supports metrics reporting.
  Metrics metrics = 1;
}
// Asks for progress information about a given bundle.
// Handling this request is optional; it supports advanced SDK features such
// as SplittableDoFn and user level metrics.
message ProcessBundleProgressRequest {
  // (Required) Instruction id of an active process bundle request.
  string instruction_reference = 1;
}
// Metrics reported by the SDK, used both for final bundle results and for
// progress reports.
message Metrics {
  // Metrics reported at the PTransform level.
  //
  // They are grouped into processed and active elements for progress
  // reporting. This lets a Runner see what was measured, what was estimated
  // and what can be extrapolated, so it can accurately estimate the backlog
  // of remaining work.
  message PTransform {
    // Metrics measured for a group of elements (processed or active).
    message Measured {
      // (Optional) Number of elements processed from each input, keyed by
      // local input name.
      // When unset, it is assumed to be the sum of the outputs of all
      // producers feeding this transform (for ProcessedElements), or 0 (for
      // ActiveElements).
      map<string, int64> input_element_counts = 1;

      // (Required) Number of elements produced for each output, keyed by
      // local output name.
      map<string, int64> output_element_counts = 2;

      // (Optional) Total time spent so far processing the elements in this
      // group, in seconds.
      double total_time_spent = 3;

      // TODO: Add other element group level metrics.
    }

    // Metrics for elements that have been fully processed.
    message ProcessedElements {
      // (Required)
      Measured measured = 1;
    }

    // Metrics for active elements, meaning elements the SDK has started but
    // not yet finished processing.
    message ActiveElements {
      // (Required)
      Measured measured = 1;

      // Estimated metrics.

      // (Optional) Sum, over all active elements, of the estimated fraction
      // of known work still remaining, as reported by this transform.
      // When not reported, a Runner could extrapolate it from the processed
      // elements.
      // TODO: Handle the case when known work is infinite.
      double fraction_remaining = 2;

      // (Optional) For each output (keyed by local output name), the sum
      // over all active elements of the estimated number of elements still
      // remaining, as reported by this transform.
      // When not reported, a Runner could extrapolate it from the processed
      // elements.
      map<string, int64> output_elements_remaining = 3;
    }

    // (Required) Metrics for processed elements.
    ProcessedElements processed_elements = 1;

    // (Required) Metrics for active elements.
    ActiveElements active_elements = 2;

    // (Optional) Watermark of each output, keyed by local output name.
    // These watermarks are tentative. They give a sense of progress while a
    // bundle is being processed but before it is committed. At bundle commit
    // time, a Runner must also take the timers that were set into account to
    // compute the actual watermarks.
    // NOTE(review): the unit/epoch of these int64 values is not stated here;
    // confirm against producers and consumers.
    map<string, int64> watermarks = 3;

    // TODO: Define other transform level system metrics.
  }

  // User defined metrics.
  message User {
    // TODO: Define it.
  }

  // PTransform level metrics; presumably keyed by transform id (TODO confirm).
  map<string, PTransform> ptransforms = 1;

  // User defined metrics; key semantics are not documented here (TODO confirm).
  map<string, User> user = 2;
}
// Progress information for a bundle, answering a ProcessBundleProgressRequest.
message ProcessBundleProgressResponse {
  // (Required) Metrics describing the bundle's progress.
  Metrics metrics = 1;
}
// Asks the SDK to split the work of an active bundle.
message ProcessBundleSplitRequest {
  // (Required) Instruction id of an active process bundle request.
  string instruction_reference = 1;

  // (Required) Fraction of work, relative to the known amount of work, at
  // which the process bundle request should try to split.
  double fraction = 2;
}
// urn:org.apache.beam:restriction:element-count:1.0
message ElementCountRestriction {
  // A restriction representing the number of elements that should be
  // processed: the first `count` elements, i.e. the half-open range
  // [0, count). (A closed range [0, count] would contain count + 1 elements.)
  // It is the complement of an ElementCountSkipRestriction with the same
  // count.
  int64 count = 1;
}
// urn:org.apache.beam:restriction:element-count-skip:1.0
message ElementCountSkipRestriction {
  // A restriction representing the number of elements that should be
  // skipped: every element after the first `count`, i.e. the half-open
  // range [count, infinity). It is the complement of an
  // ElementCountRestriction with the same count.
  int64 count = 1;
}
// Each splittable primitive transform is defined by the restriction it is
// currently processing. During splitting, that active restriction
// (R_initial) is split into two components:
//  * a restriction (R_done) representing all elements that will be fully
//    processed
//  * a restriction (R_todo) representing all elements that will not be fully
//    processed
//
// where:
//   R_initial = R_done ⋃ R_todo
message PrimitiveTransformSplit {
  // (Required) Id of a primitive transform that is part of the active
  // process bundle request identified by the instruction id.
  string primitive_transform_reference = 1;

  // (Required) A function specification describing the restriction that the
  // primitive transform has completed.
  //
  // For example, a remote gRPC source will have a specific urn and a data
  // block containing an ElementCountRestriction.
  org.apache.beam.model.pipeline.v1.FunctionSpec completed_restriction = 2;

  // (Required) A function specification describing the restriction that
  // represents the remaining work for the primitive transform.
  //
  // For example, a remote gRPC source will have a specific urn and a data
  // block containing an ElementCountSkipRestriction.
  org.apache.beam.model.pipeline.v1.FunctionSpec remaining_restriction = 3;
}
message ProcessBundleSplitResponse {
  // (Optional) The set of split responses for a currently active work item.
  //
  // Suppose primitive transform B is a descendant of primitive transform A,
  // and both A and B report a split. Then B's restriction is reported as an
  // element/restriction pair, and the fully reported restriction is:
  //   R = A_done
  //     ⋃ (A_boundary ⋂ B_done)
  //     ⋃ (A_boundary ⋂ B_todo)
  //     ⋃ A_todo
  // If B has a descendant named C, then C similarly reports a set of
  // element/restriction pairs.
  //
  // This restriction is processed and completed by the currently active
  // process bundle request:
  //   A_done ⋃ (A_boundary ⋂ B_done)
  // and these restrictions will be processed by future process bundle
  // requests:
  //   A_boundary ⋂ B_todo  (passed to SDF B directly)
  //   A_todo               (passed to SDF A directly)
  //
  // Suppose instead that primitive transforms B and C are siblings and
  // descendants of A, and A, B, and C all report a split. Then B's and C's
  // restrictions are relative to A's:
  //   R = A_done
  //     ⋃ (A_boundary ⋂ B_done)
  //     ⋃ (A_boundary ⋂ B_todo)
  //     ⋃ (A_boundary ⋂ C_done)
  //     ⋃ (A_boundary ⋂ C_todo)
  //     ⋃ A_todo
  // If no descendant of B or C also reports a split, then
  //   B_boundary = ∅ and C_boundary = ∅
  //
  // This restriction is processed and completed by the currently active
  // process bundle request:
  //   A_done ⋃ (A_boundary ⋂ B_done)
  //          ⋃ (A_boundary ⋂ C_done)
  // and these restrictions will be processed by future process bundle
  // requests:
  //   A_boundary ⋂ B_todo  (passed to SDF B directly)
  //   A_boundary ⋂ C_todo  (passed to SDF C directly)
  //   A_todo               (passed to SDF A directly)
  //
  // Note that descendant splits should only be reported if it is inexpensive
  // to compute the boundary restriction intersected with the descendant
  // splits. Also note that the boundary restriction may represent a set of
  // elements produced by a parent primitive transform that cannot be split at
  // each element. Alternatively, there may be intermediate unsplittable
  // primitive transforms, which may produce more than one output per element,
  // between an ancestor splittable function and a descendant splittable
  // function. Finally, note that descendant splits should only be reported if
  // the split information is relatively compact.
  repeated PrimitiveTransformSplit splits = 1;
}
/*
* Data Plane API
*/
// Messages used to represent logical byte streams.
// Stable
message Elements {
  // Multiple elements, encoded in the nested context, for a given named
  // instruction and target.
  message Data {
    // (Required) Instruction id of an active instruction request.
    string instruction_reference = 1;

    // (Required) The consumer or producer of this data. When received by a
    // harness, it names the consumer within that harness that should consume
    // these bytes. When sent by a harness, it names the producer of these
    // bytes.
    //
    // A single element may span multiple Data messages.
    //
    // A sending/receiving pair should share the same target identifier.
    Target target = 2;

    // (Optional) Part of a logical byte stream. Elements within the logical
    // byte stream are encoded in the nested context and concatenated
    // together.
    //
    // An empty data block marks the end of the stream for the given
    // instruction and target.
    bytes data = 3;
  }

  // (Required) Parts of logical byte streams.
  repeated Data data = 1;
}
// Stable
service BeamFnData {
  // Sends data between harnesses.
  rpc Data(
    // Stream of data representing input.
    stream Elements
  ) returns (
    // Stream of data representing output.
    stream Elements
  ) {}
}
/*
* State API
*/
// A State API request issued by the SDK to the runner.
message StateRequest {
  // (Required) A unique id, chosen by the SDK, identifying the execution of
  // this request. The corresponding StateResponse will carry the same id.
  string id = 1;

  // (Required) Instruction id of the work currently being processed. It lets
  // the runner associate any state modifications to be committed with the
  // appropriate work execution.
  string instruction_reference = 2;

  // (Required) The state key this request targets.
  StateKey state_key = 3;

  // (Required) The action to take; exactly one member must be set.
  oneof request {
    // Get state.
    StateGetRequest get = 1000;
    // Append to state.
    StateAppendRequest append = 1001;
    // Clear state.
    StateClearRequest clear = 1002;
  }
}
// The runner's reply to a StateRequest.
message StateResponse {
  // (Required) The id the SDK supplied in the originating StateRequest; the
  // response must echo it back to the SDK.
  string id = 1;

  // (Optional) When set, the state request has failed, and this holds a
  // human readable explanation of why.
  string error = 2;

  // (Optional) When set, the result of this state request may be cached
  // using the supplied token.
  bytes cache_token = 3;

  // The response matching the kind of request that was made.
  oneof response {
    // Reply to a get.
    StateGetResponse get = 1000;
    // Reply to an append.
    StateAppendResponse append = 1001;
    // Reply to a clear.
    StateClearResponse clear = 1002;
  }
}
service BeamFnState {
  // Lets the SDK get, append to, and clear state that the runner stores on
  // the SDK's behalf.
  rpc State(
    // State requests sent to the runner.
    stream StateRequest
  ) returns (
    // The runner's responses to those state requests.
    stream StateResponse
  ) {}
}
// Identifies the piece of state that a StateRequest refers to.
message StateKey {
  // State addressed by an opaque, runner-supplied key.
  message Runner {
    // (Required) Opaque information supplied by the runner, used to support
    // remote references.
    bytes key = 1;
  }

  // State backing a multimap side input.
  message MultimapSideInput {
    // (Required) Id of the PTransform that contains the side input.
    string ptransform_id = 1;

    // (Required) Id of the side input.
    string side_input_id = 2;

    // (Required) The window, encoded in a nested context, obtained by mapping
    // the currently executing element's window into the side input's window
    // domain.
    bytes window = 3;

    // (Required) The key, encoded in a nested context.
    bytes key = 4;
  }

  // Bag user state.
  message BagUserState {
    // (Required) Id of the PTransform that contains the user state.
    string ptransform_id = 1;

    // (Required) Id of the user state.
    string user_state_id = 2;

    // (Required) The window, encoded in a nested context.
    bytes window = 3;

    // (Required) Key of the currently executing element, encoded in a nested
    // context.
    bytes key = 4;
  }

  // (Required) Exactly one of these state keys must be set.
  oneof type {
    Runner runner = 1;
    MultimapSideInput multimap_side_input = 2;
    BagUserState bag_user_state = 3;
    // TODO: represent a state key for user map state
  }
}
// A request to read state.
message StateGetRequest {
  // (Optional) When set, tells the runner to resume the response from this
  // continuation token.
  //
  // When unset, tells the runner to start from the beginning of the logical
  // continuable stream.
  bytes continuation_token = 1;
}
// Reply to a state read: a chunk of a logical byte stream that can be
// continued through the State API.
message StateGetResponse {
  // (Optional) When set, a token usable with the State API to fetch the next
  // chunk of this logical byte stream. Leaving it unset signals the end of
  // the logical byte stream.
  bytes continuation_token = 1;

  // Part of a logical byte stream. Elements within the logical byte stream
  // are encoded in the nested context and concatenated together.
  bytes data = 2;
}
// A request to append to state.
message StateAppendRequest {
  // Part of a logical byte stream. Elements within the logical byte stream
  // are encoded in the nested context, and the data of multiple append
  // requests is concatenated together.
  bytes data = 1;
}
// Reply to a state append; it currently carries no fields.
message StateAppendResponse {
}
// A request to clear state; the targeted state is identified by the
// enclosing StateRequest's state_key.
message StateClearRequest {
}
// Reply to a state clear; it currently carries no fields.
message StateClearResponse {
}
/*
* Logging API
*
* This is very stable. There can be some changes to how we define a LogEntry,
* to increase/decrease the severity types, the way we format an exception/stack
* trace, or the log site.
*/
// A single log entry.
message LogEntry {
  // A batch of log entries, so that multiple log messages can be buffered
  // and sent together using the logging API.
  message List {
    // (Required) One or more log messages.
    repeated LogEntry log_entries = 1;
  }

  // The severity of the event described in a log entry, expressed as one of
  // the severity levels listed below. The levels are assigned the numeric
  // values shown; using any other numeric value has an undefined effect.
  //
  // When writing log entries, map other severity encodings onto one of these
  // standard levels. For example, Java's FINE, FINER, and FINEST levels might
  // all be mapped to `Severity.DEBUG`.
  //
  // This list is intentionally not comprehensive. It provides a common set of
  // "good enough" severity levels so that logging front ends can filter and
  // search across log types. Users of the API are free not to use every
  // severity level in their log messages.
  message Severity {
    enum Enum {
      // The proto3 zero/default value; an unset severity field reads as this.
      UNSPECIFIED = 0;
      // Trace level information.
      // NOTE(review): this level was previously described as "the default log
      // level unless another severity is specified", but an unset severity
      // field reads as UNSPECIFIED (0), not TRACE. Confirm whether receivers
      // are expected to treat UNSPECIFIED as TRACE.
      TRACE = 1;
      // Debugging information.
      DEBUG = 2;
      // Normal events.
      INFO = 3;
      // Normal but significant events, such as start up, shut down, or
      // configuration.
      NOTICE = 4;
      // Warning events might cause problems.
      WARN = 5;
      // Error events are likely to cause problems.
      ERROR = 6;
      // Critical events cause severe problems or brief outages and may
      // indicate that a person must take action.
      CRITICAL = 7;
    }
  }

  // (Required) The severity of the log statement.
  Severity.Enum severity = 1;

  // (Required) The time at which this log statement occurred.
  google.protobuf.Timestamp timestamp = 2;

  // (Required) A human readable message.
  string message = 3;

  // (Optional) A trace of the functions involved. For example, in Java this
  // can include multiple causes and multiple suppressed exceptions.
  string trace = 4;

  // (Optional) A reference to the instruction this log statement is
  // associated with.
  string instruction_reference = 5;

  // (Optional) A reference to the primitive transform this log statement is
  // associated with.
  string primitive_transform_reference = 6;

  // (Optional) Human-readable name of the function or method being invoked,
  // with optional context such as the class or package name. The format can
  // vary by language. For example:
  //   qual.if.ied.Class.method (Java)
  //   dir/package.func (Go)
  //   module.function (Python)
  //   file.cc:382 (C++)
  string log_location = 7;

  // (Optional) The name of the thread this log statement is associated with.
  string thread = 8;
}
// Log control messages streamed back to the SDK by BeamFnLogging.Logging;
// it currently carries no fields.
message LogControl {
}
// Stable
service BeamFnLogging {
  // Lets the SDK emit log entries that the runner can associate with the
  // active job.
  rpc Logging(
    // Log entries emitted by the SDK harness, batched into lists.
    stream LogEntry.List
  ) returns (
    // Log control messages used to configure the SDK.
    stream LogControl
  ) {}
}
/*
* Environment types
*/
// Docker container configuration used to launch the SDK harness that
// executes user specified functions.
message DockerContainer {
  // (Required) An id, unique within the pipeline, by which this container
  // configuration can be referenced.
  string id = 1;

  // (Required) The Docker container URI.
  // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
  string uri = 2;

  // (Optional) Docker registry specification.
  // When unset, the uri is expected to be fetchable without any additional
  // configuration by a runner.
  string registry_reference = 3;
}